333 lines
11 KiB
Rust
333 lines
11 KiB
Rust
mod engine;
|
|
|
|
use async_trait::async_trait;
|
|
use baobab_actor::execute_job_with_engine;
|
|
use hero_job::{Job, JobStatus, ScriptType};
|
|
use hero_logger::{create_job_logger, create_job_logger_with_guard};
|
|
use log::{error, info};
|
|
use redis::AsyncCommands;
|
|
use rhai::Engine;
|
|
use std::sync::Arc;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
use tracing::subscriber::with_default;
|
|
|
|
use baobab_actor::{actor_trait::Actor, spawn_actor};
|
|
|
|
/// Constant actor ID for OSIS actor
|
|
const OSIS: &str = "osis";
|
|
|
|
/// Builder for OSISActor
|
|
#[derive(Debug)]
|
|
pub struct OSISActorBuilder {
|
|
engine: Option<Arc<Engine>>,
|
|
db_path: Option<String>,
|
|
redis_url: Option<String>,
|
|
}
|
|
|
|
impl Default for OSISActorBuilder {
|
|
fn default() -> Self {
|
|
Self {
|
|
engine: None,
|
|
db_path: None,
|
|
redis_url: Some("redis://localhost:6379".to_string()),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl OSISActorBuilder {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
|
|
pub fn engine(mut self, engine: Engine) -> Self {
|
|
self.engine = Some(Arc::new(engine));
|
|
self
|
|
}
|
|
|
|
pub fn shared_engine(mut self, engine: Arc<Engine>) -> Self {
|
|
self.engine = Some(engine);
|
|
self
|
|
}
|
|
|
|
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
|
|
self.db_path = Some(db_path.into());
|
|
self
|
|
}
|
|
|
|
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
|
|
self.redis_url = Some(redis_url.into());
|
|
self
|
|
}
|
|
|
|
pub fn build(self) -> Result<OSISActor, String> {
|
|
let engine = self
|
|
.engine
|
|
.unwrap_or_else(|| crate::engine::create_osis_engine());
|
|
|
|
Ok(OSISActor {
|
|
engine,
|
|
db_path: self.db_path.ok_or("db_path is required")?,
|
|
redis_url: self
|
|
.redis_url
|
|
.unwrap_or("redis://localhost:6379".to_string()),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// OSIS actor that processes jobs in a blocking, synchronized manner
|
|
#[derive(Debug, Clone)]
|
|
pub struct OSISActor {
|
|
pub engine: Arc<Engine>,
|
|
pub db_path: String,
|
|
pub redis_url: String,
|
|
}
|
|
|
|
impl OSISActor {
|
|
/// Create a new OSISActorBuilder
|
|
pub fn builder() -> OSISActorBuilder {
|
|
OSISActorBuilder::new()
|
|
}
|
|
}
|
|
|
|
impl Default for OSISActor {
|
|
fn default() -> Self {
|
|
Self {
|
|
engine: crate::engine::create_osis_engine(),
|
|
db_path: "/tmp".to_string(),
|
|
redis_url: "redis://localhost:6379".to_string(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Actor for OSISActor {
|
|
async fn process_job(&self, job: Job, redis_conn: &mut redis::aio::MultiplexedConnection) {
|
|
let job_id = &job.id;
|
|
let _db_path = &self.db_path;
|
|
|
|
// Debug: Log job details
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Processing job with context_id: {}, script length: {}",
|
|
OSIS, job_id, job.context_id, job.script.len()
|
|
);
|
|
|
|
// Create job-specific logger
|
|
let (job_logger, guard) = match create_job_logger_with_guard("logs", "osis", job_id) {
|
|
Ok((logger, guard)) => {
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Job logger created successfully",
|
|
OSIS, job_id
|
|
);
|
|
(logger, guard)
|
|
},
|
|
Err(e) => {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to create job logger: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Starting sequential processing",
|
|
OSIS, job_id
|
|
);
|
|
|
|
// Update job status to Started
|
|
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to update status to Started: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Execute ALL job processing within logging context
|
|
let job_result = with_default(job_logger, || {
|
|
tracing::info!(target: "osis_actor", "Job {} started", job_id);
|
|
|
|
// Move the Rhai script execution inside this scope
|
|
// IMPORTANT: Create a new engine and configure Rhai logging for this job context
|
|
let mut job_engine = Engine::new();
|
|
register_dsl_modules(&mut job_engine);
|
|
// Configure Rhai logging integration for this engine instance
|
|
hero_logger::rhai_integration::configure_rhai_logging(&mut job_engine, "osis_actor");
|
|
|
|
// Execute the script within the job logger context
|
|
let script_result = tokio::task::block_in_place(|| {
|
|
tokio::runtime::Handle::current().block_on(async {
|
|
execute_job_with_engine(&mut job_engine, &job, &self.db_path).await
|
|
})
|
|
});
|
|
|
|
tracing::info!(target: "osis_actor", "Job {} completed", job_id);
|
|
|
|
script_result // Return the result
|
|
});
|
|
|
|
// Handle the result outside the logging context
|
|
match job_result {
|
|
Ok(result) => {
|
|
let result_str = format!("{:?}", result);
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Script executed successfully. Result: {}",
|
|
OSIS, job_id, result_str
|
|
);
|
|
|
|
// Update job with success result (stores in job hash output field)
|
|
if let Err(e) = Job::set_result(redis_conn, job_id, &result_str).await {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to set result: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Also push result to result queue for retrieval
|
|
let result_queue_key = format!("hero:job:{}:result", job_id);
|
|
if let Err(e) = redis_conn
|
|
.lpush::<_, _, ()>(&result_queue_key, &result_str)
|
|
.await
|
|
{
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}",
|
|
OSIS, job_id, result_queue_key, e
|
|
);
|
|
} else {
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Result pushed to queue: {}",
|
|
OSIS, job_id, result_queue_key
|
|
);
|
|
}
|
|
|
|
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Finished).await {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to update status to Finished: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let error_msg = format!("Script execution error: {}", e);
|
|
error!("OSIS Actor '{}', Job {}: {}", OSIS, job_id, error_msg);
|
|
|
|
// Update job with error (stores in job hash error field)
|
|
if let Err(e) = Job::set_error(redis_conn, job_id, &error_msg).await {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to set error: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
}
|
|
|
|
// Also push error to error queue for retrieval
|
|
let error_queue_key = format!("hero:job:{}:error", job_id);
|
|
if let Err(e) = redis_conn
|
|
.lpush::<_, _, ()>(&error_queue_key, &error_msg)
|
|
.await
|
|
{
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}",
|
|
OSIS, job_id, error_queue_key, e
|
|
);
|
|
} else {
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Error pushed to queue: {}",
|
|
OSIS, job_id, error_queue_key
|
|
);
|
|
}
|
|
|
|
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Error).await {
|
|
error!(
|
|
"OSIS Actor '{}', Job {}: Failed to update status to Error: {}",
|
|
OSIS, job_id, e
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Force flush logs before dropping guard
|
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
|
|
|
// Keep the guard alive until after processing
|
|
drop(guard);
|
|
|
|
info!(
|
|
"OSIS Actor '{}', Job {}: Sequential processing completed",
|
|
OSIS, job_id
|
|
);
|
|
}
|
|
|
|
fn actor_type(&self) -> &'static str {
|
|
"OSIS"
|
|
}
|
|
|
|
fn actor_id(&self) -> &str {
|
|
// Actor ID contains "osis" so the runtime derives ScriptType=OSIS and consumes the canonical type queue.
|
|
"osis"
|
|
}
|
|
|
|
fn redis_url(&self) -> &str {
|
|
&self.redis_url
|
|
}
|
|
}
|
|
|
|
/// Convenience function to spawn an OSIS actor using the trait interface
|
|
///
|
|
/// This function provides backward compatibility with the original actor API
|
|
/// while using the new trait-based implementation.
|
|
pub fn spawn_osis_actor(
|
|
db_path: String,
|
|
redis_url: String,
|
|
shutdown_rx: mpsc::Receiver<()>,
|
|
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
|
|
let actor = Arc::new(
|
|
OSISActor::builder()
|
|
.db_path(db_path)
|
|
.redis_url(redis_url)
|
|
.build()
|
|
.expect("Failed to build OSISActor"),
|
|
);
|
|
spawn_actor(actor, shutdown_rx)
|
|
}
|
|
|
|
// Re-export engine functions for examples and external use
|
|
pub use crate::engine::{create_osis_engine, register_dsl_modules};
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[tokio::test]
|
|
async fn test_osis_actor_creation() {
|
|
let actor = OSISActor::builder().build().unwrap();
|
|
assert_eq!(actor.actor_type(), "OSIS");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_osis_actor_default() {
|
|
let actor = OSISActor::default();
|
|
assert_eq!(actor.actor_type(), "OSIS");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_osis_actor_process_job_interface() {
|
|
let actor = OSISActor::default();
|
|
|
|
// Create a simple test job
|
|
let _job = Job::new(
|
|
"test_caller".to_string(),
|
|
"test_context".to_string(),
|
|
r#"print("Hello from sync actor test!"); 42"#.to_string(),
|
|
ScriptType::OSIS,
|
|
);
|
|
|
|
// Note: This test doesn't actually connect to Redis, it just tests the interface
|
|
// In a real test environment, you'd need a Redis instance or mock
|
|
|
|
// For now, just verify the actor was created successfully
|
|
assert_eq!(actor.actor_type(), "OSIS");
|
|
}
|
|
}
|