diff --git a/core/actor/src/lib.rs b/core/actor/src/lib.rs
index 3911898..693ccb8 100644
--- a/core/actor/src/lib.rs
+++ b/core/actor/src/lib.rs
@@ -1,10 +1,9 @@
 use hero_job::{Job, JobStatus};
-use tracing::{debug, error, info};
+use tracing::log::{debug, error, info};
 use redis::AsyncCommands;
 use rhai::{Dynamic, Engine};
 use tokio::sync::mpsc; // For shutdown signal
 use tokio::task::JoinHandle;
-use tracing::subscriber::with_default;
 
 /// Actor trait abstraction for unified actor interface
 pub mod actor_trait;
@@ -53,109 +52,13 @@ pub(crate) async fn load_job_from_redis(
     }
 }
 
-/// Execute the Rhai script and update job status in Redis with per-job logging
+/// Execute the Rhai script and update job status in Redis
 async fn execute_script_and_update_status(
     redis_conn: &mut redis::aio::MultiplexedConnection,
     engine: &mut Engine,
     job: &Job,
     db_path: &str,
-    actor_type: &str,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    // Create per-job logger for isolated logging
-    let job_logger = match hero_logger::create_job_logger("logs", actor_type, &job.id) {
-        Ok(logger) => logger,
-        Err(e) => {
-            error!("Failed to create job logger for job {}: {}", job.id, e);
-            // Continue without per-job logging
-            return execute_script_without_job_logging(redis_conn, engine, job, db_path, actor_type).await;
-        }
-    };
-
-    // Execute the job within the per-job logging context
-    let job_id = job.id.clone();
-    let context_id = job.context_id.clone();
-    let script = job.script.clone();
-    let caller_id = job.caller_id.clone();
-    let db_path = db_path.to_string();
-    let actor_target = format!("{}_actor", actor_type);
-
-    let result = with_default(job_logger, || {
-        // Configure Rhai engine for logging within the job context
-        hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
-
-        // Set up Rhai engine configuration
-        let mut db_config = rhai::Map::new();
-        db_config.insert("DB_PATH".into(), db_path.into());
-        db_config.insert("CALLER_ID".into(), caller_id.into());
-        db_config.insert("CONTEXT_ID".into(), context_id.clone().into());
-        engine.set_default_tag(Dynamic::from(db_config));
-
-        info!(target: &actor_target, "Job {} processing started for context {}", job_id, context_id);
-        debug!(target: &actor_target, "Evaluating script with Rhai engine");
-
-        // Execute the script (print/debug calls will now be captured in job logs)
-        match engine.eval::<Dynamic>(&script) {
-            Ok(result) => {
-                let output_str = if result.is::<String>() {
-                    result.into_string().unwrap()
-                } else {
-                    result.to_string()
-                };
-                info!(target: &actor_target, "Job {} completed successfully. Output: {}", job_id, output_str);
-                Ok(output_str)
-            }
-            Err(e) => {
-                let error_str = format!("{:?}", *e);
-                error!(target: &actor_target, "Job {} script evaluation failed: {}", job_id, error_str);
-                Err(error_str)
-            }
-        }
-    });
-
-    // Update job status based on execution result
-    match result {
-        Ok(output_str) => {
-            Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
-                .map_err(|e| {
-                    error!("Failed to update job {} status to finished: {}", job.id, e);
-                    e
-                })?;
-
-            Job::set_result(redis_conn, &job.id, &output_str).await
-                .map_err(|e| {
-                    error!("Failed to set job {} result: {}", job.id, e);
-                    e
-                })?;
-        }
-        Err(error_str) => {
-            Job::update_status(redis_conn, &job.id, JobStatus::Error).await
-                .map_err(|e| {
-                    error!("Failed to update job {} status to error: {}", job.id, e);
-                    e
-                })?;
-
-            Job::set_error(redis_conn, &job.id, &error_str).await
-                .map_err(|e| {
-                    error!("Failed to set job {} error: {}", job.id, e);
-                    e
-                })?;
-        }
-    }
-
-    Ok(())
-}
-
-/// Fallback function for script execution without per-job logging
-async fn execute_script_without_job_logging(
-    redis_conn: &mut redis::aio::MultiplexedConnection,
-    engine: &mut Engine,
-    job: &Job,
-    db_path: &str,
-    actor_type: &str,
-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    // Configure Rhai logging to use system logger
-    let actor_target = format!("{}_actor", actor_type);
-    hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
     let mut db_config = rhai::Map::new();
     db_config.insert("DB_PATH".into(), db_path.to_string().into());
     db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
@@ -173,6 +76,7 @@ async fn execute_script_without_job_logging(
             };
             info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
 
+            // Update job status to finished and set result
             Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
                 .map_err(|e| {
                     error!("Failed to update job {} status to finished: {}", job.id, e);
@@ -191,6 +95,7 @@ async fn execute_script_without_job_logging(
             let error_str = format!("{:?}", *e);
             error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
 
+            // Update job status to error and set error message
            Job::update_status(redis_conn, &job.id, JobStatus::Error).await
                .map_err(|e| {
                    error!("Failed to update job {} status to error: {}", job.id, e);
@@ -235,8 +140,6 @@ async fn process_job(
     engine: &mut Engine,
     preserve_tasks: bool,
 ) {
-    // Extract actor type from actor_id (e.g., "osis_actor_1" -> "osis")
-    let actor_type = hero_logger::extract_actor_type(actor_id);
     debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
 
     // Load job from Redis
@@ -252,8 +155,8 @@ async fn process_job(
         debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
     }
 
-    // Execute the script and update status with per-job logging
-    if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path, actor_type).await {
+    // Execute the script and update status
+    if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
         error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
 
         // Ensure job status is set to error if execution failed