logger
This commit is contained in:
		@@ -15,8 +15,8 @@ rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"]
 | 
			
		||||
serde = { version = "1.0", features = ["derive"] }
 | 
			
		||||
serde_json = "1.0"
 | 
			
		||||
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
 | 
			
		||||
log = "0.4"
 | 
			
		||||
env_logger = "0.10"
 | 
			
		||||
tracing = "0.1"
 | 
			
		||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 | 
			
		||||
clap = { version = "4.4", features = ["derive"] }
 | 
			
		||||
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
 | 
			
		||||
chrono = { version = "0.4", features = ["serde"] }
 | 
			
		||||
@@ -25,6 +25,7 @@ thiserror = "1.0"
 | 
			
		||||
async-trait = "0.1"
 | 
			
		||||
hero_supervisor = { path = "../supervisor" }
 | 
			
		||||
hero_job = { path = "../job" }
 | 
			
		||||
hero_logger = { path = "../logger" }
 | 
			
		||||
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
 | 
			
		||||
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
 | 
			
		||||
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,7 @@
 | 
			
		||||
//! ```
 | 
			
		||||
 | 
			
		||||
use hero_job::Job;
 | 
			
		||||
use log::{debug, error, info};
 | 
			
		||||
use tracing::{debug, error, info};
 | 
			
		||||
use redis::AsyncCommands;
 | 
			
		||||
use rhai::Engine;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,9 +1,10 @@
 | 
			
		||||
use hero_job::{Job, JobStatus};
 | 
			
		||||
use log::{debug, error, info};
 | 
			
		||||
use tracing::{debug, error, info};
 | 
			
		||||
use redis::AsyncCommands;
 | 
			
		||||
use rhai::{Dynamic, Engine};
 | 
			
		||||
use tokio::sync::mpsc; // For shutdown signal
 | 
			
		||||
use tokio::task::JoinHandle;
 | 
			
		||||
use tracing::subscriber::with_default;
 | 
			
		||||
 | 
			
		||||
/// Actor trait abstraction for unified actor interface
 | 
			
		||||
pub mod actor_trait;
 | 
			
		||||
@@ -52,13 +53,109 @@ pub(crate) async fn load_job_from_redis(
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Execute the Rhai script and update job status in Redis
 | 
			
		||||
/// Execute the Rhai script and update job status in Redis with per-job logging
 | 
			
		||||
async fn execute_script_and_update_status(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    engine: &mut Engine,
 | 
			
		||||
    job: &Job,
 | 
			
		||||
    db_path: &str,
 | 
			
		||||
    actor_type: &str,
 | 
			
		||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    // Create per-job logger for isolated logging
 | 
			
		||||
    let job_logger = match hero_logger::create_job_logger("logs", actor_type, &job.id) {
 | 
			
		||||
        Ok(logger) => logger,
 | 
			
		||||
        Err(e) => {
 | 
			
		||||
            error!("Failed to create job logger for job {}: {}", job.id, e);
 | 
			
		||||
            // Continue without per-job logging
 | 
			
		||||
            return execute_script_without_job_logging(redis_conn, engine, job, db_path, actor_type).await;
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    // Execute the job within the per-job logging context
 | 
			
		||||
    let job_id = job.id.clone();
 | 
			
		||||
    let context_id = job.context_id.clone();
 | 
			
		||||
    let script = job.script.clone();
 | 
			
		||||
    let caller_id = job.caller_id.clone();
 | 
			
		||||
    let db_path = db_path.to_string();
 | 
			
		||||
    let actor_target = format!("{}_actor", actor_type);
 | 
			
		||||
    
 | 
			
		||||
    let result = with_default(job_logger, || {
 | 
			
		||||
        // Configure Rhai engine for logging within the job context
 | 
			
		||||
        hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
 | 
			
		||||
        
 | 
			
		||||
        // Set up Rhai engine configuration
 | 
			
		||||
        let mut db_config = rhai::Map::new();
 | 
			
		||||
        db_config.insert("DB_PATH".into(), db_path.into());
 | 
			
		||||
        db_config.insert("CALLER_ID".into(), caller_id.into());
 | 
			
		||||
        db_config.insert("CONTEXT_ID".into(), context_id.clone().into());
 | 
			
		||||
        engine.set_default_tag(Dynamic::from(db_config));
 | 
			
		||||
        
 | 
			
		||||
        info!(target: &actor_target, "Job {} processing started for context {}", job_id, context_id);
 | 
			
		||||
        debug!(target: &actor_target, "Evaluating script with Rhai engine");
 | 
			
		||||
        
 | 
			
		||||
        // Execute the script (print/debug calls will now be captured in job logs)
 | 
			
		||||
        match engine.eval::<rhai::Dynamic>(&script) {
 | 
			
		||||
            Ok(result) => {
 | 
			
		||||
                let output_str = if result.is::<String>() {
 | 
			
		||||
                    result.into_string().unwrap()
 | 
			
		||||
                } else {
 | 
			
		||||
                    result.to_string()
 | 
			
		||||
                };
 | 
			
		||||
                info!(target: &actor_target, "Job {} completed successfully. Output: {}", job_id, output_str);
 | 
			
		||||
                Ok(output_str)
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                let error_str = format!("{:?}", *e);
 | 
			
		||||
                error!(target: &actor_target, "Job {} script evaluation failed: {}", job_id, error_str);
 | 
			
		||||
                Err(error_str)
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
    
 | 
			
		||||
    // Update job status based on execution result
 | 
			
		||||
    match result {
 | 
			
		||||
        Ok(output_str) => {
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to finished: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Job::set_result(redis_conn, &job.id, &output_str).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to set job {} result: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
        }
 | 
			
		||||
        Err(error_str) => {
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Error).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to error: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Job::set_error(redis_conn, &job.id, &error_str).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to set job {} error: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Fallback function for script execution without per-job logging
 | 
			
		||||
async fn execute_script_without_job_logging(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    engine: &mut Engine,
 | 
			
		||||
    job: &Job,
 | 
			
		||||
    db_path: &str,
 | 
			
		||||
    actor_type: &str,
 | 
			
		||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    // Configure Rhai logging to use system logger
 | 
			
		||||
    let actor_target = format!("{}_actor", actor_type);
 | 
			
		||||
    hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
 | 
			
		||||
    let mut db_config = rhai::Map::new();
 | 
			
		||||
    db_config.insert("DB_PATH".into(), db_path.to_string().into());
 | 
			
		||||
    db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
 | 
			
		||||
@@ -76,7 +173,6 @@ async fn execute_script_and_update_status(
 | 
			
		||||
            };
 | 
			
		||||
            info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
 | 
			
		||||
            
 | 
			
		||||
            // Update job status to finished and set result
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to finished: {}", job.id, e);
 | 
			
		||||
@@ -95,7 +191,6 @@ async fn execute_script_and_update_status(
 | 
			
		||||
            let error_str = format!("{:?}", *e);
 | 
			
		||||
            error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
 | 
			
		||||
            
 | 
			
		||||
            // Update job status to error and set error message
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Error).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to error: {}", job.id, e);
 | 
			
		||||
@@ -140,6 +235,8 @@ async fn process_job(
 | 
			
		||||
    engine: &mut Engine,
 | 
			
		||||
    preserve_tasks: bool,
 | 
			
		||||
) {
 | 
			
		||||
    // Extract actor type from actor_id (e.g., "osis_actor_1" -> "osis")
 | 
			
		||||
    let actor_type = hero_logger::extract_actor_type(actor_id);
 | 
			
		||||
    debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
 | 
			
		||||
    
 | 
			
		||||
    // Load job from Redis
 | 
			
		||||
@@ -155,8 +252,8 @@ async fn process_job(
 | 
			
		||||
                debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
 | 
			
		||||
            }
 | 
			
		||||
            
 | 
			
		||||
            // Execute the script and update status
 | 
			
		||||
            if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
 | 
			
		||||
            // Execute the script and update status with per-job logging
 | 
			
		||||
            if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path, actor_type).await {
 | 
			
		||||
                error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
 | 
			
		||||
                
 | 
			
		||||
                // Ensure job status is set to error if execution failed
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user