use crate::Job; use crate::runner_trait::Runner; use log::{debug, error, info}; use rhai::{Engine, Dynamic}; use std::sync::Arc; use tracing::subscriber::with_default; /// Configuration for sync runner instances #[derive(Debug, Clone)] pub struct SyncRunnerConfig { pub runner_id: String, pub redis_url: String, } /// Synchronous runner that processes jobs sequentially pub struct SyncRunner { pub config: SyncRunnerConfig, pub engine_factory: Arc Engine + Send + Sync>, } impl SyncRunner { /// Create a new SyncRunner with the provided engine factory pub fn new(config: SyncRunnerConfig, engine_factory: F) -> Self where F: Fn() -> Engine + Send + Sync + 'static, { Self { config, engine_factory: Arc::new(engine_factory), } } /// Execute a job with the given engine, setting proper job context /// /// This function sets up the engine with job context (DB_PATH, CALLER_ID, CONTEXT_ID) /// and evaluates the script. It returns the result or error. fn execute_job_with_engine( engine: &mut Engine, job: &Job, ) -> Result> { // Set up job context in the engine let mut db_config = rhai::Map::new(); db_config.insert("CALLER_ID".into(), job.caller_id.clone().into()); db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into()); // Extract signatories from job signatures, or fall back to env_vars let signatories: Vec = if !job.signatures.is_empty() { // Use signatures from the job job.signatures.iter() .map(|sig| Dynamic::from(sig.public_key.clone())) .collect() } else { Vec::new() }; db_config.insert("SIGNATORIES".into(), Dynamic::from(signatories)); engine.set_default_tag(Dynamic::from(db_config)); debug!("Sync Runner for Context ID '{}': Evaluating script with Rhai engine (job context set).", job.context_id); // Execute the script with the configured engine engine.eval::(&job.payload) } } impl Runner for SyncRunner { fn process_job(&self, job: Job) -> Result> { let job_id = &job.id; let runner_id = &self.config.runner_id; debug!("Sync Runner '{}', Job {}: Processing started.", runner_id, job_id); info!("Sync Runner '{}' processing job_id: {}. Script: {:.50}...", job.context_id, job_id, job.payload); // Determine logs directory (default to ~/hero/logs) let logs_root = if let Some(home) = std::env::var_os("HOME") { std::path::PathBuf::from(home).join("hero").join("logs") } else { std::path::PathBuf::from("logs") }; // Create job-specific logger let job_logger_result = hero_logger::create_job_logger_with_guard( &logs_root, runner_id, // Use runner_id as the actor_type job_id, ); // Verify signatures before executing (if any) if let Err(e) = job.verify_signatures() { error!("Job {} signature verification failed: {}", job_id, e); return Err(Box::new(e)); } // Execute job within logging context let result = match job_logger_result { Ok((job_logger, _guard)) => { // Execute ALL job processing within logging context with_default(job_logger, || { tracing::info!("Job {} started", job_id); // Create a new engine instance and configure Rhai logging let mut engine = (self.engine_factory)(); // Reconfigure Rhai logging for this specific job context // This ensures print() and debug() calls go to the job logger hero_logger::rhai_integration::configure_rhai_logging(&mut engine, runner_id); // Execute the script let script_result = Self::execute_job_with_engine(&mut engine, &job); tracing::info!("Job {} completed", job_id); script_result }) } Err(e) => { error!("Failed to create job logger for job {}: {}", job_id, e); // Fallback: execute without job-specific logging let mut engine = (self.engine_factory)(); Self::execute_job_with_engine(&mut engine, &job) } }; // Process result match result { Ok(result) => { let output_str = if result.is::() { result.into_string().unwrap() } else { result.to_string() }; info!("Sync Runner for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str); Ok(output_str) } Err(e) => { let error_str = format!("{:?}", *e); error!("Sync Runner for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str); Err(Box::new(e) as Box) } } } fn runner_type(&self) -> &'static str { "Sync" } fn runner_id(&self) -> &str { &self.config.runner_id } fn redis_url(&self) -> &str { &self.config.redis_url } } /// Convenience function to spawn a synchronous runner using the trait interface pub fn spawn_sync_runner( runner_id: String, redis_url: String, shutdown_rx: tokio::sync::mpsc::Receiver<()>, engine_factory: F, ) -> tokio::task::JoinHandle>> where F: Fn() -> Engine + Send + Sync + 'static, { let config = SyncRunnerConfig { runner_id, redis_url, }; let runner = Arc::new(SyncRunner::new(config, engine_factory)); crate::runner_trait::spawn_runner(runner, shutdown_rx) }