diff --git a/Cargo.lock b/Cargo.lock index 19adc3b..6770bee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,9 +12,10 @@ dependencies = [ "clap", "env_logger", "hero_job", - "heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", + "hero_logger", + "heromodels", + "heromodels-derive", + "heromodels_core", "log", "redis", "rhai", @@ -24,6 +25,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "toml", + "tracing", "uuid", ] @@ -197,9 +199,9 @@ dependencies = [ "env_logger", "hero_job", "hero_supervisor", - "heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", + "heromodels", + "heromodels-derive", + "heromodels_core", "log", "redis", "rhai", @@ -481,6 +483,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crossterm" version = "0.28.1" @@ -557,10 +574,19 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" +source = "git+https://git.ourworld.tf/herocode/rhailib.git#02d9f5937ea5d5ce78f6f4a89c7400bfd1881057" dependencies = [ "quote", "syn 1.0.109", @@ -964,6 +990,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "hero_logger" +version = "0.1.0" +source = "git+https://git.ourworld.tf/herocode/baobab.git?branch=logger#9c4fa1a78bf3cbeb57802d261ee9c9fb115ba219" +dependencies = [ + "anyhow", + "chrono", + "rhai", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", + "tracing-appender", + "tracing-subscriber", +] + [[package]] name = "hero_supervisor" version = "0.1.0" @@ -994,10 +1037,10 @@ source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac77 dependencies = [ "bincode", "chrono", - "heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", + "heromodels-derive", + "heromodels_core", "jsonb", - "ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", + "ourdb", "postgres", "r2d2", "r2d2_postgres", @@ -1006,30 +1049,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", - "tst 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "uuid", -] - -[[package]] -name = "heromodels" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" -dependencies = [ - "bincode", - "chrono", - "heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "jsonb", - "ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "postgres", - "r2d2", - "r2d2_postgres", - "rhai", - "serde", - "serde_json", - "strum", - "strum_macros", - "tst 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", + "tst", "uuid", ] @@ -1043,16 +1063,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "heromodels-derive" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "heromodels_core" version = "0.1.0" @@ -1062,15 +1072,6 @@ dependencies = [ "serde", ] -[[package]] -name = "heromodels_core" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" -dependencies = [ - "chrono", - "serde", -] - [[package]] name = "hmac" version = "0.12.1" @@ -1812,14 +1813,23 @@ dependencies = [ [[package]] name = "macros" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" +source = "git+https://git.ourworld.tf/herocode/rhailib.git#02d9f5937ea5d5ce78f6f4a89c7400bfd1881057" dependencies = [ - "heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", + "heromodels", + "heromodels_core", "rhai", "serde", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1898,6 +1908,22 @@ dependencies = [ "memchr", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -1996,15 +2022,10 @@ dependencies = [ ] [[package]] -name = "ourdb" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" -dependencies = [ - "crc32fast", - "log", - "rand 0.8.5", - "thiserror 1.0.69", -] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parking_lot" @@ -2166,6 +2187,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2353,8 +2380,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2365,9 +2401,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -2467,7 +2509,7 @@ dependencies = [ [[package]] name = "rhai_dispatcher" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" +source = "git+https://git.ourworld.tf/herocode/rhailib.git#02d9f5937ea5d5ce78f6f4a89c7400bfd1881057" dependencies = [ "chrono", "clap", @@ -2484,14 +2526,14 @@ dependencies = [ [[package]] name = "rhailib_dsl" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" +source = "git+https://git.ourworld.tf/herocode/rhailib.git#02d9f5937ea5d5ce78f6f4a89c7400bfd1881057" dependencies = [ "chrono", "derive", "dotenv", - "heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", - "heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", + "heromodels", + "heromodels-derive", + "heromodels_core", "macros", "reqwest", "rhai", @@ -2822,6 +2864,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3120,6 +3171,46 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -3335,6 +3426,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.30" @@ -3353,6 +3456,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] @@ -3366,16 +3512,7 @@ name = "tst" version = "0.1.0" source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76" dependencies = [ - "ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)", - "thiserror 1.0.69", -] - -[[package]] -name = "tst" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/rhailib.git#c37be2dfcc447538be8363296b2b465c93ad3e11" -dependencies = [ - "ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/rhailib.git)", + "ourdb", "thiserror 1.0.69", ] @@ -3482,6 +3619,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index a8a908f..40000aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ heromodels = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } +hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } +tracing = "0.1.41" [features] default = ["calendar", "finance"] diff --git a/examples/actor.rs b/examples/actor.rs index b8eb5eb..ede0ee7 100644 --- a/examples/actor.rs +++ b/examples/actor.rs @@ -12,7 +12,8 @@ use hero_job::{Job, JobStatus, ScriptType}; #[tokio::main] async fn main() -> Result<(), Box> { // Initialize logging - env_logger::init(); + // env_logger::init(); + hero_logger::init_system_logger("logs", &["osis_actor".to_string()]).unwrap(); println!("=== OSIS Actor Redis Dispatch Example ==="); diff --git a/src/engine.rs b/src/engine.rs index 2e5b92d..1f895a4 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -51,17 +51,19 @@ impl EngineFactory { pub fn new() -> Self { let mut engine = Engine::new(); register_dsl_modules(&mut engine); - + // Logger + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor"); + Self { engine: Arc::new(engine), } } - + /// Get a shared reference to the engine. pub fn get_engine(&self) -> Arc { Arc::clone(&self.engine) } - + /// Get the global singleton engine factory. pub fn global() -> &'static EngineFactory { static FACTORY: OnceLock = OnceLock::new(); @@ -73,28 +75,33 @@ impl EngineFactory { /// This provides object functionality without relying on the problematic rhailib_dsl object module. fn register_object_functions(engine: &mut Engine) { use heromodels::models::object::Object; - + // Register the Object type engine.register_type_with_name::("Object"); - + // Register constructor function engine.register_fn("new_object", || Object::new()); - + // Register setter functions engine.register_fn("object_title", |obj: &mut Object, title: String| { obj.title = title; obj.clone() }); - - engine.register_fn("object_description", |obj: &mut Object, description: String| { - obj.description = description; - obj.clone() - }); - + + engine.register_fn( + "object_description", + |obj: &mut Object, description: String| { + obj.description = description; + obj.clone() + }, + ); + // Register getter functions engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64); engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone()); - engine.register_fn("get_object_description", |obj: &mut Object| obj.description.clone()); + engine.register_fn("get_object_description", |obj: &mut Object| { + obj.description.clone() + }); } /// Registers all DSL modules with the provided Rhai engine. @@ -151,14 +158,13 @@ pub fn register_dsl_modules(engine: &mut Engine) { // Skip problematic object module for now - can be implemented separately if needed // rhailib_dsl::object::register_object_fns(engine); rhailib_dsl::payment::register_payment_rhai_module(engine); - + // Register basic object functionality directly register_object_functions(engine); - + println!("Rhailib Domain Specific Language modules registered successfully."); } - /// Create a shared heromodels engine using the factory. pub fn create_osis_engine() -> Arc { EngineFactory::global().get_engine() @@ -171,5 +177,3 @@ pub fn eval_script( ) -> Result> { engine.eval(script) } - - diff --git a/src/lib.rs b/src/lib.rs index 93a8a6d..a1278dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,13 +3,14 @@ mod engine; use async_trait::async_trait; use baobab_actor::execute_job_with_engine; use hero_job::{Job, JobStatus, ScriptType}; +use hero_logger::{create_job_logger, create_job_logger_with_guard}; use log::{error, info}; use redis::AsyncCommands; use rhai::Engine; use std::sync::Arc; use tokio::sync::mpsc; use tokio::task::JoinHandle; - +use tracing::subscriber::with_default; use baobab_actor::{actor_trait::Actor, spawn_actor}; @@ -43,7 +44,7 @@ impl OSISActorBuilder { self.engine = Some(Arc::new(engine)); self } - + pub fn shared_engine(mut self, engine: Arc) -> Self { self.engine = Some(engine); self @@ -60,12 +61,16 @@ impl OSISActorBuilder { } pub fn build(self) -> Result { - let engine = self.engine.unwrap_or_else(|| crate::engine::create_osis_engine()); - + let engine = self + .engine + .unwrap_or_else(|| crate::engine::create_osis_engine()); + Ok(OSISActor { engine, db_path: self.db_path.ok_or("db_path is required")?, - redis_url: self.redis_url.unwrap_or("redis://localhost:6379".to_string()), + redis_url: self + .redis_url + .unwrap_or("redis://localhost:6379".to_string()), }) } } @@ -97,101 +102,155 @@ impl Default for OSISActor { #[async_trait] impl Actor for OSISActor { - async fn process_job( - &self, - job: Job, - redis_conn: &mut redis::aio::MultiplexedConnection, - ) { + async fn process_job(&self, job: Job, redis_conn: &mut redis::aio::MultiplexedConnection) { let job_id = &job.id; let _db_path = &self.db_path; - - info!("OSIS Actor '{}', Job {}: Starting sequential processing", OSIS, job_id); + + // Create job-specific logger + let (job_logger, _guard) = create_job_logger_with_guard("logs", "osis", job_id).unwrap(); + + info!( + "OSIS Actor '{}', Job {}: Starting sequential processing", + OSIS, job_id + ); // Update job status to Started if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await { - error!("OSIS Actor '{}', Job {}: Failed to update status to Started: {}", - OSIS, job_id, e); + error!( + "OSIS Actor '{}', Job {}: Failed to update status to Started: {}", + OSIS, job_id, e + ); return; } - // Execute the Rhai script with proper job context - // Note: We create a fresh engine instance for each job to avoid state conflicts - let mut job_engine = Engine::new(); - register_dsl_modules(&mut job_engine); - match execute_job_with_engine(&mut job_engine, &job, &self.db_path).await { + // Execute ALL job processing within logging context + let job_result = with_default(job_logger, || { + tracing::info!(target: "osis_actor", "Job {} started", job_id); + + // Move the Rhai script execution inside this scope + // IMPORTANT: Create a new engine and configure Rhai logging for this job context + let mut job_engine = Engine::new(); + register_dsl_modules(&mut job_engine); + // Configure Rhai logging integration for this engine instance + hero_logger::rhai_integration::configure_rhai_logging(&mut job_engine, "osis_actor"); + + // Execute the script within the job logger context + let script_result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + execute_job_with_engine(&mut job_engine, &job, &self.db_path).await + }) + }); + + tracing::info!(target: "osis_actor", "Job {} completed", job_id); + + script_result // Return the result + }); + + // Handle the result outside the logging context + match job_result { Ok(result) => { let result_str = format!("{:?}", result); - info!("OSIS Actor '{}', Job {}: Script executed successfully. Result: {}", - OSIS, job_id, result_str); - + info!( + "OSIS Actor '{}', Job {}: Script executed successfully. Result: {}", + OSIS, job_id, result_str + ); + // Update job with success result (stores in job hash output field) if let Err(e) = Job::set_result(redis_conn, job_id, &result_str).await { - error!("OSIS Actor '{}', Job {}: Failed to set result: {}", - OSIS, job_id, e); + error!( + "OSIS Actor '{}', Job {}: Failed to set result: {}", + OSIS, job_id, e + ); return; } - + // Also push result to result queue for retrieval let result_queue_key = format!("hero:job:{}:result", job_id); - if let Err(e) = redis_conn.lpush::<_, _, ()>(&result_queue_key, &result_str).await { - error!("OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}", - OSIS, job_id, result_queue_key, e); + if let Err(e) = redis_conn + .lpush::<_, _, ()>(&result_queue_key, &result_str) + .await + { + error!( + "OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}", + OSIS, job_id, result_queue_key, e + ); } else { - info!("OSIS Actor '{}', Job {}: Result pushed to queue: {}", - OSIS, job_id, result_queue_key); + info!( + "OSIS Actor '{}', Job {}: Result pushed to queue: {}", + OSIS, job_id, result_queue_key + ); } - + if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Finished).await { - error!("OSIS Actor '{}', Job {}: Failed to update status to Finished: {}", - OSIS, job_id, e); + error!( + "OSIS Actor '{}', Job {}: Failed to update status to Finished: {}", + OSIS, job_id, e + ); } } Err(e) => { let error_msg = format!("Script execution error: {}", e); error!("OSIS Actor '{}', Job {}: {}", OSIS, job_id, error_msg); - + // Update job with error (stores in job hash error field) if let Err(e) = Job::set_error(redis_conn, job_id, &error_msg).await { - error!("OSIS Actor '{}', Job {}: Failed to set error: {}", - OSIS, job_id, e); + error!( + "OSIS Actor '{}', Job {}: Failed to set error: {}", + OSIS, job_id, e + ); } - + // Also push error to error queue for retrieval let error_queue_key = format!("hero:job:{}:error", job_id); - if let Err(e) = redis_conn.lpush::<_, _, ()>(&error_queue_key, &error_msg).await { - error!("OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}", - OSIS, job_id, error_queue_key, e); + if let Err(e) = redis_conn + .lpush::<_, _, ()>(&error_queue_key, &error_msg) + .await + { + error!( + "OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}", + OSIS, job_id, error_queue_key, e + ); } else { - info!("OSIS Actor '{}', Job {}: Error pushed to queue: {}", - OSIS, job_id, error_queue_key); + info!( + "OSIS Actor '{}', Job {}: Error pushed to queue: {}", + OSIS, job_id, error_queue_key + ); } - + if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Error).await { - error!("OSIS Actor '{}', Job {}: Failed to update status to Error: {}", - OSIS, job_id, e); + error!( + "OSIS Actor '{}', Job {}: Failed to update status to Error: {}", + OSIS, job_id, e + ); } } } - info!("OSIS Actor '{}', Job {}: Sequential processing completed", OSIS, job_id); + // Keep the guard alive until after processing + drop(_guard); + + info!( + "OSIS Actor '{}', Job {}: Sequential processing completed", + OSIS, job_id + ); } fn actor_type(&self) -> &'static str { "OSIS" } - + fn actor_id(&self) -> &str { // Use actor_queue:osis to match supervisor's dispatch queue naming "actor_queue:osis" } - + fn redis_url(&self) -> &str { &self.redis_url } } /// Convenience function to spawn an OSIS actor using the trait interface -/// +/// /// This function provides backward compatibility with the original actor API /// while using the new trait-based implementation. pub fn spawn_osis_actor( @@ -204,7 +263,7 @@ pub fn spawn_osis_actor( .db_path(db_path) .redis_url(redis_url) .build() - .expect("Failed to build OSISActor") + .expect("Failed to build OSISActor"), ); spawn_actor(actor, shutdown_rx) } @@ -231,7 +290,7 @@ mod tests { #[tokio::test] async fn test_osis_actor_process_job_interface() { let actor = OSISActor::default(); - + // Create a simple test job let _job = Job::new( "test_caller".to_string(), @@ -242,7 +301,7 @@ mod tests { // Note: This test doesn't actually connect to Redis, it just tests the interface // In a real test environment, you'd need a Redis instance or mock - + // For now, just verify the actor was created successfully assert_eq!(actor.actor_type(), "OSIS"); }