diff --git a/Cargo.lock b/Cargo.lock
index 09e7441..bb32693 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13,6 +13,7 @@ dependencies = [
  "clap",
  "env_logger",
  "hero_job",
+ "hero_logger",
  "heromodels",
  "heromodels-derive",
  "heromodels_core",
@@ -25,6 +26,7 @@ dependencies = [
  "thiserror 1.0.69",
  "tokio",
  "toml",
+ "tracing",
  "uuid",
 ]
 
@@ -485,6 +487,21 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
 [[package]]
 name = "crossterm"
 version = "0.28.1"
@@ -561,6 +578,15 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "deranged"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
+dependencies = [
+ "powerfmt",
+]
+
 [[package]]
 name = "derive"
 version = "0.1.0"
@@ -968,6 +994,23 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "hero_logger"
+version = "0.1.0"
+source = "git+https://git.ourworld.tf/herocode/baobab.git?branch=logger#0da7b9363c2956e6f17ac78232152c549f1d5e68"
+dependencies = [
+ "anyhow",
+ "chrono",
+ "rhai",
+ "serde",
+ "serde_json",
+ "thiserror 1.0.69",
+ "tokio",
+ "tracing",
+ "tracing-appender",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "hero_supervisor"
 version = "0.1.0"
@@ -1782,6 +1825,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "matchers"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+dependencies = [
+ "regex-automata 0.1.10",
+]
+
 [[package]]
 name = "md-5"
 version = "0.10.6"
@@ -1860,6 +1912,22 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "nu-ansi-term"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
+dependencies = [
+ "overload",
+ "winapi",
+]
+
+[[package]]
+name = "num-conv"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
+
 [[package]]
 name = "num-traits"
 version = "0.2.19"
@@ -1957,6 +2025,12 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
+[[package]]
+name = "overload"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
 [[package]]
 name = "parking_lot"
 version = "0.12.4"
@@ -2117,6 +2191,12 @@ dependencies = [
  "zerovec",
 ]
 
+[[package]]
+name = "powerfmt"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.21"
@@ -2304,8 +2384,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.4.9",
+ "regex-syntax 0.8.5",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
 ]
 
 [[package]]
@@ -2316,9 +2405,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax",
+ "regex-syntax 0.8.5",
 ]
 
+[[package]]
+name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
 [[package]]
 name = "regex-syntax"
 version = "0.8.5"
@@ -2773,6 +2868,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sharded-slab"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
+dependencies = [
+ "lazy_static",
+]
+
 [[package]]
 name = "shlex"
 version = "1.3.0"
@@ -3071,6 +3175,46 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "thread_local"
+version = "1.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
+name = "time"
+version = "0.3.41"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
+dependencies = [
+ "deranged",
+ "itoa",
+ "num-conv",
+ "powerfmt",
+ "serde",
+ "time-core",
+ "time-macros",
+]
+
+[[package]]
+name = "time-core"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
+
+[[package]]
+name = "time-macros"
+version = "0.2.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
+dependencies = [
+ "num-conv",
+ "time-core",
+]
+
 [[package]]
 name = "tiny-keccak"
 version = "2.0.2"
@@ -3286,6 +3430,18 @@ dependencies = [
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-appender"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
+dependencies = [
+ "crossbeam-channel",
+ "thiserror 1.0.69",
+ "time",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "tracing-attributes"
 version = "0.1.30"
@@ -3304,6 +3460,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
 dependencies = [
  "once_cell",
+ "valuable",
 ]
+
+[[package]]
+name = "tracing-log"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
+dependencies = [
+ "log",
+ "once_cell",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-serde"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
+dependencies = [
+ "serde",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-subscriber"
+version = "0.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
+dependencies = [
+ "matchers",
+ "nu-ansi-term",
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "sharded-slab",
+ "smallvec",
+ "thread_local",
+ "tracing",
+ "tracing-core",
+ "tracing-log", + "tracing-serde", ] [[package]] @@ -3424,6 +3623,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 8022f5f..62440e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,8 @@ heromodels = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } +hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } +tracing = "0.1.41" [features] default = ["calendar", "finance"] diff --git a/examples/actor.rs b/examples/actor.rs index b8eb5eb..ede0ee7 100644 --- a/examples/actor.rs +++ b/examples/actor.rs @@ -12,7 +12,8 @@ use hero_job::{Job, JobStatus, ScriptType}; #[tokio::main] async fn main() -> Result<(), Box> { // Initialize logging - env_logger::init(); + // env_logger::init(); + hero_logger::init_system_logger("logs", &["osis_actor".to_string()]).unwrap(); println!("=== OSIS Actor Redis Dispatch Example ==="); diff --git a/src/engine.rs b/src/engine.rs index 2e5b92d..1f895a4 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -51,17 +51,19 @@ impl EngineFactory { pub fn new() -> Self { let mut engine = Engine::new(); register_dsl_modules(&mut engine); - + // Logger + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor"); + Self { engine: Arc::new(engine), } } - + /// Get a shared reference to the engine. pub fn get_engine(&self) -> Arc { Arc::clone(&self.engine) } - + /// Get the global singleton engine factory. pub fn global() -> &'static EngineFactory { static FACTORY: OnceLock = OnceLock::new(); @@ -73,28 +75,33 @@ impl EngineFactory { /// This provides object functionality without relying on the problematic rhailib_dsl object module. fn register_object_functions(engine: &mut Engine) { use heromodels::models::object::Object; - + // Register the Object type engine.register_type_with_name::("Object"); - + // Register constructor function engine.register_fn("new_object", || Object::new()); - + // Register setter functions engine.register_fn("object_title", |obj: &mut Object, title: String| { obj.title = title; obj.clone() }); - - engine.register_fn("object_description", |obj: &mut Object, description: String| { - obj.description = description; - obj.clone() - }); - + + engine.register_fn( + "object_description", + |obj: &mut Object, description: String| { + obj.description = description; + obj.clone() + }, + ); + // Register getter functions engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64); engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone()); - engine.register_fn("get_object_description", |obj: &mut Object| obj.description.clone()); + engine.register_fn("get_object_description", |obj: &mut Object| { + obj.description.clone() + }); } /// Registers all DSL modules with the provided Rhai engine. 
@@ -151,14 +158,13 @@ pub fn register_dsl_modules(engine: &mut Engine) {
     // Skip problematic object module for now - can be implemented separately if needed
     // rhailib_dsl::object::register_object_fns(engine);
     rhailib_dsl::payment::register_payment_rhai_module(engine);
-    
+
     // Register basic object functionality directly
     register_object_functions(engine);
-    
+
     println!("Rhailib Domain Specific Language modules registered successfully.");
 }
 
-
 /// Create a shared heromodels engine using the factory.
 pub fn create_osis_engine() -> Arc<Engine> {
     EngineFactory::global().get_engine()
 }
@@ -171,5 +177,3 @@ pub fn eval_script(
 ) -> Result<Dynamic, Box<EvalAltResult>> {
     engine.eval(script)
 }
-
-
diff --git a/src/lib.rs b/src/lib.rs
index 93a8a6d..bf3807b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,13 +3,14 @@ mod engine;
 use async_trait::async_trait;
 use baobab_actor::execute_job_with_engine;
 use hero_job::{Job, JobStatus, ScriptType};
+use hero_logger::{create_job_logger, create_job_logger_with_guard};
 use log::{error, info};
 use redis::AsyncCommands;
 use rhai::Engine;
 use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
-
+use tracing::subscriber::with_default;
 
 use baobab_actor::{actor_trait::Actor, spawn_actor};
@@ -43,7 +44,7 @@ impl OSISActorBuilder {
         self.engine = Some(Arc::new(engine));
         self
     }
-    
+
     pub fn shared_engine(mut self, engine: Arc<Engine>) -> Self {
         self.engine = Some(engine);
         self
@@ -60,12 +61,16 @@ impl OSISActorBuilder {
     }
 
     pub fn build(self) -> Result {
-        let engine = self.engine.unwrap_or_else(|| crate::engine::create_osis_engine());
-        
+        let engine = self
+            .engine
+            .unwrap_or_else(|| crate::engine::create_osis_engine());
+
         Ok(OSISActor {
             engine,
             db_path: self.db_path.ok_or("db_path is required")?,
-            redis_url: self.redis_url.unwrap_or("redis://localhost:6379".to_string()),
+            redis_url: self
+                .redis_url
+                .unwrap_or("redis://localhost:6379".to_string()),
         })
     }
 }
@@ -97,101 +102,179 @@ impl Default for OSISActor {
 
 #[async_trait]
 impl Actor for OSISActor {
-    async fn process_job(
-        &self,
-        job: Job,
-        redis_conn: &mut redis::aio::MultiplexedConnection,
-    ) {
+    async fn process_job(&self, job: Job, redis_conn: &mut redis::aio::MultiplexedConnection) {
         let job_id = &job.id;
         let _db_path = &self.db_path;
-        
-        info!("OSIS Actor '{}', Job {}: Starting sequential processing", OSIS, job_id);
+
+        // Debug: Log job details
+        info!(
+            "OSIS Actor '{}', Job {}: Processing job with context_id: {}, script length: {}",
+            OSIS, job_id, job.context_id, job.script.len()
+        );
+
+        // Create job-specific logger
+        let (job_logger, guard) = match create_job_logger_with_guard("logs", "osis", job_id) {
+            Ok((logger, guard)) => {
+                info!(
+                    "OSIS Actor '{}', Job {}: Job logger created successfully",
+                    OSIS, job_id
+                );
+                (logger, guard)
+            },
+            Err(e) => {
+                error!(
+                    "OSIS Actor '{}', Job {}: Failed to create job logger: {}",
+                    OSIS, job_id, e
+                );
+                return;
+            }
+        };
+
+        info!(
+            "OSIS Actor '{}', Job {}: Starting sequential processing",
+            OSIS, job_id
+        );
 
         // Update job status to Started
         if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await {
-            error!("OSIS Actor '{}', Job {}: Failed to update status to Started: {}",
-                OSIS, job_id, e);
+            error!(
+                "OSIS Actor '{}', Job {}: Failed to update status to Started: {}",
+                OSIS, job_id, e
+            );
             return;
         }
 
-        // Execute the Rhai script with proper job context
-        // Note: We create a fresh engine instance for each job to avoid state conflicts
-        let mut job_engine = Engine::new();
-        register_dsl_modules(&mut job_engine);
-        match execute_job_with_engine(&mut job_engine, &job, &self.db_path).await {
+        // Execute ALL job processing within logging context
+        let job_result = with_default(job_logger, || {
+            tracing::info!(target: "osis_actor", "Job {} started", job_id);
+
+            // Move the Rhai script execution inside this scope
+            // IMPORTANT: Create a new engine and configure Rhai logging for this job context
+            let mut job_engine = Engine::new();
+            register_dsl_modules(&mut job_engine);
+            // Configure Rhai logging integration for this engine instance
+            hero_logger::rhai_integration::configure_rhai_logging(&mut job_engine, "osis_actor");
+
+            // Execute the script within the job logger context
+            let script_result = tokio::task::block_in_place(|| {
+                tokio::runtime::Handle::current().block_on(async {
+                    execute_job_with_engine(&mut job_engine, &job, &self.db_path).await
+                })
+            });
+
+            tracing::info!(target: "osis_actor", "Job {} completed", job_id);
+
+            script_result // Return the result
+        });
+
+        // Handle the result outside the logging context
+        match job_result {
             Ok(result) => {
                 let result_str = format!("{:?}", result);
-                info!("OSIS Actor '{}', Job {}: Script executed successfully. Result: {}",
-                    OSIS, job_id, result_str);
-                
+                info!(
+                    "OSIS Actor '{}', Job {}: Script executed successfully. Result: {}",
+                    OSIS, job_id, result_str
+                );
+
                 // Update job with success result (stores in job hash output field)
                 if let Err(e) = Job::set_result(redis_conn, job_id, &result_str).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to set result: {}",
-                        OSIS, job_id, e);
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to set result: {}",
+                        OSIS, job_id, e
+                    );
                     return;
                 }
-                
+
                 // Also push result to result queue for retrieval
                 let result_queue_key = format!("hero:job:{}:result", job_id);
-                if let Err(e) = redis_conn.lpush::<_, _, ()>(&result_queue_key, &result_str).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}",
-                        OSIS, job_id, result_queue_key, e);
+                if let Err(e) = redis_conn
+                    .lpush::<_, _, ()>(&result_queue_key, &result_str)
+                    .await
+                {
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}",
+                        OSIS, job_id, result_queue_key, e
+                    );
                 } else {
-                    info!("OSIS Actor '{}', Job {}: Result pushed to queue: {}",
-                        OSIS, job_id, result_queue_key);
+                    info!(
+                        "OSIS Actor '{}', Job {}: Result pushed to queue: {}",
+                        OSIS, job_id, result_queue_key
+                    );
                 }
-                
+
                 if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Finished).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to update status to Finished: {}",
-                        OSIS, job_id, e);
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to update status to Finished: {}",
+                        OSIS, job_id, e
+                    );
                 }
             }
             Err(e) => {
                 let error_msg = format!("Script execution error: {}", e);
                 error!("OSIS Actor '{}', Job {}: {}", OSIS, job_id, error_msg);
-                
+
                 // Update job with error (stores in job hash error field)
                 if let Err(e) = Job::set_error(redis_conn, job_id, &error_msg).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to set error: {}",
-                        OSIS, job_id, e);
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to set error: {}",
+                        OSIS, job_id, e
+                    );
                 }
-                
+
                 // Also push error to error queue for retrieval
                 let error_queue_key = format!("hero:job:{}:error", job_id);
-                if let Err(e) = redis_conn.lpush::<_, _, ()>(&error_queue_key, &error_msg).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}",
-                        OSIS, job_id, error_queue_key, e);
+                if let Err(e) = redis_conn
+                    .lpush::<_, _, ()>(&error_queue_key, &error_msg)
+                    .await
+                {
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}",
+                        OSIS, job_id, error_queue_key, e
+                    );
                 } else {
-                    info!("OSIS Actor '{}', Job {}: Error pushed to queue: {}",
-                        OSIS, job_id, error_queue_key);
+                    info!(
+                        "OSIS Actor '{}', Job {}: Error pushed to queue: {}",
+                        OSIS, job_id, error_queue_key
+                    );
                 }
-                
+
                 if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Error).await {
-                    error!("OSIS Actor '{}', Job {}: Failed to update status to Error: {}",
-                        OSIS, job_id, e);
+                    error!(
+                        "OSIS Actor '{}', Job {}: Failed to update status to Error: {}",
+                        OSIS, job_id, e
+                    );
                 }
             }
         }
 
-        info!("OSIS Actor '{}', Job {}: Sequential processing completed", OSIS, job_id);
+        // Force flush logs before dropping guard
+        std::thread::sleep(std::time::Duration::from_millis(100));
+
+        // Keep the guard alive until after processing
+        drop(guard);
+
+        info!(
+            "OSIS Actor '{}', Job {}: Sequential processing completed",
+            OSIS, job_id
+        );
     }
 
     fn actor_type(&self) -> &'static str {
         "OSIS"
     }
-    
+
     fn actor_id(&self) -> &str {
         // Use actor_queue:osis to match supervisor's dispatch queue naming
         "actor_queue:osis"
     }
-    
+
     fn redis_url(&self) -> &str {
         &self.redis_url
     }
 }
 
 /// Convenience function to spawn an OSIS actor using the trait interface
-/// 
+///
 /// This function provides backward compatibility with the original actor API
 /// while using the new trait-based implementation.
 pub fn spawn_osis_actor(
@@ -204,7 +287,7 @@
             .db_path(db_path)
             .redis_url(redis_url)
             .build()
-            .expect("Failed to build OSISActor")
+            .expect("Failed to build OSISActor"),
     );
     spawn_actor(actor, shutdown_rx)
 }
@@ -231,7 +314,7 @@ mod tests {
     #[tokio::test]
     async fn test_osis_actor_process_job_interface() {
         let actor = OSISActor::default();
-        
+
         // Create a simple test job
         let _job = Job::new(
             "test_caller".to_string(),
@@ -242,7 +325,7 @@
 
         // Note: This test doesn't actually connect to Redis, it just tests the interface
         // In a real test environment, you'd need a Redis instance or mock
-        
+
         // For now, just verify the actor was created successfully
        assert_eq!(actor.actor_type(), "OSIS");
    }
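
Note on the per-job logging pattern introduced in src/lib.rs: the actor builds a job-specific subscriber via hero_logger::create_job_logger_with_guard, runs the whole job inside tracing::subscriber::with_default so events land in that job's log rather than the process-wide logger, and keeps the returned guard alive until processing ends so buffered output is flushed. Because with_default takes a synchronous closure, the async script execution is bridged with tokio::task::block_in_place plus Handle::current().block_on, which requires the multi-threaded Tokio runtime. The sketch below shows the same mechanism using only the tracing, tracing-subscriber, and tracing-appender APIs that this change adds to Cargo.lock; the function name, file name, and directory layout are illustrative and do not describe hero_logger's actual output format.

    use tracing::subscriber::with_default;
    use tracing_appender::non_blocking::WorkerGuard;
    use tracing_subscriber::FmtSubscriber;

    // Hypothetical helper: scope all tracing events for one job to its own file.
    fn run_with_job_logger(job_id: &str) {
        // One log file per job; the non-blocking writer returns a guard that must
        // outlive all writes so the background thread can flush them.
        let file_appender =
            tracing_appender::rolling::never("logs", format!("job-{}.log", job_id));
        let (writer, _guard): (_, WorkerGuard) = tracing_appender::non_blocking(file_appender);

        let subscriber = FmtSubscriber::builder()
            .with_writer(writer)
            .with_ansi(false)
            .finish();

        // Events emitted inside the closure go to the job-specific file without
        // replacing the global default subscriber used by other jobs.
        with_default(subscriber, || {
            tracing::info!(target: "osis_actor", "Job {} started", job_id);
            // ... execute the Rhai script here ...
            tracing::info!(target: "osis_actor", "Job {} completed", job_id);
        });

        // Dropping _guard here flushes any buffered log lines.
    }
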