From 1a3fa6242d546d83ff999afe7e46da89019f3281 Mon Sep 17 00:00:00 2001 From: timurgordon Date: Thu, 19 Jun 2025 02:32:56 +0300 Subject: [PATCH] linting and fmt --- benches/simple_rhai_bench/main.rs | 97 ++++--- examples/Cargo.toml | 4 - examples/dedicated_reply_queue_demo.rs | 16 +- examples/end_to_end/main.rs | 15 +- examples/example_math_worker.rs | 36 ++- examples/example_string_worker.rs | 34 ++- src/client/examples/timeout_example.rs | 27 +- src/client/src/lib.rs | 94 ++++--- src/engine/build.rs | 8 +- src/engine/examples/calendar/example.rs | 44 ++-- src/engine/examples/finance/example.rs | 47 ++-- src/engine/examples/flow/example.rs | 127 +++++---- src/engine/src/lib.rs | 54 ++-- src/engine/src/mock_db.rs | 239 ++++++++++------- src/lib.rs | 4 +- src/monitor/src/cli_logic.rs | 11 +- src/monitor/src/main.rs | 1 - src/monitor/src/tasks.rs | 9 +- src/repl/examples/connect_and_play.rs | 118 ++++++--- src/repl/src/main.rs | 126 ++++++--- src/rhai_engine_ui/src/app.rs | 104 +++++--- src/rhai_engine_ui/src/main.rs | 57 +++- src/worker/cmd/worker.rs | 44 ++-- src/worker/src/lib.rs | 328 +++++++++++++----------- 24 files changed, 1024 insertions(+), 620 deletions(-) diff --git a/benches/simple_rhai_bench/main.rs b/benches/simple_rhai_bench/main.rs index bce8568..9881d6d 100644 --- a/benches/simple_rhai_bench/main.rs +++ b/benches/simple_rhai_bench/main.rs @@ -1,9 +1,9 @@ use criterion::{criterion_group, criterion_main, Criterion}; use redis::{Client, Commands}; -use std::process::{Command, Child, Stdio}; -use std::time::Duration; -use std::thread; use std::fs; +use std::process::{Child, Command, Stdio}; +use std::thread; +use std::time::Duration; const REDIS_URL: &str = "redis://127.0.0.1:6379"; const CIRCLE_NAME: &str = "bench_circle"; @@ -12,24 +12,33 @@ const SIMPLE_SCRIPT: &str = "new_event()\n .title(\"Weekly Sync\")\n .loca fn cleanup_redis() -> Result<(), redis::RedisError> { let client = Client::open(REDIS_URL)?; let mut conn = client.get_connection()?; - + // Clear task queue and any existing task details let _: () = conn.del(format!("rhai_tasks:{}", CIRCLE_NAME))?; let keys: Vec = conn.scan_match("rhai_task_details:*")?.collect(); if !keys.is_empty() { let _: () = conn.del(keys)?; } - + Ok(()) } fn start_worker() -> Result { Command::new("cargo") - .args(&["run", "--release", "--bin", "worker", "--", - "--circle", CIRCLE_NAME, - "--redis-url", REDIS_URL, - "--worker-id", "bench_worker", - "--preserve-tasks"]) + .args(&[ + "run", + "--release", + "--bin", + "worker", + "--", + "--circle", + CIRCLE_NAME, + "--redis-url", + REDIS_URL, + "--worker-id", + "bench_worker", + "--preserve-tasks", + ]) .current_dir("src/worker") .stdout(Stdio::null()) .stderr(Stdio::null()) @@ -39,7 +48,7 @@ fn start_worker() -> Result { fn create_batch_tasks(task_count: usize) -> Result, Box> { let client = Client::open(REDIS_URL)?; let mut conn = client.get_connection()?; - + // Load and execute Lua script let lua_script = fs::read_to_string("benches/simple_rhai_bench/batch_task.lua")?; let result: redis::Value = redis::cmd("EVAL") @@ -49,7 +58,7 @@ fn create_batch_tasks(task_count: usize) -> Result, Box { @@ -65,24 +74,28 @@ fn create_batch_tasks(task_count: usize) -> Result, Box Result { let client = Client::open(REDIS_URL)?; let mut conn = client.get_connection()?; - + let start_time = std::time::Instant::now(); let timeout = Duration::from_secs(100); - + // Poll until task is completed or timeout loop { let status: Option = conn.hget(task_key, "status")?; - + match status.as_deref() { Some("completed") | Some("error") => { - println!("Task {} completed with status: {}", task_key, status.as_deref().unwrap_or("unknown")); + println!( + "Task {} completed with status: {}", + task_key, + status.as_deref().unwrap_or("unknown") + ); let created_at: u64 = conn.hget(task_key, "createdAt")?; let updated_at: u64 = conn.hget(task_key, "updatedAt")?; return Ok((updated_at - created_at) as f64 * 1000.0); // Convert to milliseconds @@ -94,12 +107,12 @@ fn wait_and_measure(task_key: &str) -> Result { thread::sleep(Duration::from_millis(100)); } } - + // Check timeout if start_time.elapsed() > timeout { return Err(redis::RedisError::from(( redis::ErrorKind::IoError, - "Timeout waiting for task completion" + "Timeout waiting for task completion", ))); } } @@ -108,22 +121,22 @@ fn wait_and_measure(task_key: &str) -> Result { fn wait_for_batch_completion(task_keys: &[String]) -> Result> { let client = Client::open(REDIS_URL)?; let mut conn = client.get_connection()?; - + let start_time = std::time::Instant::now(); let timeout = Duration::from_secs(30); - + // Wait for all tasks to complete loop { let mut completed_count = 0; let mut total_latency = 0u64; - + for task_key in task_keys { let status: Option = conn.hget(task_key, "status")?; - + match status.as_deref() { Some("completed") | Some("error") => { completed_count += 1; - + // Get timing data let created_at: u64 = conn.hget(task_key, "createdAt")?; let updated_at: u64 = conn.hget(task_key, "updatedAt")?; @@ -132,18 +145,23 @@ fn wait_for_batch_completion(task_keys: &[String]) -> Result {} // Still pending or processing } } - + if completed_count == task_keys.len() { // All tasks completed, calculate average latency in milliseconds let avg_latency_ms = (total_latency as f64 / task_keys.len() as f64) * 1000.0; return Ok(avg_latency_ms); } - + // Check timeout if start_time.elapsed() > timeout { - return Err(format!("Timeout waiting for batch completion. Completed: {}/{}", completed_count, task_keys.len()).into()); + return Err(format!( + "Timeout waiting for batch completion. Completed: {}/{}", + completed_count, + task_keys.len() + ) + .into()); } - + thread::sleep(Duration::from_millis(100)); } } @@ -164,41 +182,42 @@ fn bench_single_rhai_task(c: &mut Criterion) { // Clean up before starting cleanup_redis().expect("Failed to cleanup Redis"); - + // Start worker once and reuse it let worker = start_worker().expect("Failed to start worker"); thread::sleep(Duration::from_millis(1000)); // Give worker time to start - + let mut group = c.benchmark_group("rhai_task_execution"); group.sample_size(10); // Reduce sample size group.measurement_time(Duration::from_secs(10)); // Reduce measurement time - + group.bench_function("batch_task_latency", |b| { b.iter_custom(|iters| { let mut total_latency = Duration::ZERO; - + for _i in 0..iters { // Clean up Redis between iterations cleanup_redis().expect("Failed to cleanup Redis"); - + // Create 100 tasks and measure average latency using Redis timestamps let task_keys = create_batch_tasks(5000).expect("Failed to create batch tasks"); - let avg_latency_ms = wait_for_batch_completion(&task_keys).expect("Failed to measure batch completion"); - + let avg_latency_ms = wait_for_batch_completion(&task_keys) + .expect("Failed to measure batch completion"); + // Convert average latency to duration total_latency += Duration::from_millis(avg_latency_ms as u64); } - + total_latency }); }); - + group.finish(); - + // Cleanup worker cleanup_worker(worker).expect("Failed to cleanup worker"); cleanup_redis().expect("Failed to cleanup Redis"); } criterion_group!(benches, bench_single_rhai_task); -criterion_main!(benches); \ No newline at end of file +criterion_main!(benches); diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d9d4537..48f6d5c 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -27,7 +27,3 @@ path = "example_string_worker.rs" [[bin]] name = "dedicated_reply_queue_demo" path = "dedicated_reply_queue_demo.rs" - -[[bin]] -name = "lua_client_demo" -path = "lua_client_demo.rs" diff --git a/examples/dedicated_reply_queue_demo.rs b/examples/dedicated_reply_queue_demo.rs index 6535941..c5ab69f 100644 --- a/examples/dedicated_reply_queue_demo.rs +++ b/examples/dedicated_reply_queue_demo.rs @@ -1,4 +1,4 @@ -use log::{info, error, debug}; +use log::{debug, error, info}; use rhai::Engine; use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is now used for its fields use rhailib_worker::spawn_rhai_worker; @@ -55,7 +55,10 @@ async fn main() -> Result<(), Box> { let task_timeout = Duration::from_secs(10); let task_id = Uuid::new_v4().to_string(); // Generate a unique task_id - info!("Submitting script to circle '{}' with task_id '{}' and awaiting result...", circle_name, task_id); + info!( + "Submitting script to circle '{}' with task_id '{}' and awaiting result...", + circle_name, task_id + ); info!("Script: {}", script_to_run); match client @@ -64,7 +67,7 @@ async fn main() -> Result<(), Box> { task_id.clone(), // Pass the generated task_id script_to_run.to_string(), task_timeout, - None // public_key + None, // public_key ) .await { @@ -72,7 +75,10 @@ async fn main() -> Result<(), Box> { info!("Task {} completed successfully!", details.task_id); debug!("Full Task Details: {:#?}", details); // The task_id is now part of the returned RhaiTaskDetails struct. - info!("Received details for task_id: {}, script: {}", details.task_id, details.script); + info!( + "Received details for task_id: {}, script: {}", + details.task_id, details.script + ); info!("Status: {}", details.status); if let Some(output) = details.output { info!("Output: {}", output); // Expected: 42 @@ -110,4 +116,4 @@ async fn main() -> Result<(), Box> { info!("Dedicated Reply Queue Demo finished."); Ok(()) -} \ No newline at end of file +} diff --git a/examples/end_to_end/main.rs b/examples/end_to_end/main.rs index d952001..222c80d 100644 --- a/examples/end_to_end/main.rs +++ b/examples/end_to_end/main.rs @@ -57,7 +57,10 @@ async fn main() -> Result<(), Box> { log::error!("Failed to read script file '{}': {}", script_path_str, e); // Attempt to read from an alternative path if run via `cargo run --example` // where current dir might be the crate root. - let alt_script_path = Path::new(file!()).parent().unwrap().join("auth_script.rhai"); + let alt_script_path = Path::new(file!()) + .parent() + .unwrap() + .join("auth_script.rhai"); log::info!("Attempting alternative script path: {:?}", alt_script_path); fs::read_to_string(&alt_script_path)? } @@ -106,9 +109,15 @@ async fn main() -> Result<(), Box> { ); // Basic assertion for expected output if caller_pk == "admin_pk" { - assert_eq!(details.output, Some("Access Granted: Welcome Admin!".to_string())); + assert_eq!( + details.output, + Some("Access Granted: Welcome Admin!".to_string()) + ); } else if caller_pk == "user_pk" { - assert_eq!(details.output, Some("Limited Access: Welcome User!".to_string())); + assert_eq!( + details.output, + Some("Limited Access: Welcome User!".to_string()) + ); } } Err(e) => { diff --git a/examples/example_math_worker.rs b/examples/example_math_worker.rs index abc3bb0..e2e32f4 100644 --- a/examples/example_math_worker.rs +++ b/examples/example_math_worker.rs @@ -2,9 +2,9 @@ use rhai::Engine; use rhai_client::RhaiClient; // To submit tasks use uuid::Uuid; // For generating task_id +use rhailib_worker::spawn_rhai_worker; use std::time::Duration; use tokio::time::sleep; -use rhailib_worker::spawn_rhai_worker; // Custom function for Rhai fn add(a: i64, b: i64) -> i64 { @@ -47,26 +47,36 @@ async fn main() -> Result<(), Box> { "#; log::info!("Submitting math script to 'math_circle' and awaiting result..."); - + let timeout_duration = Duration::from_secs(10); let task_id = Uuid::new_v4().to_string(); - match client.submit_script_and_await_result( - "math_circle", - script_content.to_string(), - task_id, // Pass the generated task_id - timeout_duration, - None - ).await { + match client + .submit_script_and_await_result( + "math_circle", + script_content.to_string(), + task_id, // Pass the generated task_id + timeout_duration, + None, + ) + .await + { Ok(details) => { - log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", - details.status, details.output, details.error); + log::info!( + "Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", + details.status, + details.output, + details.error + ); if details.status == "completed" { assert_eq!(details.output, Some("42".to_string())); log::info!("Math Worker Example: Assertion for output 42 passed!"); Ok(()) } else { - log::error!("Math Worker Example: Task completed with error: {:?}", details.error); + log::error!( + "Math Worker Example: Task completed with error: {:?}", + details.error + ); Err(format!("Task failed with error: {:?}", details.error).into()) } } @@ -75,4 +85,4 @@ async fn main() -> Result<(), Box> { Err(e.into()) } } -} \ No newline at end of file +} diff --git a/examples/example_string_worker.rs b/examples/example_string_worker.rs index ef992e1..63b0b89 100644 --- a/examples/example_string_worker.rs +++ b/examples/example_string_worker.rs @@ -2,9 +2,9 @@ use rhai::Engine; use rhai_client::RhaiClient; // To submit tasks use uuid::Uuid; // For generating task_id +use rhailib_worker::spawn_rhai_worker; use std::time::Duration; use tokio::time::sleep; -use rhailib_worker::spawn_rhai_worker; // Custom function for Rhai fn reverse_string(s: String) -> String { @@ -51,22 +51,32 @@ async fn main() -> Result<(), Box> { let timeout_duration = Duration::from_secs(10); let task_id = Uuid::new_v4().to_string(); - match client.submit_script_and_await_result( - "string_circle", - script_content.to_string(), - task_id, // Pass the generated task_id - timeout_duration, - None - ).await { + match client + .submit_script_and_await_result( + "string_circle", + script_content.to_string(), + task_id, // Pass the generated task_id + timeout_duration, + None, + ) + .await + { Ok(details) => { - log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", - details.status, details.output, details.error); + log::info!( + "String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", + details.status, + details.output, + details.error + ); if details.status == "completed" { assert_eq!(details.output, Some("dlrow olleh".to_string())); log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!"); Ok(()) } else { - log::error!("String Worker Example: Task completed with error: {:?}", details.error); + log::error!( + "String Worker Example: Task completed with error: {:?}", + details.error + ); Err(format!("Task failed with error: {:?}", details.error).into()) } } @@ -75,4 +85,4 @@ async fn main() -> Result<(), Box> { Err(e.into()) } } -} \ No newline at end of file +} diff --git a/src/client/examples/timeout_example.rs b/src/client/examples/timeout_example.rs index a768f6a..78e2198 100644 --- a/src/client/examples/timeout_example.rs +++ b/src/client/examples/timeout_example.rs @@ -1,10 +1,12 @@ -use rhai_client::{RhaiClient, RhaiTaskDetails}; // Assuming RhaiTaskDetails might be part of the success path, though we expect error -use std::time::Duration; use log::info; +use rhai_client::RhaiClient; // Assuming RhaiTaskDetails might be part of the success path, though we expect error +use std::time::Duration; #[tokio::main] async fn main() -> Result<(), Box> { - env_logger::builder().filter_level(log::LevelFilter::Info).init(); + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .init(); let client = RhaiClient::new("redis://127.0.0.1/")?; info!("RhaiClient created."); @@ -53,8 +55,17 @@ async fn main() -> Result<(), Box> { info!("Timeout Example PASSED: Correctly received RhaiClientError::Timeout for task_id: {}", task_id); // Ensure the elapsed time is close to the timeout duration // Allow for some buffer for processing - assert!(elapsed >= very_short_timeout && elapsed < very_short_timeout + Duration::from_secs(1), "Elapsed time {:?} should be close to timeout {:?}", elapsed, very_short_timeout); - info!("Elapsed time {:?} is consistent with timeout duration {:?}.", elapsed, very_short_timeout); + assert!( + elapsed >= very_short_timeout + && elapsed < very_short_timeout + Duration::from_secs(1), + "Elapsed time {:?} should be close to timeout {:?}", + elapsed, + very_short_timeout + ); + info!( + "Elapsed time {:?} is consistent with timeout duration {:?}.", + elapsed, very_short_timeout + ); Ok(()) } other_error => { @@ -62,7 +73,11 @@ async fn main() -> Result<(), Box> { "Timeout Example FAILED: Expected RhaiClientError::Timeout, but got other error: {:?}", other_error ); - Err(format!("Expected RhaiClientError::Timeout, got other error: {:?}", other_error).into()) + Err(format!( + "Expected RhaiClientError::Timeout, got other error: {:?}", + other_error + ) + .into()) } } } diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 184ab82..4ec2c5c 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -1,8 +1,8 @@ use chrono::Utc; -use log::{debug, info, warn, error}; // Added error +use log::{debug, error, info, warn}; // Added error use redis::AsyncCommands; -use std::time::Duration; // Duration is still used, Instant and sleep were removed use serde::{Deserialize, Serialize}; +use std::time::Duration; // Duration is still used, Instant and sleep were removed use uuid::Uuid; const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; @@ -33,7 +33,7 @@ pub struct RhaiTaskDetails { pub enum RhaiClientError { RedisError(redis::RedisError), SerializationError(serde_json::Error), - Timeout(String), // task_id that timed out + Timeout(String), // task_id that timed out TaskNotFound(String), // task_id not found after submission (should be rare) } @@ -54,8 +54,12 @@ impl std::fmt::Display for RhaiClientError { match self { RhaiClientError::RedisError(e) => write!(f, "Redis error: {}", e), RhaiClientError::SerializationError(e) => write!(f, "Serialization error: {}", e), - RhaiClientError::Timeout(task_id) => write!(f, "Timeout waiting for task {} to complete", task_id), - RhaiClientError::TaskNotFound(task_id) => write!(f, "Task {} not found after submission", task_id), + RhaiClientError::Timeout(task_id) => { + write!(f, "Timeout waiting for task {} to complete", task_id) + } + RhaiClientError::TaskNotFound(task_id) => { + write!(f, "Task {} not found after submission", task_id) + } } } } @@ -69,7 +73,9 @@ pub struct RhaiClient { impl RhaiClient { pub fn new(redis_url: &str) -> Result { let client = redis::Client::open(redis_url)?; - Ok(Self { redis_client: client }) + Ok(Self { + redis_client: client, + }) } // Internal helper to submit script details and push to work queue @@ -84,9 +90,13 @@ impl RhaiClient { public_key: Option, ) -> Result<(), RhaiClientError> { let now = Utc::now(); - + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); + let worker_queue_key = format!( + "{}{}", + REDIS_QUEUE_PREFIX, + circle_name.replace(" ", "_").to_lowercase() + ); debug!( "Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}", @@ -95,18 +105,20 @@ impl RhaiClient { let mut hset_args: Vec<(String, String)> = vec![ ("taskId".to_string(), task_id.to_string()), // Add taskId - ("script".to_string(), script), // script is moved here + ("script".to_string(), script), // script is moved here ("status".to_string(), "pending".to_string()), ("createdAt".to_string(), now.to_rfc3339()), ("updatedAt".to_string(), now.to_rfc3339()), ]; // clientRpcId field and its corresponding hset_args logic are removed. - - if let Some(queue_name) = &reply_to_queue_name { // Use the passed parameter + + if let Some(queue_name) = &reply_to_queue_name { + // Use the passed parameter hset_args.push(("replyToQueue".to_string(), queue_name.clone())); } - if let Some(pk) = &public_key { // Use the passed parameter + if let Some(pk) = &public_key { + // Use the passed parameter hset_args.push(("publicKey".to_string(), pk.clone())); } @@ -116,15 +128,14 @@ impl RhaiClient { // Simpler: // Explicitly type K, F, V for hset_multiple if inference is problematic. // RV (return value of the command itself) is typically () for HSET type commands. - conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args).await?; - + conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args) + .await?; // lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant // For `redis::AsyncCommands::lpush`, it's `RedisResult` where R: FromRedisValue // Often this is the length of the list. Let's allow inference or specify if needed. let _: redis::RedisResult = conn.lpush(&worker_queue_key, task_id).await; - Ok(()) } @@ -133,13 +144,15 @@ impl RhaiClient { &self, circle_name: &str, script: String, - // client_rpc_id: Option is removed public_key: Option, ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget - debug!("Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", task_id, circle_name); + debug!( + "Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", + task_id, circle_name + ); self.submit_script_to_worker_queue( &mut conn, @@ -157,11 +170,15 @@ impl RhaiClient { // Optional: A method to check task status, similar to what circle_server_ws polling does. // This could be useful for a client that wants to poll for results itself. - pub async fn get_task_status(&self, task_id: &str) -> Result, RhaiClientError> { + pub async fn get_task_status( + &self, + task_id: &str, + ) -> Result, RhaiClientError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - - let result_map: Option> = conn.hgetall(&task_key).await?; + + let result_map: Option> = + conn.hgetall(&task_key).await?; match result_map { Some(map) => { @@ -204,7 +221,7 @@ impl RhaiClient { } } else { warn!("Task {}: 'taskId' field missing from Redis hash.", task_id); - } + } Ok(Some(details)) } None => Ok(None), @@ -222,8 +239,7 @@ impl RhaiClient { ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter - let reply_to_queue_name = - format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id + let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id self.submit_script_to_worker_queue( &mut conn, @@ -246,8 +262,14 @@ impl RhaiClient { // BLPOP on the reply queue // The timeout for BLPOP is in seconds (integer) let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout - - match conn.blpop::<&String, Option<(String, String)>>(&reply_to_queue_name, blpop_timeout_secs as f64).await { + + match conn + .blpop::<&String, Option<(String, String)>>( + &reply_to_queue_name, + blpop_timeout_secs as f64, + ) + .await + { Ok(Some((_queue, result_message_str))) => { // Attempt to deserialize the result message into RhaiTaskDetails or a similar structure // For now, we assume the worker sends back a JSON string of RhaiTaskDetails @@ -267,21 +289,29 @@ impl RhaiClient { } Err(e) => { error!("Task {}: Failed to deserialize result message from reply queue: {}. Message: {}", task_id, e, result_message_str); - // Optionally, delete the reply queue + // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; Err(RhaiClientError::SerializationError(e)) } } } - Ok(None) => { // BLPOP timed out - warn!("Timeout waiting for result on reply queue {} for task {}", reply_to_queue_name, task_id); - // Optionally, delete the reply queue + Ok(None) => { + // BLPOP timed out + warn!( + "Timeout waiting for result on reply queue {} for task {}", + reply_to_queue_name, task_id + ); + // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; Err(RhaiClientError::Timeout(task_id)) } - Err(e) => { // Redis error - error!("Redis error on BLPOP for reply queue {}: {}", reply_to_queue_name, e); - // Optionally, delete the reply queue + Err(e) => { + // Redis error + error!( + "Redis error on BLPOP for reply queue {}: {}", + reply_to_queue_name, e + ); + // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; Err(RhaiClientError::RedisError(e)) } diff --git a/src/engine/build.rs b/src/engine/build.rs index dfac228..8b8ff97 100644 --- a/src/engine/build.rs +++ b/src/engine/build.rs @@ -1,16 +1,16 @@ fn main() { // Tell Cargo to re-run this build script if the calendar/rhai.rs file changes println!("cargo:rerun-if-changed=../heromodels/src/models/calendar/rhai.rs"); - + // Tell Cargo to re-run this build script if the flow/rhai.rs file changes println!("cargo:rerun-if-changed=../heromodels/src/models/flow/rhai.rs"); - + // Tell Cargo to re-run this build script if the legal/rhai.rs file changes println!("cargo:rerun-if-changed=../heromodels/src/models/legal/rhai.rs"); - + // Tell Cargo to re-run this build script if the projects/rhai.rs file changes println!("cargo:rerun-if-changed=../heromodels/src/models/projects/rhai.rs"); - + // Tell Cargo to re-run this build script if the biz/rhai.rs file changes println!("cargo:rerun-if-changed=../heromodels/src/models/biz/rhai.rs"); } diff --git a/src/engine/examples/calendar/example.rs b/src/engine/examples/calendar/example.rs index 4f0c8da..d426517 100644 --- a/src/engine/examples/calendar/example.rs +++ b/src/engine/examples/calendar/example.rs @@ -1,36 +1,33 @@ -use std::sync::Arc; -use std::path::Path; -use rhai::{Engine, Scope}; -use heromodels::models::calendar::{Calendar, Event, Attendee, AttendanceStatus}; -use engine::{create_heromodels_engine, eval_file}; use engine::mock_db::{create_mock_db, seed_mock_db}; +use engine::{create_heromodels_engine, eval_file}; +use rhai::Engine; fn main() -> Result<(), Box> { println!("Calendar Rhai Example"); println!("====================="); - + // Create a mock database let db = create_mock_db(); - + // Seed the database with some initial data seed_mock_db(db.clone()); - + // Create the Rhai engine using our central engine creator let mut engine = create_heromodels_engine(db.clone()); - + // Register timestamp helper functions register_timestamp_helpers(&mut engine); - + // Get the path to the script let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); let script_path = manifest_dir .join("examples") .join("calendar") .join("calendar_script.rhai"); - + println!("\nRunning script: {}", script_path.display()); println!("---------------------"); - + // Run the script match eval_file(&engine, &script_path) { Ok(result) => { @@ -39,29 +36,32 @@ fn main() -> Result<(), Box> { } println!("\nScript executed successfully!"); Ok(()) - }, + } Err(err) => { eprintln!("\nError running script: {}", err); - Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))) + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + ))) } } } // Register timestamp helper functions with the engine fn register_timestamp_helpers(engine: &mut Engine) { - use chrono::{DateTime, Utc, TimeZone, NaiveDateTime}; - + use chrono::{TimeZone, Utc}; + // Function to get current timestamp - engine.register_fn("timestamp_now", || { - Utc::now().timestamp() as i64 - }); - + engine.register_fn("timestamp_now", || Utc::now().timestamp() as i64); + // Function to format a timestamp engine.register_fn("format_timestamp", |ts: i64| { - let dt = Utc.timestamp_opt(ts, 0).single() + let dt = Utc + .timestamp_opt(ts, 0) + .single() .expect("Invalid timestamp"); dt.format("%Y-%m-%d %H:%M:%S UTC").to_string() }); - + println!("Timestamp helper functions registered successfully."); } diff --git a/src/engine/examples/finance/example.rs b/src/engine/examples/finance/example.rs index 823a4a0..cc442da 100644 --- a/src/engine/examples/finance/example.rs +++ b/src/engine/examples/finance/example.rs @@ -1,37 +1,33 @@ -use std::sync::Arc; -use std::path::Path; -use rhai::{Engine, Scope}; -use heromodels::models::finance::account::Account; -use heromodels::models::finance::asset::{Asset, AssetType}; -use heromodels::models::finance::marketplace::{Listing, Bid, ListingStatus, ListingType, BidStatus}; -use engine::{create_heromodels_engine, eval_file}; use engine::mock_db::{create_mock_db, seed_mock_db}; +use engine::{create_heromodels_engine, eval_file}; +use rhai::Engine; +use std::path::Path; fn main() -> Result<(), Box> { println!("Finance Rhai Example"); println!("==================="); - + // Create a mock database let db = create_mock_db(); - + // Seed the database with some initial data seed_mock_db(db.clone()); - + // Create the Rhai engine using our central engine creator let mut engine = create_heromodels_engine(db.clone()); - + // Register timestamp helper functions register_timestamp_helpers(&mut engine); - + // Get the path to the script let script_path = Path::new(file!()) .parent() .unwrap() .join("finance_script.rhai"); - + println!("\nRunning script: {}", script_path.display()); println!("---------------------"); - + // Run the script match eval_file(&engine, &script_path) { Ok(result) => { @@ -40,29 +36,32 @@ fn main() -> Result<(), Box> { } println!("\nScript executed successfully!"); Ok(()) - }, + } Err(err) => { eprintln!("\nError running script: {}", err); - Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))) + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + ))) } } } // Register timestamp helper functions with the engine fn register_timestamp_helpers(engine: &mut Engine) { - use chrono::{DateTime, Utc, TimeZone, NaiveDateTime}; - + use chrono::{TimeZone, Utc}; + // Function to get current timestamp - engine.register_fn("timestamp_now", || { - Utc::now().timestamp() as i64 - }); - + engine.register_fn("timestamp_now", || Utc::now().timestamp() as i64); + // Function to format a timestamp engine.register_fn("format_timestamp", |ts: i64| { - let dt = Utc.timestamp_opt(ts, 0).single() + let dt = Utc + .timestamp_opt(ts, 0) + .single() .expect("Invalid timestamp"); dt.format("%Y-%m-%d %H:%M:%S UTC").to_string() }); - + println!("Timestamp helper functions registered successfully."); } diff --git a/src/engine/examples/flow/example.rs b/src/engine/examples/flow/example.rs index a6d72f8..96a9bce 100644 --- a/src/engine/examples/flow/example.rs +++ b/src/engine/examples/flow/example.rs @@ -1,32 +1,32 @@ -use std::path::Path; -use rhai::{Scope}; -use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement}; -use engine::{create_heromodels_engine, eval_file}; use engine::mock_db::{create_mock_db, seed_mock_db}; +use engine::{create_heromodels_engine, eval_file}; +use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement}; use heromodels_core::Model; +use rhai::Scope; +use std::path::Path; fn main() -> Result<(), Box> { println!("Flow Rhai Example"); println!("================="); - + // Create a mock database let db = create_mock_db(); - + // Seed the database with initial data seed_mock_db(db.clone()); - + // Create the Rhai engine with all modules registered let engine = create_heromodels_engine(db.clone()); - + // Get the path to the script let script_path = Path::new(file!()) .parent() .unwrap() .join("flow_script.rhai"); - + println!("\nRunning script: {}", script_path.display()); println!("---------------------"); - + // Run the script match eval_file(&engine, &script_path.to_string_lossy()) { Ok(result) => { @@ -34,87 +34,126 @@ fn main() -> Result<(), Box> { println!("\nScript returned: {:?}", result); } println!("\nScript executed successfully!"); - }, + } Err(err) => { eprintln!("\nError running script: {}", err); - return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))); + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + ))); } } - + // Demonstrate direct Rust interaction with the Rhai-exposed flow functionality println!("\nDirect Rust interaction with Rhai-exposed flow functionality"); println!("----------------------------------------------------------"); - + // Create a new scope let mut scope = Scope::new(); - + // Create a new flow using the Rhai function let result = engine.eval::("new_flow(0, \"Direct Rust Flow\")"); match result { Ok(mut flow) => { - println!("Created flow from Rust: {} (ID: {})", flow.name, flow.get_id()); - + println!( + "Created flow from Rust: {} (ID: {})", + flow.name, + flow.get_id() + ); + // Set flow status using the builder pattern flow = flow.status("active".to_string()); println!("Set flow status to: {}", flow.status); - + // Create a new flow step using the Rhai function let result = engine.eval::("new_flow_step(0, 1)"); - + match result { Ok(mut step) => { - println!("Created flow step from Rust: Step Order {} (ID: {})", - step.step_order, step.get_id()); - + println!( + "Created flow step from Rust: Step Order {} (ID: {})", + step.step_order, + step.get_id() + ); + // Set step description step = step.description("Direct Rust Step".to_string()); - println!("Set step description to: {}", - step.description.clone().unwrap_or_else(|| "None".to_string())); - + println!( + "Set step description to: {}", + step.description + .clone() + .unwrap_or_else(|| "None".to_string()) + ); + // Create a signature requirement using the Rhai function let result = engine.eval::( "new_signature_requirement(0, 1, \"Direct Rust Signer\", \"Please sign this document\")" ); - + match result { Ok(req) => { - println!("Created signature requirement from Rust: Public Key {} (ID: {})", - req.public_key, req.get_id()); - + println!( + "Created signature requirement from Rust: Public Key {} (ID: {})", + req.public_key, + req.get_id() + ); + // Add the step to the flow using the builder pattern flow = flow.add_step(step); - println!("Added step to flow. Flow now has {} steps", flow.steps.len()); - + println!( + "Added step to flow. Flow now has {} steps", + flow.steps.len() + ); + // Save the flow to the database using the Rhai function let save_flow_script = "fn save_it(f) { return db::save_flow(f); }"; let save_flow_ast = engine.compile(save_flow_script).unwrap(); - let result = engine.call_fn::(&mut scope, &save_flow_ast, "save_it", (flow,)); + let result = engine.call_fn::( + &mut scope, + &save_flow_ast, + "save_it", + (flow,), + ); match result { Ok(saved_flow) => { - println!("Saved flow to database with ID: {}", saved_flow.get_id()); - }, + println!( + "Saved flow to database with ID: {}", + saved_flow.get_id() + ); + } Err(err) => eprintln!("Error saving flow: {}", err), } - + // Save the signature requirement to the database using the Rhai function - let save_req_script = "fn save_it(r) { return db::save_signature_requirement(r); }"; + let save_req_script = + "fn save_it(r) { return db::save_signature_requirement(r); }"; let save_req_ast = engine.compile(save_req_script).unwrap(); - let result = engine.call_fn::(&mut scope, &save_req_ast, "save_it", (req,)); + let result = engine.call_fn::( + &mut scope, + &save_req_ast, + "save_it", + (req,), + ); match result { Ok(saved_req) => { - println!("Saved signature requirement to database with ID: {}", saved_req.get_id()); - }, - Err(err) => eprintln!("Error saving signature requirement: {}", err), + println!( + "Saved signature requirement to database with ID: {}", + saved_req.get_id() + ); + } + Err(err) => { + eprintln!("Error saving signature requirement: {}", err) + } } - }, + } Err(err) => eprintln!("Error creating signature requirement: {}", err), } - }, + } Err(err) => eprintln!("Error creating flow step: {}", err), } - }, + } Err(err) => eprintln!("Error creating flow: {}", err), } - + Ok(()) } diff --git a/src/engine/src/lib.rs b/src/engine/src/lib.rs index 32512c2..45f13f3 100644 --- a/src/engine/src/lib.rs +++ b/src/engine/src/lib.rs @@ -1,4 +1,4 @@ -use rhai::{Engine, AST, Scope, EvalAltResult}; // Added EvalAltResult +use rhai::{Engine, EvalAltResult, Scope, AST}; // Added EvalAltResult use std::sync::Arc; // use std::sync::Mutex; // Unused // use std::collections::HashMap; // Unused @@ -11,16 +11,16 @@ pub mod mock_db; pub fn create_heromodels_engine(db: Arc) -> Engine { let mut engine = Engine::new(); - + // Configure engine settings engine.set_max_expr_depths(128, 128); engine.set_max_string_size(10 * 1024 * 1024); // 10 MB - engine.set_max_array_size(10 * 1024); // 10K elements - engine.set_max_map_size(10 * 1024); // 10K elements - + engine.set_max_array_size(10 * 1024); // 10K elements + engine.set_max_map_size(10 * 1024); // 10K elements + // Register all heromodels Rhai modules register_all_modules(&mut engine, db); - + engine } @@ -33,47 +33,49 @@ pub fn register_all_modules(engine: &mut Engine, db: Arc) { heromodels::models::contact::register_contact_rhai_module(engine, db.clone()); heromodels::models::library::register_library_rhai_module(engine, db.clone()); heromodels::models::circle::register_circle_rhai_module(engine, db.clone()); - + // Register the flow module if the feature is enabled #[cfg(feature = "flow")] heromodels::models::flow::register_flow_rhai_module(engine, db.clone()); - + // // Register the finance module if the feature is enabled // #[cfg(feature = "finance")] // heromodels::models::finance::register_finance_rhai_module(engine, db.clone()); - + // Register the legal module if the feature is enabled #[cfg(feature = "legal")] heromodels::models::legal::register_legal_rhai_module(engine, db.clone()); - + // Register the projects module if the feature is enabled #[cfg(feature = "projects")] heromodels::models::projects::register_projects_rhai_module(engine, db.clone()); - + // Register the biz module if the feature is enabled #[cfg(feature = "biz")] heromodels::models::biz::register_biz_rhai_module(engine, db.clone()); - + println!("Heromodels Rhai modules registered successfully."); } /// Evaluate a Rhai script string -pub fn eval_script(engine: &Engine, script: &str) -> Result> { +pub fn eval_script( + engine: &Engine, + script: &str, +) -> Result> { engine.eval::(script) } /// Evaluate a Rhai script file -pub fn eval_file(engine: &Engine, file_path: &Path) -> Result> { +pub fn eval_file( + engine: &Engine, + file_path: &Path, +) -> Result> { match fs::read_to_string(file_path) { - Ok(script_content) => { - engine.eval::(&script_content) - } - Err(io_err) => { - Err(Box::new(EvalAltResult::ErrorSystem( - format!("Failed to read script file: {}", file_path.display()), - Box::new(io_err), - ))) - } + Ok(script_content) => engine.eval::(&script_content), + Err(io_err) => Err(Box::new(EvalAltResult::ErrorSystem( + format!("Failed to read script file: {}", file_path.display()), + Box::new(io_err), + ))), } } @@ -83,6 +85,10 @@ pub fn compile_script(engine: &Engine, script: &str) -> Result Result> { +pub fn run_ast( + engine: &Engine, + ast: &AST, + scope: &mut Scope, +) -> Result> { engine.eval_ast_with_scope(scope, ast) } diff --git a/src/engine/src/mock_db.rs b/src/engine/src/mock_db.rs index 8032f79..11b458b 100644 --- a/src/engine/src/mock_db.rs +++ b/src/engine/src/mock_db.rs @@ -1,36 +1,37 @@ -use std::sync::Arc; -use std::env; -use heromodels::db::hero::OurDB; -use heromodels::db::{Db, Collection}; // Import both Db and Collection traits -use heromodels::models::calendar::{Calendar, Event, Attendee, AttendanceStatus}; -use heromodels_core::Model; // Import Model trait to use build method use chrono::Utc; -use heromodels::models::userexample::User; +use heromodels::db::hero::OurDB; +use heromodels::db::{Collection, Db}; // Import both Db and Collection traits +use heromodels::models::calendar::{Calendar, Event}; +use heromodels_core::Model; // Import Model trait to use build method +use std::env; +use std::sync::Arc; // Import finance models use heromodels::models::finance::account::Account; use heromodels::models::finance::asset::{Asset, AssetType}; -use heromodels::models::finance::marketplace::{Listing, Bid, ListingStatus, ListingType, BidStatus}; +use heromodels::models::finance::marketplace::{Listing, ListingType}; // Conditionally import other modules based on features #[cfg(feature = "flow")] use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement}; #[cfg(feature = "legal")] -use heromodels::models::legal::{Contract, ContractRevision, ContractSigner, ContractStatus, SignerStatus}; +use heromodels::models::legal::{ + Contract, ContractRevision, ContractSigner, ContractStatus, SignerStatus, +}; #[cfg(feature = "projects")] -use heromodels::models::projects::{Project, Status as ProjectStatus, Priority, ItemType}; +use heromodels::models::projects::{ItemType, Priority, Project, Status as ProjectStatus}; /// Create a mock in-memory database for examples pub fn create_mock_db() -> Arc { // Create a temporary directory for the database files let temp_dir = env::temp_dir().join("engine_examples"); std::fs::create_dir_all(&temp_dir).expect("Failed to create temp directory"); - + // Create a new OurDB instance with reset=true to ensure it's clean let db = OurDB::new(temp_dir, true).expect("Failed to create OurDB instance"); - + Arc::new(db) } @@ -38,22 +39,22 @@ pub fn create_mock_db() -> Arc { pub fn seed_mock_db(db: Arc) { // Seed calendar data seed_calendar_data(db.clone()); - + // Seed finance data seed_finance_data(db.clone()); - + // Seed flow data if the feature is enabled #[cfg(feature = "flow")] seed_flow_data(db.clone()); - + // Seed legal data if the feature is enabled #[cfg(feature = "legal")] seed_legal_data(db.clone()); - + // Seed projects data if the feature is enabled #[cfg(feature = "projects")] seed_projects_data(db.clone()); - + println!("Mock database seeded with initial data for all enabled modules."); } @@ -62,18 +63,18 @@ fn seed_calendar_data(db: Arc) { // Create a calendar let mut calendar = Calendar::new(None, "Work Calendar".to_string()); calendar.description = Some("My work schedule".to_string()); - - // Store the calendar in the database - let (calendar_id, updated_calendar) = db.collection::() - .expect("Failed to get Calendar collection") - .set(&calendar) - .expect("Failed to store calendar"); - + + // Store the calendar in the database + let (calendar_id, updated_calendar) = db + .collection::() + .expect("Failed to get Calendar collection") + .set(&calendar) + .expect("Failed to store calendar"); // Create an event let now = Utc::now().timestamp(); let end_time = now + 3600; // Add 1 hour in seconds - + // Use the builder pattern for Event let event = Event::new() .title("Team Meeting".to_string()) @@ -83,36 +84,44 @@ fn seed_calendar_data(db: Arc) { // .add_attendee(Attendee::new(1)) // .add_attendee(Attendee::new(2)) .build(); - + // // Add attendees to the event using the builder pattern // let attendee1 = Attendee::new(1); // let attendee2 = Attendee::new(2); - + // // Add attendees using the builder pattern // event = event.add_attendee(attendee1); // event = event.add_attendee(attendee2); - + // Call build and capture the returned value // let event = event.build(); // Store the event in the database first to get its ID - let (event_id, updated_event) = db.collection() + let (event_id, updated_event) = db + .collection() .expect("Failed to get Event collection") .set(&event) .expect("Failed to store event"); - + // Add the event ID to the calendar calendar = calendar.add_event(event_id as i64); - + // Store the calendar in the database - let (calendar_id, updated_calendar) = db.collection::() + let (calendar_id, updated_calendar) = db + .collection::() .expect("Failed to get Calendar collection") .set(&calendar) .expect("Failed to store calendar"); - + println!("Mock database seeded with calendar data:"); - println!(" - Added calendar: {} (ID: {})", updated_calendar.name, updated_calendar.base_data.id); - println!(" - Added event: {} (ID: {})", updated_event.title, updated_event.base_data.id); + println!( + " - Added calendar: {} (ID: {})", + updated_calendar.name, updated_calendar.base_data.id + ); + println!( + " - Added event: {} (ID: {})", + updated_event.title, updated_event.base_data.id + ); } /// Seed the mock database with flow data @@ -120,50 +129,68 @@ fn seed_calendar_data(db: Arc) { fn seed_flow_data(db: Arc) { // Create a flow let mut flow = Flow::new(0, "Document Approval".to_string()); - + // Set flow properties using the builder pattern flow = flow.status("draft".to_string()); flow = flow.name("Document Approval Flow".to_string()); - + // Create flow steps let mut step1 = FlowStep::new(0, 1); step1 = step1.description("Initial review by legal team".to_string()); step1 = step1.status("pending".to_string()); - + let mut step2 = FlowStep::new(0, 2); step2 = step2.description("Approval by department head".to_string()); step2 = step2.status("pending".to_string()); - + // Add signature requirements - let mut req1 = SignatureRequirement::new(0, 1, "Legal Team".to_string(), "Please review this document".to_string()); - let mut req2 = SignatureRequirement::new(0, 2, "Department Head".to_string(), "Please approve this document".to_string()); - + let mut req1 = SignatureRequirement::new( + 0, + 1, + "Legal Team".to_string(), + "Please review this document".to_string(), + ); + let mut req2 = SignatureRequirement::new( + 0, + 2, + "Department Head".to_string(), + "Please approve this document".to_string(), + ); + // Add steps to flow flow = flow.add_step(step1); flow = flow.add_step(step2); - + // Store in the database - let (_, updated_flow) = db.collection::() + let (_, updated_flow) = db + .collection::() .expect("Failed to get Flow collection") .set(&flow) .expect("Failed to store flow"); - + // Store signature requirements in the database - let (_, updated_req1) = db.collection::() + let (_, updated_req1) = db + .collection::() .expect("Failed to get SignatureRequirement collection") .set(&req1) .expect("Failed to store signature requirement"); - - let (_, updated_req2) = db.collection::() + + let (_, updated_req2) = db + .collection::() .expect("Failed to get SignatureRequirement collection") .set(&req2) .expect("Failed to store signature requirement"); - + println!("Mock database seeded with flow data:"); - println!(" - Added flow: {} (ID: {})", updated_flow.name, updated_flow.base_data.id); + println!( + " - Added flow: {} (ID: {})", + updated_flow.name, updated_flow.base_data.id + ); println!(" - Added {} steps", updated_flow.steps.len()); - println!(" - Added signature requirements with IDs: {} and {}", - updated_req1.base_data.id, updated_req2.base_data.id); + println!( + " - Added signature requirements with IDs: {} and {}", + updated_req1.base_data.id, updated_req2.base_data.id + ); } /// Seed the mock database with legal data @@ -173,34 +200,40 @@ fn seed_legal_data(db: Arc) { let mut contract = Contract::new(None, "Service Agreement".to_string()); contract.description = Some("Agreement for software development services".to_string()); contract.status = ContractStatus::Draft; - + // Create a revision let revision = ContractRevision::new( None, "Initial draft".to_string(), "https://example.com/contract/v1".to_string(), ); - + // Create signers let signer1 = ContractSigner::new(None, 1, "Client".to_string()); let signer2 = ContractSigner::new(None, 2, "Provider".to_string()); - + // Add revision and signers to contract contract.add_revision(revision); contract.add_signer(signer1); contract.add_signer(signer2); - + // Store in the database - let (_, updated_contract) = db.collection::() + let (_, updated_contract) = db + .collection::() .expect("Failed to get Contract collection") .set(&contract) .expect("Failed to store contract"); - + println!("Mock database seeded with legal data:"); - println!(" - Added contract: {} (ID: {})", updated_contract.name, updated_contract.base_data.id); - println!(" - Added {} revisions and {} signers", - updated_contract.revisions.len(), - updated_contract.signers.len()); + println!( + " - Added contract: {} (ID: {})", + updated_contract.name, updated_contract.base_data.id + ); + println!( + " - Added {} revisions and {} signers", + updated_contract.revisions.len(), + updated_contract.signers.len() + ); } /// Seed the mock database with projects data @@ -211,25 +244,34 @@ fn seed_projects_data(db: Arc) { project.description = Some("Redesign the company website".to_string()); project.status = ProjectStatus::InProgress; project.priority = Priority::High; - + // Add members and tags project.add_member_id(1); project.add_member_id(2); project.add_tag("design".to_string()); project.add_tag("web".to_string()); - + // Store in the database - let (_, updated_project) = db.collection::() + let (_, updated_project) = db + .collection::() .expect("Failed to get Project collection") .set(&project) .expect("Failed to store project"); - + println!("Mock database seeded with projects data:"); - println!(" - Added project: {} (ID: {})", updated_project.name, updated_project.base_data.id); - println!(" - Status: {}, Priority: {}", updated_project.status, updated_project.priority); - println!(" - Added {} members and {} tags", - updated_project.member_ids.len(), - updated_project.tags.len()); + println!( + " - Added project: {} (ID: {})", + updated_project.name, updated_project.base_data.id + ); + println!( + " - Status: {}, Priority: {}", + updated_project.status, updated_project.priority + ); + println!( + " - Added {} members and {} tags", + updated_project.member_ids.len(), + updated_project.tags.len() + ); } /// Seed the mock database with finance data fn seed_finance_data(db: Arc) { @@ -241,13 +283,14 @@ fn seed_finance_data(db: Arc) { .ledger("ethereum") .address("0x1234567890abcdef1234567890abcdef12345678") .pubkey("0xabcdef1234567890abcdef1234567890abcdef12"); - + // Store the account in the database - let (account_id, updated_account) = db.collection::() + let (account_id, updated_account) = db + .collection::() .expect("Failed to get Account collection") .set(&account) .expect("Failed to store account"); - + // Create an ERC20 token asset let token_asset = Asset::new() .name("HERO Token") @@ -256,13 +299,14 @@ fn seed_finance_data(db: Arc) { .address("0x9876543210abcdef9876543210abcdef98765432") .asset_type(AssetType::Erc20) .decimals(18); - + // Store the token asset in the database - let (token_id, updated_token) = db.collection::() + let (token_id, updated_token) = db + .collection::() .expect("Failed to get Asset collection") .set(&token_asset) .expect("Failed to store token asset"); - + // Create an NFT asset let nft_asset = Asset::new() .name("Herocode #1") @@ -271,23 +315,25 @@ fn seed_finance_data(db: Arc) { .address("0xabcdef1234567890abcdef1234567890abcdef12") .asset_type(AssetType::Erc721) .decimals(0); - + // Store the NFT asset in the database - let (nft_id, updated_nft) = db.collection::() + let (nft_id, updated_nft) = db + .collection::() .expect("Failed to get Asset collection") .set(&nft_asset) .expect("Failed to store NFT asset"); - + // Add assets to the account account = updated_account.add_asset(token_id); account = account.add_asset(nft_id); - + // Update the account in the database - let (_, updated_account) = db.collection::() + let (_, updated_account) = db + .collection::() .expect("Failed to get Account collection") .set(&account) .expect("Failed to store updated account"); - + // Create a listing for the NFT let listing = Listing::new() .seller_id(account_id) @@ -300,16 +346,29 @@ fn seed_finance_data(db: Arc) { .image_url(Some("hcttps://example.com/nft/1.png".to_string())) .add_tag("rare".to_string()) .add_tag("collectible".to_string()); - + // Store the listing in the database - let (listing_id, updated_listing) = db.collection::() + let (listing_id, updated_listing) = db + .collection::() .expect("Failed to get Listing collection") .set(&listing) .expect("Failed to store listing"); - + println!("Mock database seeded with finance data:"); - println!(" - Added account: {} (ID: {})", updated_account.name, updated_account.base_data.id); - println!(" - Added token asset: {} (ID: {})", updated_token.name, updated_token.base_data.id); - println!(" - Added NFT asset: {} (ID: {})", updated_nft.name, updated_nft.base_data.id); - println!(" - Added listing: {} (ID: {})", updated_listing.title, updated_listing.base_data.id); + println!( + " - Added account: {} (ID: {})", + updated_account.name, updated_account.base_data.id + ); + println!( + " - Added token asset: {} (ID: {})", + updated_token.name, updated_token.base_data.id + ); + println!( + " - Added NFT asset: {} (ID: {})", + updated_nft.name, updated_nft.base_data.id + ); + println!( + " - Added listing: {} (ID: {})", + updated_listing.title, updated_listing.base_data.id + ); } diff --git a/src/lib.rs b/src/lib.rs index 2f360a7..3832452 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ //! Rhailib - Distributed Rhai Scripting Library -//! +//! //! This library provides infrastructure for executing Rhai scripts in a distributed //! manner using Redis as a message broker and task queue. // Re-export commonly used types pub use redis; -pub use serde_json; \ No newline at end of file +pub use serde_json; diff --git a/src/monitor/src/cli_logic.rs b/src/monitor/src/cli_logic.rs index 5c93000..3bdb20a 100644 --- a/src/monitor/src/cli_logic.rs +++ b/src/monitor/src/cli_logic.rs @@ -7,8 +7,8 @@ use crate::plot; use crate::tasks::{self, RhaiTask}; use redis::{AsyncCommands, Client as RedisClient}; use std::collections::HashMap; -use tokio::time::{sleep, Duration}; use tokio::signal; +use tokio::time::{sleep, Duration}; const REDIS_URL: &str = "redis://127.0.0.1/"; const POLLING_INTERVAL_MILLISECONDS: u64 = 10; // Increased polling interval for SCAN @@ -24,7 +24,10 @@ pub async fn start_monitoring(worker_names: &[String]) -> Result<()> { let ping_result: String = redis::cmd("PING").query_async(&mut con).await?; tracing::info!("Redis PING response: {}", ping_result); - tracing::info!("Starting live monitor. Configured workers: {:?}. Press Ctrl+C to exit.", worker_names); + tracing::info!( + "Starting live monitor. Configured workers: {:?}. Press Ctrl+C to exit.", + worker_names + ); loop { tokio::select! { @@ -75,7 +78,7 @@ pub async fn start_monitoring(worker_names: &[String]) -> Result<()> { .buffer_unordered(10) // Concurrently fetch details for 10 tasks .collect::>() .await; - + all_rhai_tasks.extend(tasks_futures.into_iter().flatten()); cursor = new_cursor; @@ -83,7 +86,7 @@ pub async fn start_monitoring(worker_names: &[String]) -> Result<()> { break; } } - + // Sort tasks by creation date (optional, assuming created_at is parsable) // For simplicity, we'll skip sorting for now as created_at is a string. diff --git a/src/monitor/src/main.rs b/src/monitor/src/main.rs index 347a4ba..9001563 100644 --- a/src/monitor/src/main.rs +++ b/src/monitor/src/main.rs @@ -11,7 +11,6 @@ struct Args { /// List of worker names to monitor, comma-separated #[clap(short, long, value_delimiter = ',', required = true, num_args = 1..)] workers: Vec, - // TODO: Add other options like Redis connection details if not using a config file or env vars. } diff --git a/src/monitor/src/tasks.rs b/src/monitor/src/tasks.rs index 8cf3bc1..0a0482d 100644 --- a/src/monitor/src/tasks.rs +++ b/src/monitor/src/tasks.rs @@ -1,6 +1,6 @@ // rhailib/monitor/src/tasks.rs use anyhow::Result; -use prettytable::{Cell, Row, Table, format}; +use prettytable::{format, Cell, Row, Table}; use std::collections::HashMap; #[derive(Debug, Clone)] @@ -32,7 +32,10 @@ impl RhaiTask { RhaiTask { id: task_id, script: get_opt_string("script"), - status: details.get("status").cloned().unwrap_or_else(|| "unknown".to_string()), + status: details + .get("status") + .cloned() + .unwrap_or_else(|| "unknown".to_string()), created_at: get_opt_string("createdAt"), updated_at: get_opt_string("updatedAt"), client_rpc_id: get_opt_string("clientRpcId"), @@ -66,7 +69,7 @@ pub async fn display_task_table(tasks: &[RhaiTask]) -> Result<()> { for task in tasks { let details_str = match (&task.output, &task.error) { (Some(out), None) => format!("Output: {:.50}", out), // Truncate for display - (None, Some(err)) => format!("Error: {:.50}", err), // Truncate for display + (None, Some(err)) => format!("Error: {:.50}", err), // Truncate for display (Some(out), Some(err)) => format!("Output: {:.30}... Error: {:.30}...", out, err), (None, None) => "N/A".to_string(), }; diff --git a/src/repl/examples/connect_and_play.rs b/src/repl/examples/connect_and_play.rs index f51ff87..698f025 100644 --- a/src/repl/examples/connect_and_play.rs +++ b/src/repl/examples/connect_and_play.rs @@ -1,20 +1,24 @@ +use anyhow::Context; use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails}; use std::env; -use std::time::Duration; use std::sync::Arc; -use anyhow::Context; -use tracing_subscriber::EnvFilter; +use std::time::Duration; use tokio::sync::mpsc; +use tracing_subscriber::EnvFilter; -use worker_lib::spawn_rhai_worker; use engine::create_heromodels_engine; use heromodels::db::hero::OurDB; use std::path::PathBuf; +use worker_lib::spawn_rhai_worker; #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env().add_directive("connect_and_play=info".parse().unwrap()).add_directive("rhai_client=info".parse().unwrap())) + .with_env_filter( + EnvFilter::from_default_env() + .add_directive("connect_and_play=info".parse().unwrap()) + .add_directive("rhai_client=info".parse().unwrap()), + ) .init(); let args: Vec = env::args().collect(); @@ -41,13 +45,23 @@ async fn main() -> Result<(), Box> { let worker_circle_name_for_task = worker_name.clone(); let db_path_for_task = db_path_str.clone(); - log::info!("[Main] Spawning worker for circle '{}' with DB path '{}'", worker_circle_name_for_task, db_path_for_task); + log::info!( + "[Main] Spawning worker for circle '{}' with DB path '{}'", + worker_circle_name_for_task, + db_path_for_task + ); let worker_join_handle = tokio::spawn(async move { - log::info!("[BG Worker] Starting for circle '{}' on Redis '{}'", worker_circle_name_for_task, worker_redis_url); + log::info!( + "[BG Worker] Starting for circle '{}' on Redis '{}'", + worker_circle_name_for_task, + worker_redis_url + ); // The `reset: true` in OurDB::new handles pre-cleanup if the directory exists. - let db = Arc::new(OurDB::new(&db_path_for_task, true) - .expect("Failed to create temp DB for example worker")); + let db = Arc::new( + OurDB::new(&db_path_for_task, true) + .expect("Failed to create temp DB for example worker"), + ); let mut engine = create_heromodels_engine(db); engine.set_max_operations(0); engine.set_max_expr_depths(0, 0); @@ -59,18 +73,30 @@ async fn main() -> Result<(), Box> { engine, worker_redis_url.clone(), shutdown_rx, // Pass the receiver from main - false, // preserve_tasks - ).await { - log::error!("[BG Worker] Failed to spawn or worker error for circle '{}': {}", worker_circle_name_for_task, e); + false, // preserve_tasks + ) + .await + { + log::error!( + "[BG Worker] Failed to spawn or worker error for circle '{}': {}", + worker_circle_name_for_task, + e + ); } else { - log::info!("[BG Worker] Worker for circle '{}' shut down gracefully.", worker_circle_name_for_task); + log::info!( + "[BG Worker] Worker for circle '{}' shut down gracefully.", + worker_circle_name_for_task + ); } }); // Give the worker a moment to start up tokio::time::sleep(Duration::from_secs(1)).await; - println!("Initializing RhaiClient for Redis at {} to target worker '{}'...", redis_url, worker_name); + println!( + "Initializing RhaiClient for Redis at {} to target worker '{}'...", + redis_url, worker_name + ); let client = RhaiClient::new(&redis_url) .with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?; println!("RhaiClient initialized."); @@ -79,7 +105,10 @@ async fn main() -> Result<(), Box> { println!("\nSending script:\n```rhai\n{}\n```", script); let timeout = Duration::from_secs(30); - match client.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout).await { + match client + .submit_script_and_await_result(&worker_name, script.to_string(), None, timeout) + .await + { Ok(task_details) => { println!("\nWorker response:"); if let Some(ref output) = task_details.output { @@ -89,32 +118,46 @@ async fn main() -> Result<(), Box> { eprintln!("Error: {}", error_msg); } if task_details.output.is_none() && task_details.error.is_none() { - println!("Worker finished with no explicit output or error. Status: {}", task_details.status); + println!( + "Worker finished with no explicit output or error. Status: {}", + task_details.status + ); } } Err(e) => match e { RhaiClientError::Timeout(task_id) => { - eprintln!("\nError: Script execution timed out for task_id: {}.", task_id); + eprintln!( + "\nError: Script execution timed out for task_id: {}.", + task_id + ); } RhaiClientError::RedisError(redis_err) => { - eprintln!("\nError: Redis communication failed: {}. Check Redis connection and server status.", redis_err); + eprintln!( + "\nError: Redis communication failed: {}. Check Redis connection and server status.", + redis_err + ); } RhaiClientError::SerializationError(serde_err) => { - eprintln!("\nError: Failed to serialize/deserialize task data: {}.", serde_err); + eprintln!( + "\nError: Failed to serialize/deserialize task data: {}.", + serde_err + ); } RhaiClientError::TaskNotFound(task_id) => { - eprintln!("\nError: Task {} not found after submission.", task_id); - } - /* All RhaiClientError variants are handled, so _ arm is not strictly needed - unless RhaiClientError becomes non-exhaustive in the future. */ - }, + eprintln!("\nError: Task {} not found after submission.", task_id); + } /* All RhaiClientError variants are handled, so _ arm is not strictly needed + unless RhaiClientError becomes non-exhaustive in the future. */ + }, } println!("\nExample client operations finished. Shutting down worker..."); // Send shutdown signal to the worker if let Err(e) = shutdown_tx.send(()).await { - eprintln!("[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)", e); + eprintln!( + "[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)", + e + ); } // Wait for the worker to finish @@ -126,17 +169,30 @@ async fn main() -> Result<(), Box> { } // Clean up the database directory - log::info!("[Main] Cleaning up database directory: {}", db_path.display()); + log::info!( + "[Main] Cleaning up database directory: {}", + db_path.display() + ); if db_path.exists() { if let Err(e) = std::fs::remove_dir_all(&db_path) { - eprintln!("[Main] Failed to remove database directory '{}': {}", db_path.display(), e); + eprintln!( + "[Main] Failed to remove database directory '{}': {}", + db_path.display(), + e + ); } else { - log::info!("[Main] Successfully removed database directory: {}", db_path.display()); + log::info!( + "[Main] Successfully removed database directory: {}", + db_path.display() + ); } } else { - log::info!("[Main] Database directory '{}' not found, no cleanup needed.", db_path.display()); + log::info!( + "[Main] Database directory '{}' not found, no cleanup needed.", + db_path.display() + ); } - + println!("Example fully completed and cleaned up."); Ok(()) -} \ No newline at end of file +} diff --git a/src/repl/src/main.rs b/src/repl/src/main.rs index 1514146..ded28e4 100644 --- a/src/repl/src/main.rs +++ b/src/repl/src/main.rs @@ -1,13 +1,13 @@ -use tracing_subscriber::EnvFilter; +use anyhow::Context; use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails}; use rustyline::error::ReadlineError; -use rustyline::{DefaultEditor, Config, EditMode}; +use rustyline::{Config, DefaultEditor, EditMode}; +use std::env; use std::fs; use std::process::Command; -use std::env; use std::time::Duration; use tempfile::Builder as TempFileBuilder; -use anyhow::Context; +use tracing_subscriber::EnvFilter; // Default timeout for script execution const DEFAULT_SCRIPT_TIMEOUT_SECONDS: u64 = 30; @@ -17,11 +17,17 @@ async fn execute_script(client: &RhaiClient, circle_name: &str, script_content: println!("Script is empty, not sending."); return; } - println!("Sending script to worker '{}':\n---\n{}\n---", circle_name, script_content); - + println!( + "Sending script to worker '{}':\n---\n{}\n---", + circle_name, script_content + ); + let timeout = Duration::from_secs(DEFAULT_SCRIPT_TIMEOUT_SECONDS); - match client.submit_script_and_await_result(circle_name, script_content, None, timeout).await { + match client + .submit_script_and_await_result(circle_name, script_content, None, timeout) + .await + { Ok(task_details) => { if let Some(output) = &task_details.output { println!("worker: {}", output); @@ -30,36 +36,58 @@ async fn execute_script(client: &RhaiClient, circle_name: &str, script_content: eprintln!("Worker error: {}", error_msg); } if task_details.output.is_none() && task_details.error.is_none() { - println!("Worker finished with no explicit output or error. Status: {}", task_details.status); + println!( + "Worker finished with no explicit output or error. Status: {}", + task_details.status + ); } } Err(e) => match e { RhaiClientError::Timeout(task_id) => { - eprintln!("Error: Script execution timed out for task_id: {}.", task_id); + eprintln!( + "Error: Script execution timed out for task_id: {}.", + task_id + ); } RhaiClientError::RedisError(redis_err) => { - eprintln!("Error: Redis communication failed: {}. Check Redis connection and server status.", redis_err); + eprintln!( + "Error: Redis communication failed: {}. Check Redis connection and server status.", + redis_err + ); } RhaiClientError::SerializationError(serde_err) => { - eprintln!("Error: Failed to serialize/deserialize task data: {}.", serde_err); + eprintln!( + "Error: Failed to serialize/deserialize task data: {}.", + serde_err + ); } RhaiClientError::TaskNotFound(task_id) => { - eprintln!("Error: Task {} not found after submission (this should be rare).", task_id); + eprintln!( + "Error: Task {} not found after submission (this should be rare).", + task_id + ); } - }, } } async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> { - println!("Initializing Rhai REPL for worker '{}' via Redis at {}...", circle_name, redis_url); + println!( + "Initializing Rhai REPL for worker '{}' via Redis at {}...", + circle_name, redis_url + ); let client = RhaiClient::new(&redis_url) .with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?; // No explicit connect() needed for rhai_client, connection is handled per-operation or pooled. - println!("RhaiClient initialized. Ready to send scripts to worker '{}'.", circle_name); - println!("Type Rhai scripts, '.edit' to use $EDITOR, '.run ' to execute a file, or 'exit'/'quit'."); + println!( + "RhaiClient initialized. Ready to send scripts to worker '{}'.", + circle_name + ); + println!( + "Type Rhai scripts, '.edit' to use $EDITOR, '.run ' to execute a file, or 'exit'/'quit'." + ); println!("Vi mode enabled for input line."); let config = Config::builder() @@ -67,7 +95,7 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> .auto_add_history(true) // Automatically add to history .build(); let mut rl = DefaultEditor::with_config(config)?; - + let history_file = ".rhai_repl_history.txt"; // Simple history file in current dir if rl.load_history(history_file).is_err() { // No history found or error loading, not critical @@ -91,7 +119,7 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> .suffix(".rhai") .tempfile_in(".") // Create in current directory for simplicity .with_context(|| "Failed to create temp file")?; - + // You can pre-populate the temp file if needed: // use std::io::Write; // Add this import if using write_all // if let Err(e) = temp_file.as_file().write_all(b"// Start your Rhai script here\n") { @@ -100,17 +128,20 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> let temp_path = temp_file.path().to_path_buf(); let editor_cmd_str = env::var("EDITOR").unwrap_or_else(|_| "vi".to_string()); - + let mut editor_parts = editor_cmd_str.split_whitespace(); let editor_executable = editor_parts.next().unwrap_or("vi"); // Default to vi if $EDITOR is empty string let editor_args: Vec<&str> = editor_parts.collect(); - println!("Launching editor: '{}' with args: {:?} for script editing. Save and exit editor to execute.", editor_executable, editor_args); - + println!( + "Launching editor: '{}' with args: {:?} for script editing. Save and exit editor to execute.", + editor_executable, editor_args + ); + let mut command = Command::new(editor_executable); command.args(editor_args); // Add any arguments from $EDITOR (like -w) - command.arg(&temp_path); // Add the temp file path as the last argument - + command.arg(&temp_path); // Add the temp file path as the last argument + let status = command.status(); match status { @@ -119,11 +150,19 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> Ok(script_content) => { execute_script(&client, &circle_name, script_content).await; } - Err(e) => eprintln!("Error reading temp file {:?}: {}", temp_path, e), + Err(e) => { + eprintln!("Error reading temp file {:?}: {}", temp_path, e) + } } } - Ok(exit_status) => eprintln!("Editor exited with status: {}. Script not executed.", exit_status), - Err(e) => eprintln!("Failed to launch editor '{}': {}. Ensure it's in your PATH.", editor_executable, e), // Changed 'editor' to 'editor_executable' + Ok(exit_status) => eprintln!( + "Editor exited with status: {}. Script not executed.", + exit_status + ), + Err(e) => eprintln!( + "Failed to launch editor '{}': {}. Ensure it's in your PATH.", + editor_executable, e + ), // Changed 'editor' to 'editor_executable' } // temp_file is automatically deleted when it goes out of scope } else if input.starts_with(".run ") || input.starts_with("run ") { @@ -145,11 +184,13 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> } // rl.add_history_entry(line.as_str()) is handled by auto_add_history(true) } - Err(ReadlineError::Interrupted) => { // Ctrl-C + Err(ReadlineError::Interrupted) => { + // Ctrl-C println!("Input interrupted. Type 'exit' or 'quit' to close."); continue; } - Err(ReadlineError::Eof) => { // Ctrl-D + Err(ReadlineError::Eof) => { + // Ctrl-D println!("Exiting REPL (EOF)."); break; } @@ -159,7 +200,7 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> } } } - + if rl.save_history(history_file).is_err() { // Failed to save history, not critical } @@ -172,7 +213,11 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env().add_directive("ui_repl=info".parse()?).add_directive("rhai_client=info".parse()?)) + .with_env_filter( + EnvFilter::from_default_env() + .add_directive("ui_repl=info".parse()?) + .add_directive("rhai_client=info".parse()?), + ) .init(); let args: Vec = env::args().collect(); @@ -189,18 +234,29 @@ async fn main() -> anyhow::Result<()> { args[2].clone() } else { let default_circle = "default_worker".to_string(); - println!("No worker/circle name provided. Defaulting to: {}", default_circle); + println!( + "No worker/circle name provided. Defaulting to: {}", + default_circle + ); default_circle }; - - println!("Usage: {} [redis_url] [worker_name]", args.get(0).map_or("ui_repl", |s| s.as_str())); - println!("Example: {} redis://127.0.0.1/ my_rhai_worker", args.get(0).map_or("ui_repl", |s| s.as_str())); + println!( + "Usage: {} [redis_url] [worker_name]", + args.get(0).map_or("ui_repl", |s| s.as_str()) + ); + println!( + "Example: {} redis://127.0.0.1/ my_rhai_worker", + args.get(0).map_or("ui_repl", |s| s.as_str()) + ); // Basic validation for Redis URL (scheme) // A more robust validation might involve trying to parse it with redis::ConnectionInfo if !redis_url_str.starts_with("redis://") { - eprintln!("Warning: Redis URL '{}' does not start with 'redis://'. Attempting to use it anyway.", redis_url_str); + eprintln!( + "Warning: Redis URL '{}' does not start with 'redis://'. Attempting to use it anyway.", + redis_url_str + ); } if let Err(e) = run_repl(redis_url_str, circle_name_str).await { diff --git a/src/rhai_engine_ui/src/app.rs b/src/rhai_engine_ui/src/app.rs index 00e8673..db026f2 100644 --- a/src/rhai_engine_ui/src/app.rs +++ b/src/rhai_engine_ui/src/app.rs @@ -1,9 +1,9 @@ -use yew::prelude::*; use gloo_net::http::Request; use gloo_timers::callback::Interval; use serde::{Deserialize, Serialize}; use wasm_bindgen_futures::spawn_local; use web_sys::HtmlInputElement; +use yew::prelude::*; use yew::{html, Component, Context, Html, TargetCast}; // --- Data Structures (placeholders, to be refined based on backend API) --- @@ -21,8 +21,6 @@ pub struct TaskSummary { pub status: String, } - - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct TaskDetails { pub hash: String, @@ -33,8 +31,6 @@ pub struct TaskDetails { pub error: Option, } - - // Combined structure for initial fetch #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] pub struct WorkerDataResponse { @@ -42,7 +38,7 @@ pub struct WorkerDataResponse { pub tasks: Vec, } -// --- Component --- +// --- Component --- pub enum Msg { UpdateWorkerName(String), @@ -103,28 +99,39 @@ impl Component for App { self.current_queue_stats = None; self.selected_task_details = None; self.is_loading_initial_data = true; - + let link = ctx.link().clone(); - let tasks_url = format!("/api/worker/{}/tasks_and_stats", worker_name); + let tasks_url = format!("/api/worker/{}/tasks_and_stats", worker_name); spawn_local(async move { match Request::get(&tasks_url).send().await { Ok(response) => { if response.ok() { match response.json::().await { Ok(data) => link.send_message(Msg::SetWorkerData(Ok(data))), - Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Failed to parse worker data: {}", e)))), + Err(e) => link.send_message(Msg::SetWorkerData(Err(format!( + "Failed to parse worker data: {}", + e + )))), } } else { - link.send_message(Msg::SetWorkerData(Err(format!("API error: {} {}", response.status(), response.status_text())))); + link.send_message(Msg::SetWorkerData(Err(format!( + "API error: {} {}", + response.status(), + response.status_text() + )))); } } - Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Network error fetching worker data: {}", e)))), + Err(e) => link.send_message(Msg::SetWorkerData(Err(format!( + "Network error fetching worker data: {}", + e + )))), } }); // Set up polling for queue stats let link_for_timer = ctx.link().clone(); - let timer = Interval::new(5000, move || { // Poll every 5 seconds + let timer = Interval::new(5000, move || { + // Poll every 5 seconds link_for_timer.send_message(Msg::IntervalTick); }); if let Some(old_timer) = self.queue_poll_timer.take() { @@ -142,14 +149,25 @@ impl Component for App { Ok(response) => { if response.ok() { match response.json::().await { - Ok(stats) => link.send_message(Msg::SetQueueStats(Ok(stats))), - Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Failed to parse queue stats: {}", e)))), + Ok(stats) => { + link.send_message(Msg::SetQueueStats(Ok(stats))) + } + Err(e) => link.send_message(Msg::SetQueueStats(Err( + format!("Failed to parse queue stats: {}", e), + ))), } } else { - link.send_message(Msg::SetQueueStats(Err(format!("API error (queue_stats): {} {}", response.status(), response.status_text())))); + link.send_message(Msg::SetQueueStats(Err(format!( + "API error (queue_stats): {} {}", + response.status(), + response.status_text() + )))); } } - Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Network error fetching queue stats: {}", e)))), + Err(e) => link.send_message(Msg::SetQueueStats(Err(format!( + "Network error fetching queue stats: {}", + e + )))), } }); } @@ -178,7 +196,7 @@ impl Component for App { Msg::SetQueueStats(Err(err_msg)) => { log::error!("Failed to update queue stats: {}", err_msg); // Optionally show a non-blocking error for queue stats - self.current_queue_stats = None; + self.current_queue_stats = None; true } Msg::ViewTaskDetails(hash) => { @@ -191,14 +209,26 @@ impl Component for App { Ok(response) => { if response.ok() { match response.json::().await { - Ok(details) => link.send_message(Msg::SetTaskDetails(Ok(details))), - Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Failed to parse task details: {}", e)))), + Ok(details) => { + link.send_message(Msg::SetTaskDetails(Ok(details))) + } + Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!( + "Failed to parse task details: {}", + e + )))), } } else { - link.send_message(Msg::SetTaskDetails(Err(format!("API error (task_details): {} {}", response.status(), response.status_text())))); + link.send_message(Msg::SetTaskDetails(Err(format!( + "API error (task_details): {} {}", + response.status(), + response.status_text() + )))); } } - Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Network error fetching task details: {}", e)))), + Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!( + "Network error fetching task details: {}", + e + )))), } }); true @@ -232,12 +262,12 @@ impl Component for App { html! {

{ "Rhai Worker Monitor" }

- +
- ().value()) } @@ -254,10 +284,10 @@ impl Component for App { if self.worker_name_to_monitor.is_some() && !self.is_loading_initial_data && self.error_message.is_none() {

{ format!("Monitoring: {}", self.worker_name_to_monitor.as_ref().unwrap()) }

- +

{ "Queue Status" }

- { + { if let Some(stats) = &self.current_queue_stats { // TODO: Implement actual color coding and bar visualization html! {

{format!("Tasks in queue: {} ({})", stats.current_size, stats.color_code)}

} @@ -281,7 +311,10 @@ impl Component for App { impl App { fn view_tasks_table(&self, ctx: &Context) -> Html { - if self.tasks_list.is_empty() && self.worker_name_to_monitor.is_some() && !self.is_loading_initial_data { + if self.tasks_list.is_empty() + && self.worker_name_to_monitor.is_some() + && !self.is_loading_initial_data + { return html! {

{ "No tasks found for this worker, or worker not found." }

}; } if !self.tasks_list.is_empty() { @@ -306,10 +339,12 @@ impl App { fn view_task_row(&self, ctx: &Context, task: &TaskSummary) -> Html { let task_hash_clone = task.hash.clone(); - let created_at_str = chrono::DateTime::from_timestamp(task.created_at, 0) - .map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()); + let created_at_str = chrono::DateTime::from_timestamp(task.created_at, 0).map_or_else( + || "Invalid date".to_string(), + |dt| dt.format("%Y-%m-%d %H:%M:%S").to_string(), + ); html! { - { task.hash.chars().take(12).collect::() }{ "..." } { created_at_str } @@ -324,7 +359,10 @@ impl App { } if let Some(details) = &self.selected_task_details { let created_at_str = chrono::DateTime::from_timestamp(details.created_at, 0) - .map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()); + .map_or_else( + || "Invalid date".to_string(), + |dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string(), + ); html! {

{ format!("Task Details: {}", details.hash) }

diff --git a/src/rhai_engine_ui/src/main.rs b/src/rhai_engine_ui/src/main.rs index 264561e..4143f28 100644 --- a/src/rhai_engine_ui/src/main.rs +++ b/src/rhai_engine_ui/src/main.rs @@ -9,8 +9,7 @@ mod server { extract::{Path, State}, http::{Method, StatusCode}, routing::get, - Json, - Router, + Json, Router, }; use deadpool_redis::{Config, Pool, Runtime}; use redis::{from_redis_value, AsyncCommands, FromRedisValue, Value}; @@ -30,12 +29,19 @@ mod server { pub async fn run() { let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string()); let cfg = Config::from_url(redis_url); - let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool"); + let pool = cfg + .create_pool(Some(Runtime::Tokio1)) + .expect("Failed to create Redis pool"); - let cors = CorsLayer::new().allow_methods([Method::GET]).allow_origin(Any); + let cors = CorsLayer::new() + .allow_methods([Method::GET]) + .allow_origin(Any); let app = Router::new() - .route("/api/worker/:worker_name/tasks_and_stats", get(get_worker_data)) + .route( + "/api/worker/:worker_name/tasks_and_stats", + get(get_worker_data), + ) .route("/api/worker/:worker_name/queue_stats", get(get_queue_stats)) .route("/api/task/:hash", get(get_task_details)) .nest_service("/", ServeDir::new("dist")) @@ -59,12 +65,16 @@ mod server { let mut conn = pool.get().await.map_err(internal_error)?; let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, worker_name); - let task_ids: Vec = conn.lrange(&queue_key, 0, -1).await.map_err(internal_error)?; + let task_ids: Vec = conn + .lrange(&queue_key, 0, -1) + .await + .map_err(internal_error)?; let mut tasks = Vec::new(); for task_id in task_ids { let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - let task_details: redis::Value = conn.hgetall(&task_key).await.map_err(internal_error)?; + let task_details: redis::Value = + conn.hgetall(&task_key).await.map_err(internal_error)?; if let Ok(summary) = task_summary_from_redis_value(&task_details) { tasks.push(summary); } @@ -72,7 +82,10 @@ mod server { let queue_stats = get_queue_stats_internal(&mut conn, &worker_name).await?; - Ok(Json(WorkerDataResponse { tasks, queue_stats: Some(queue_stats) })) + Ok(Json(WorkerDataResponse { + tasks, + queue_stats: Some(queue_stats), + })) } async fn get_queue_stats( @@ -107,8 +120,12 @@ mod server { 0..=10 => "green", 11..=50 => "yellow", _ => "red", - }.to_string(); - Ok(QueueStats { current_size: size, color_code }) + } + .to_string(); + Ok(QueueStats { + current_size: size, + color_code, + }) } fn internal_error(err: E) -> (StatusCode, String) { @@ -119,8 +136,14 @@ mod server { let map: HashMap = from_redis_value(v)?; Ok(TaskSummary { hash: map.get("hash").cloned().unwrap_or_default(), - created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(), - status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()), + created_at: map + .get("createdAt") + .and_then(|s| s.parse().ok()) + .unwrap_or_default(), + status: map + .get("status") + .cloned() + .unwrap_or_else(|| "Unknown".to_string()), }) } @@ -128,8 +151,14 @@ mod server { let map: HashMap = from_redis_value(v)?; Ok(TaskDetails { hash: map.get("hash").cloned().unwrap_or_default(), - created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(), - status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()), + created_at: map + .get("createdAt") + .and_then(|s| s.parse().ok()) + .unwrap_or_default(), + status: map + .get("status") + .cloned() + .unwrap_or_else(|| "Unknown".to_string()), script_content: map.get("script").cloned().unwrap_or_default(), result: map.get("output").cloned(), error: map.get("error").cloned(), diff --git a/src/worker/cmd/worker.rs b/src/worker/cmd/worker.rs index 1c98464..3ebde3f 100644 --- a/src/worker/cmd/worker.rs +++ b/src/worker/cmd/worker.rs @@ -1,8 +1,8 @@ -use worker::spawn_rhai_worker; +use clap::Parser; use engine::create_heromodels_engine; use heromodels::db::hero::OurDB; +use rhailib_worker::spawn_rhai_worker; use std::sync::Arc; -use clap::Parser; use tokio::sync::mpsc; #[derive(Parser, Debug)] @@ -28,27 +28,35 @@ struct Args { #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); - + let args = Args::parse(); log::info!("Rhai Worker (binary) starting with performance-optimized engine."); - log::info!("Worker ID: {}, Circle Public Key: {}, Redis: {}", args.worker_id, args.circle_public_key, args.redis_url); + log::info!( + "Worker ID: {}, Circle Public Key: {}, Redis: {}", + args.worker_id, + args.circle_public_key, + args.redis_url + ); // Initialize database with OurDB for the Rhai engine // Using a temporary/in-memory like database for the worker - let db = Arc::new(OurDB::new("worker_rhai_temp_db", true).expect("Failed to create temporary DB for Rhai engine")); + let db = Arc::new( + OurDB::new("worker_rhai_temp_db", true) + .expect("Failed to create temporary DB for Rhai engine"), + ); let mut engine = create_heromodels_engine(db); - + // Performance optimizations for benchmarking engine.set_max_operations(0); // Unlimited operations for performance testing engine.set_max_expr_depths(0, 0); // Unlimited expression depth engine.set_max_string_size(0); // Unlimited string size engine.set_max_array_size(0); // Unlimited array size engine.set_max_map_size(0); // Unlimited map size - + // Enable full optimization for maximum performance engine.set_optimization_level(rhai::OptimizationLevel::Full); - + log::info!("Engine configured for maximum performance"); // Create shutdown channel (for graceful shutdown, though not used in benchmarks) @@ -66,18 +74,16 @@ async fn main() -> Result<(), Box> { // Wait for the worker to complete match worker_handle.await { - Ok(result) => { - match result { - Ok(_) => { - log::info!("Worker completed successfully"); - Ok(()) - } - Err(e) => { - log::error!("Worker failed: {}", e); - Err(e) - } + Ok(result) => match result { + Ok(_) => { + log::info!("Worker completed successfully"); + Ok(()) } - } + Err(e) => { + log::error!("Worker failed: {}", e); + Err(e) + } + }, Err(e) => { log::error!("Worker task panicked: {}", e); Err(Box::new(e) as Box) diff --git a/src/worker/src/lib.rs b/src/worker/src/lib.rs index 8414fd8..68457e7 100644 --- a/src/worker/src/lib.rs +++ b/src/worker/src/lib.rs @@ -2,11 +2,11 @@ use chrono::Utc; use log::{debug, error, info}; use redis::AsyncCommands; use rhai::{Engine, Scope}; -use std::collections::HashMap; -use tokio::task::JoinHandle; -use tokio::sync::mpsc; // For shutdown signal use rhai_client::RhaiTaskDetails; // Import for constructing the reply message -use serde_json; // For serializing the reply message +use serde_json; +use std::collections::HashMap; +use tokio::sync::mpsc; // For shutdown signal +use tokio::task::JoinHandle; // For serializing the reply message const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; @@ -32,8 +32,12 @@ async fn update_task_status_in_redis( if let Some(err) = error_msg { updates.push(("error", err)); } - debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates); - conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?; + debug!( + "Updating task {} in Redis with status: {}, updates: {:?}", + task_id, status, updates + ); + conn.hset_multiple::<_, _, _, ()>(&task_key, &updates) + .await?; Ok(()) } @@ -43,7 +47,7 @@ pub fn spawn_rhai_worker( engine: Engine, redis_url: String, mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver - preserve_tasks: bool, // Flag to control task cleanup + preserve_tasks: bool, // Flag to control task cleanup ) -> JoinHandle>> { tokio::spawn(async move { let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key); @@ -55,188 +59,200 @@ pub fn spawn_rhai_worker( let redis_client = match redis::Client::open(redis_url.as_str()) { Ok(client) => client, Err(e) => { - error!("Worker for Circle Public Key '{}': Failed to open Redis client: {}", circle_public_key, e); + error!( + "Worker for Circle Public Key '{}': Failed to open Redis client: {}", + circle_public_key, e + ); return Err(Box::new(e) as Box); } }; let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { Ok(conn) => conn, Err(e) => { - error!("Worker for Circle Public Key '{}': Failed to get Redis connection: {}", circle_public_key, e); + error!( + "Worker for Circle Public Key '{}': Failed to get Redis connection: {}", + circle_public_key, e + ); return Err(Box::new(e) as Box); } }; - info!("Worker for Circle Public Key '{}' successfully connected to Redis.", circle_public_key); + info!( + "Worker for Circle Public Key '{}' successfully connected to Redis.", + circle_public_key + ); loop { let blpop_keys = vec![queue_key.clone()]; tokio::select! { - // Listen for shutdown signal - _ = shutdown_rx.recv() => { - info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key); - break; - } - // Listen for tasks from Redis - blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { - debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key); - let response: Option<(String, String)> = match blpop_result { - Ok(resp) => resp, - Err(e) => { - error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e); - return Err(Box::new(e) as Box); - } - }; - - if let Some((_queue_name_recv, task_id)) = response { - info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv); - debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id); - - let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key); - - let task_details_map_result: Result, _> = - redis_conn.hgetall(&task_details_key).await; - - match task_details_map_result { - Ok(details_map) => { - debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map); - let script_content_opt = details_map.get("script").cloned(); - let reply_to_queue_opt = details_map.get("replyToQueue").cloned(); - let created_at_str_opt = details_map.get("createdAt").cloned(); - let public_key_opt = details_map.get("publicKey").cloned(); - - if let Some(script_content) = script_content_opt { - info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content); - debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id); - if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e); - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id); + // Listen for shutdown signal + _ = shutdown_rx.recv() => { + info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key); + break; + } + // Listen for tasks from Redis + blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { + debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key); + let response: Option<(String, String)> = match blpop_result { + Ok(resp) => resp, + Err(e) => { + error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e); + return Err(Box::new(e) as Box); } + }; - let mut scope = Scope::new(); - scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone()); - debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id); + if let Some((_queue_name_recv, task_id)) = response { + info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv); + debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id); - if let Some(public_key) = public_key_opt.as_deref() { - if !public_key.is_empty() { - scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string()); - debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id); + let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key); + + let task_details_map_result: Result, _> = + redis_conn.hgetall(&task_details_key).await; + + match task_details_map_result { + Ok(details_map) => { + debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map); + let script_content_opt = details_map.get("script").cloned(); + let reply_to_queue_opt = details_map.get("replyToQueue").cloned(); + let created_at_str_opt = details_map.get("createdAt").cloned(); + let public_key_opt = details_map.get("publicKey").cloned(); + + if let Some(script_content) = script_content_opt { + info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id); + if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id); } - } - debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id); - - let mut final_status = "error".to_string(); // Default to error - let mut final_output: Option = None; - let mut final_error_msg: Option = None; - match engine.eval_with_scope::(&mut scope, &script_content) { - Ok(result) => { - let output_str = if result.is::() { - // If the result is a string, we can unwrap it directly. - // This moves `result`, which is fine because it's the last time we use it in this branch. - result.into_string().unwrap() - } else { - result.to_string() - }; - info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str); - final_status = "completed".to_string(); - final_output = Some(output_str); + let mut scope = Scope::new(); + scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone()); + debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id); + + if let Some(public_key) = public_key_opt.as_deref() { + if !public_key.is_empty() { + scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string()); + debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id); + } } - Err(e) => { - let error_str = format!("{:?}", *e); - error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str); - final_error_msg = Some(error_str); - // final_status remains "error" + debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id); + + let mut final_status = "error".to_string(); // Default to error + let mut final_output: Option = None; + let mut final_error_msg: Option = None; + + match engine.eval_with_scope::(&mut scope, &script_content) { + Ok(result) => { + let output_str = if result.is::() { + // If the result is a string, we can unwrap it directly. + // This moves `result`, which is fine because it's the last time we use it in this branch. + result.into_string().unwrap() + } else { + result.to_string() + }; + info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str); + final_status = "completed".to_string(); + final_output = Some(output_str); + } + Err(e) => { + let error_str = format!("{:?}", *e); + error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str); + final_error_msg = Some(error_str); + // final_status remains "error" + } } - } - debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status); - if let Err(e) = update_task_status_in_redis( - &mut redis_conn, - &task_id, - &final_status, - final_output.clone(), // Clone for task hash update - final_error_msg.clone(), // Clone for task hash update - ).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e); - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status); - } + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status); + if let Err(e) = update_task_status_in_redis( + &mut redis_conn, + &task_id, + &final_status, + final_output.clone(), // Clone for task hash update + final_error_msg.clone(), // Clone for task hash update + ).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status); + } - // Send to reply queue if specified - if let Some(reply_q) = reply_to_queue_opt { - let created_at = created_at_str_opt - .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist + // Send to reply queue if specified + if let Some(reply_q) = reply_to_queue_opt { + let created_at = created_at_str_opt + .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist - let reply_details = RhaiTaskDetails { - task_id: task_id.to_string(), // Add the task_id - script: script_content.clone(), // Include script for context in reply - status: final_status, // The final status - // client_rpc_id is no longer a field - output: final_output, // The final output - error: final_error_msg, // The final error - created_at, // Original creation time - updated_at: Utc::now(), // Time of this final update/reply - // reply_to_queue is no longer a field - public_key: public_key_opt, - }; - match serde_json::to_string(&reply_details) { - Ok(reply_json) => { - let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_q, &reply_json).await; - match lpush_result { - Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q), - Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush), + let reply_details = RhaiTaskDetails { + task_id: task_id.to_string(), // Add the task_id + script: script_content.clone(), // Include script for context in reply + status: final_status, // The final status + // client_rpc_id is no longer a field + output: final_output, // The final output + error: final_error_msg, // The final error + created_at, // Original creation time + updated_at: Utc::now(), // Time of this final update/reply + // reply_to_queue is no longer a field + public_key: public_key_opt, + }; + match serde_json::to_string(&reply_details) { + Ok(reply_json) => { + let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_q, &reply_json).await; + match lpush_result { + Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q), + Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush), + } + } + Err(e_json) => { + error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json); } } - Err(e_json) => { - error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json); + } + // Clean up task details based on preserve_tasks flag + if !preserve_tasks { + // The worker is responsible for cleaning up the task details hash. + if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key); } - } - } - // Clean up task details based on preserve_tasks flag - if !preserve_tasks { - // The worker is responsible for cleaning up the task details hash. - if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e); } else { - debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key); + debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id); } - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id); - } - } else { // Script content not found in hash - error!( - "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", - circle_public_key, task_id, details_map - ); - // Clean up invalid task details based on preserve_tasks flag - if !preserve_tasks { - // Even if the script is not found, the worker should clean up the invalid task hash. - if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e); + } else { // Script content not found in hash + error!( + "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", + circle_public_key, task_id, details_map + ); + // Clean up invalid task details based on preserve_tasks flag + if !preserve_tasks { + // Even if the script is not found, the worker should clean up the invalid task hash. + if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e); + } + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id); } - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id); } } + Err(e) => { + error!( + "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", + circle_public_key, task_id, task_details_key, e + ); + } } - Err(e) => { - error!( - "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", - circle_public_key, task_id, task_details_key, e - ); - } - } - } else { - debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key); - } - } // End of blpop_result match - } // End of tokio::select! + } else { + debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key); + } + } // End of blpop_result match + } // End of tokio::select! } // End of loop - info!("Worker for Circle Public Key '{}' has shut down.", circle_public_key); + info!( + "Worker for Circle Public Key '{}' has shut down.", + circle_public_key + ); Ok(()) }) }