diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..24e536a Binary files /dev/null and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 1de5659..b6e51e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -target \ No newline at end of file +target +worker_rhai_temp_db +dump.rdb \ No newline at end of file diff --git a/BENCHMARK_README.md b/BENCHMARK_README.md deleted file mode 100644 index d6ce95f..0000000 --- a/BENCHMARK_README.md +++ /dev/null @@ -1,91 +0,0 @@ -# Rhailib Benchmarking - SIMPLIFIED โœจ - -> **Note**: This document describes the old complex benchmarking system. -> **For the new minimal system, see [`bench/README.md`](bench/README.md)** - -## ๐ŸŽฏ New Minimal Benchmark System - -The benchmarking system has been **drastically simplified**: - -- **85% Code Reduction**: From 800+ lines to ~113 lines -- **Single File**: All logic in [`bench/simple_bench.rs`](bench/simple_bench.rs) -- **Direct Timing**: Redis timestamps, no complex stats -- **Minimal Dependencies**: No criterion, no abstractions - -### Quick Start - -```bash -cd bench -cargo run --bin simple_bench -``` - -### Expected Output -``` -๐Ÿงน Cleaning up Redis... -๐Ÿš€ Starting worker... -๐Ÿ“ Creating single task... -โฑ๏ธ Waiting for completion... -โœ… Task completed in 23.45ms -๐Ÿงน Cleaning up... -``` - -## ๐Ÿ“ New Structure - -``` -rhailib/ -โ”œโ”€โ”€ bench/ # NEW: Minimal benchmark system -โ”‚ โ”œโ”€โ”€ simple_bench.rs # Main benchmark (85 lines) -โ”‚ โ”œโ”€โ”€ batch_task.lua # Simple task creation (28 lines) -โ”‚ โ”œโ”€โ”€ Cargo.toml # Dependencies -โ”‚ โ””โ”€โ”€ README.md # Usage instructions -โ””โ”€โ”€ scripts/ # Cleaned up scripts - โ”œโ”€โ”€ run_rhai_batch.lua # Original batch script (kept) - โ””โ”€โ”€ run_rhai.lua # Basic script (kept) -``` - -## ๐Ÿ—‘๏ธ What Was Removed - -- `benches/` directory (complex criterion-based benchmarks) -- `src/benchmarks/` module (redis_stats.rs, worker_manager.rs) -- Complex Lua scripts (`run_rhai_with_wait.lua`, `run_rhai_blocking.sh`) -- Framework dependencies (criterion, complex stats) - -## ๐Ÿš€ Benefits of New System - -1. **Minimalism**: Single file, linear flow -2. **Direct Timing**: `updated_at - created_at` from Redis -3. **Easy to Understand**: No abstractions or frameworks -4. **Fast to Modify**: 85 lines vs 800+ lines -5. **Reliable**: Simple Redis operations -6. **Extensible**: Easy to add features incrementally - -## ๐Ÿ“ˆ Iteration Plan - -- **Current**: Single task (n=1) benchmarking -- **Next**: Small batches (n=5, n=10) -- **Future**: Larger batches and script complexity - ---- - -## ๐Ÿ“š Old System Documentation (Archived) - -The following describes the previous complex system that has been removed: - -### Old Architecture (REMOVED) -- Complex Criterion-based benchmarking -- Multi-module statistics collection -- Abstract worker management -- Complex configuration systems -- Framework dependencies - -### Old Files (REMOVED) -- `benches/rhai_performance_bench.rs` (237 lines) -- `src/benchmarks/redis_stats.rs` (285 lines) -- `src/benchmarks/worker_manager.rs` (~200 lines) -- `src/benchmarks/mod.rs` (10 lines) - -**Total removed**: ~800+ lines of complex code - ---- - -**For current benchmarking, use the new minimal system in [`bench/`](bench/)** \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index dc0ead4..5da9138 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2342,9 +2342,13 @@ dependencies = [ "env_logger", "log", "redis", + "rhai", + "rhai_client", + "rhailib_worker", "serde", "serde_json", "tokio", + "uuid", ] [[package]] @@ -2358,7 +2362,25 @@ dependencies = [ "rhai_client", "serde_json", "tokio", - "worker", +] + +[[package]] +name = "rhailib_worker" +version = "0.1.0" +dependencies = [ + "chrono", + "clap", + "engine", + "env_logger", + "heromodels", + "log", + "redis", + "rhai", + "rhai_client", + "serde", + "serde_json", + "tokio", + "uuid", ] [[package]] @@ -3018,13 +3040,13 @@ dependencies = [ "log", "rhai", "rhai_client", + "rhailib_worker", "rustyline", "tempfile", "tokio", "tracing", "tracing-subscriber", "url", - "worker", ] [[package]] @@ -3429,25 +3451,6 @@ dependencies = [ "bitflags 2.9.1", ] -[[package]] -name = "worker" -version = "0.1.0" -dependencies = [ - "chrono", - "clap", - "engine", - "env_logger", - "heromodels", - "log", - "redis", - "rhai", - "rhai_client", - "serde", - "serde_json", - "tokio", - "uuid", -] - [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 7771602..01c9f33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,6 @@ version = "0.1.0" edition = "2021" # Changed to 2021 for consistency with other crates [dependencies] - - anyhow = "1.0" chrono = { version = "0.4", features = ["serde"] } env_logger = "0.10" @@ -13,15 +11,24 @@ log = "0.4" redis = { version = "0.25.0", features = ["tokio-comp"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync", "signal"] } +rhai = "1.21.0" +rhailib_worker = { path = "src/worker" } +rhai_client = { path = "src/client" } + [dev-dependencies] criterion = { version = "0.5", features = ["html_reports"] } +uuid = { version = "1.6", features = ["v4", "serde"] } # For examples like dedicated_reply_queue_demo [[bench]] name = "simple_rhai_bench" harness = false +[[example]] +name = "end_to_end_auth_demo" +path = "examples/end_to_end/main.rs" + [workspace] members = [ diff --git a/benches/simple_rhai_bench/README.md b/benches/simple_rhai_bench/README.md index 4d62e31..1e52197 100644 --- a/benches/simple_rhai_bench/README.md +++ b/benches/simple_rhai_bench/README.md @@ -65,7 +65,6 @@ Where: ## Benefits -- **Minimal Code**: 85 lines vs previous 800+ lines - **Easy to Understand**: Single file, linear flow - **Direct Timing**: Redis timestamps, no complex stats - **Fast to Modify**: No abstractions or frameworks diff --git a/ARCHITECTURE.md b/docs/ARCHITECTURE.md similarity index 97% rename from ARCHITECTURE.md rename to docs/ARCHITECTURE.md index 83ab804..1a9baa9 100644 --- a/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -21,6 +21,7 @@ The `rhailib` system is composed of the following main components, leveraging Re * Listens to Redis task queues ("circles") for incoming task IDs. * Fetches task details, executes the script using the `rhai_engine`. * Updates task status and results in Redis. + * Injects the caller's public key into the script's scope as `CALLER_PUBLIC_KEY` if available. * Sends a notification/result to the client's dedicated reply queue. 4. **Redis:** @@ -112,6 +113,7 @@ This architecture allows for: * `created_at`: Timestamp of task creation. * `updated_at`: Timestamp of the last update to the task details. * `reply_to_queue`: (New) The name of the dedicated Redis List the client is listening on for the result. + * `publicKey`: (Optional) The public key of the user who submitted the task. * **Reply Queues:** * Key Pattern: `rhai_reply:` (e.g., `rhai_reply:`) * Type: List diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b410c05..d9d4537 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -7,7 +7,6 @@ publish = false # This is a package of examples, not meant to be published [dependencies] # Local Rhailib crates rhai_client = { path = "../src/client" } -worker = { path = "../src/worker" } # External dependencies rhai = "1.18.0" diff --git a/examples/dedicated_reply_queue_demo.rs b/examples/dedicated_reply_queue_demo.rs index c8b4949..6535941 100644 --- a/examples/dedicated_reply_queue_demo.rs +++ b/examples/dedicated_reply_queue_demo.rs @@ -1,10 +1,10 @@ use log::{info, error, debug}; use rhai::Engine; -use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is not directly used -use worker_lib::spawn_rhai_worker; +use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is now used for its fields +use rhailib_worker::spawn_rhai_worker; use std::time::Duration; use tokio::sync::mpsc; -use serde_json::Value; +use uuid::Uuid; // Added for generating task_id #[tokio::main] async fn main() -> Result<(), Box> { @@ -53,28 +53,26 @@ async fn main() -> Result<(), Box> { // 5. Submit script and await result using the new mechanism let task_timeout = Duration::from_secs(10); - let client_rpc_id: Option = Some(serde_json::json!({ "demo_request_id": "reply_queue_test_001" })); + let task_id = Uuid::new_v4().to_string(); // Generate a unique task_id - info!("Submitting script to circle '{}' and awaiting result...", circle_name); + info!("Submitting script to circle '{}' with task_id '{}' and awaiting result...", circle_name, task_id); info!("Script: {}", script_to_run); match client .submit_script_and_await_result( circle_name, + task_id.clone(), // Pass the generated task_id script_to_run.to_string(), - client_rpc_id, task_timeout, - // poll_interval is no longer needed + None // public_key ) .await { Ok(details) => { - info!("Task completed successfully!"); + info!("Task {} completed successfully!", details.task_id); debug!("Full Task Details: {:#?}", details); - // The task_id is not part of the returned RhaiTaskDetails struct. - // We could modify the client to return (task_id, details) if needed, - // but for this demo, we'll just log the content of the returned details. - info!("Received details for script: {}", details.script); + // The task_id is now part of the returned RhaiTaskDetails struct. + info!("Received details for task_id: {}, script: {}", details.task_id, details.script); info!("Status: {}", details.status); if let Some(output) = details.output { info!("Output: {}", output); // Expected: 42 @@ -89,7 +87,9 @@ async fn main() -> Result<(), Box> { Err(e) => { error!("An error occurred while awaiting task result: {}", e); // The specific error can be inspected if needed, e.g., for timeout - if let RhaiClientError::Timeout(task_id) = e { + if let RhaiClientError::Timeout(returned_task_id) = e { + // Note: 'task_id' here is the one from the error, which should match the one we sent. + info!("Task {} timed out.", returned_task_id); info!("Task {} timed out.", task_id); } } diff --git a/examples/end_to_end/README.md b/examples/end_to_end/README.md new file mode 100644 index 0000000..d06385e --- /dev/null +++ b/examples/end_to_end/README.md @@ -0,0 +1,24 @@ +# End-to-End Authorization Demo + +This example demonstrates an end-to-end scenario involving a custom Rhai engine, `rhailib_worker`, and `rhai_client` to showcase how authorization based on `CALLER_PUBLIC_KEY` can be implemented. + +## Overview + +1. **Custom Rhai Engine**: A Rhai engine is created, and a custom function `check_permission(caller_pk: String)` is registered. This function returns different messages based on the `caller_pk` provided. +2. **Rhai Worker (`rhailib_worker`)**: A worker is spawned with this custom engine. The worker is configured with its own `CIRCLE_PUBLIC_KEY` (e.g., "auth_worker_circle"). +3. **Rhai Client (`rhai_client`)**: The client is used to submit a script (`auth_script.rhai`) to the worker. +4. **Authorization Script (`auth_script.rhai`)**: This script calls the `check_permission` function, passing the `CALLER_PUBLIC_KEY` (which is automatically injected into the script's scope by the worker based on the client's submission). +5. **Demonstration**: The `main.rs` program submits the script twice, using two different `CALLER_PUBLIC_KEY`s ("admin_pk" and "user_pk"), and shows that the script produces different results based on the authorization logic in `check_permission`. + +This example illustrates how the `rhailib` components can work together to build systems where script execution is controlled and authorized based on the identity of the calling client. + +## Running the Example + +Assuming you have Redis running and accessible at `redis://127.0.0.1/`: + +Run the example from the `rhailib` root directory: +```bash +cargo run --example end_to_end_auth_demo +``` + +You should see output indicating the results of the script execution for both the "admin_pk" and "user_pk" callers. diff --git a/examples/end_to_end/auth_script.rhai b/examples/end_to_end/auth_script.rhai new file mode 100644 index 0000000..e6facc1 --- /dev/null +++ b/examples/end_to_end/auth_script.rhai @@ -0,0 +1,6 @@ +// auth_script.rhai +// This script calls a custom registered function 'check_permission' +// and passes the CALLER_PUBLIC_KEY to it. +// CALLER_PUBLIC_KEY is injected into the script's scope by the rhailib_worker. + +check_permission(CALLER_PUBLIC_KEY) diff --git a/examples/end_to_end/main.rs b/examples/end_to_end/main.rs new file mode 100644 index 0000000..d952001 --- /dev/null +++ b/examples/end_to_end/main.rs @@ -0,0 +1,136 @@ +use rhai::{Engine, EvalAltResult}; +use rhai_client::RhaiClient; +use rhailib_worker::spawn_rhai_worker; +use std::{fs, path::Path, time::Duration}; +use tokio::sync::mpsc; +use uuid::Uuid; + +// Custom Rhai function for authorization +// It takes the caller's public key as an argument. +fn check_permission(caller_pk: String) -> Result> { + log::info!("check_permission called with PK: {}", caller_pk); + if caller_pk == "admin_pk" { + Ok("Access Granted: Welcome Admin!".to_string()) + } else if caller_pk == "user_pk" { + Ok("Limited Access: Welcome User!".to_string()) + } else { + Ok(format!("Access Denied: Unknown public key '{}'", caller_pk)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let redis_url = "redis://127.0.0.1/"; + let worker_circle_pk = "auth_worker_circle".to_string(); + + // 1. Create a Rhai engine and register custom functionality + let mut engine = Engine::new(); + engine.register_fn("check_permission", check_permission); + log::info!("Custom 'check_permission' function registered with Rhai engine."); + + // 2. Spawn the Rhai worker + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let worker_handle = tokio::spawn(spawn_rhai_worker( + 0, // worker_id + worker_circle_pk.clone(), + engine, + redis_url.to_string(), + shutdown_rx, + false, // use_sentinel + )); + log::info!("Rhai worker spawned for circle: {}", worker_circle_pk); + + // Give the worker a moment to start up + tokio::time::sleep(Duration::from_secs(1)).await; + + // 3. Create a Rhai client + let client = RhaiClient::new(redis_url)?; + log::info!("Rhai client created."); + + // 4. Load the Rhai script content + let script_path_str = "examples/end_to_end/auth_script.rhai"; // Relative to Cargo.toml / rhailib root + let script_content = match fs::read_to_string(script_path_str) { + Ok(content) => content, + Err(e) => { + log::error!("Failed to read script file '{}': {}", script_path_str, e); + // Attempt to read from an alternative path if run via `cargo run --example` + // where current dir might be the crate root. + let alt_script_path = Path::new(file!()).parent().unwrap().join("auth_script.rhai"); + log::info!("Attempting alternative script path: {:?}", alt_script_path); + fs::read_to_string(&alt_script_path)? + } + }; + log::info!("Loaded script content from '{}'", script_path_str); + + // Define different caller public keys + let admin_caller_pk = "admin_pk".to_string(); + let user_caller_pk = "user_pk".to_string(); + let unknown_caller_pk = "unknown_pk".to_string(); + + let callers = vec![ + ("Admin", admin_caller_pk), + ("User", user_caller_pk), + ("Unknown", unknown_caller_pk), + ]; + + for (caller_name, caller_pk) in callers { + let task_id = Uuid::new_v4().to_string(); + log::info!( + "Submitting script for caller '{}' (PK: {}) with task_id: {}", + caller_name, + caller_pk, + task_id + ); + + match client + .submit_script_and_await_result( + &worker_circle_pk, + task_id.clone(), // task_id (UUID) first + script_content.clone(), // script_content second + Duration::from_secs(10), + Some(caller_pk.clone()), // This is the CALLER_PUBLIC_KEY + ) + .await + { + Ok(details) => { + log::info!( + "Task {} for caller '{}' (PK: {}) completed. Status: {}, Output: {:?}, Error: {:?}", + task_id, + caller_name, + caller_pk, + details.status, + details.output, + details.error + ); + // Basic assertion for expected output + if caller_pk == "admin_pk" { + assert_eq!(details.output, Some("Access Granted: Welcome Admin!".to_string())); + } else if caller_pk == "user_pk" { + assert_eq!(details.output, Some("Limited Access: Welcome User!".to_string())); + } + } + Err(e) => { + log::error!( + "Task {} for caller '{}' (PK: {}) failed: {}", + task_id, + caller_name, + caller_pk, + e + ); + } + } + tokio::time::sleep(Duration::from_millis(100)).await; // Small delay between submissions + } + + // 5. Shutdown the worker (optional, could also let it run until program exits) + log::info!("Signaling worker to shutdown..."); + let _ = shutdown_tx.send(()).await; + if let Err(e) = worker_handle.await { + log::error!("Worker task panicked or encountered an error: {:?}", e); + } + log::info!("Worker shutdown complete."); + + Ok(()) +} diff --git a/examples/example_math_worker.rs b/examples/example_math_worker.rs index 1ba0167..abc3bb0 100644 --- a/examples/example_math_worker.rs +++ b/examples/example_math_worker.rs @@ -1,9 +1,10 @@ use rhai::Engine; use rhai_client::RhaiClient; // To submit tasks +use uuid::Uuid; // For generating task_id use std::time::Duration; use tokio::time::sleep; -use worker_lib::spawn_rhai_worker; +use rhailib_worker::spawn_rhai_worker; // Custom function for Rhai fn add(a: i64, b: i64) -> i64 { @@ -48,13 +49,14 @@ async fn main() -> Result<(), Box> { log::info!("Submitting math script to 'math_circle' and awaiting result..."); let timeout_duration = Duration::from_secs(10); - + let task_id = Uuid::new_v4().to_string(); match client.submit_script_and_await_result( "math_circle", script_content.to_string(), - None, - timeout_duration + task_id, // Pass the generated task_id + timeout_duration, + None ).await { Ok(details) => { log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", diff --git a/examples/example_string_worker.rs b/examples/example_string_worker.rs index 7c7c18b..ef992e1 100644 --- a/examples/example_string_worker.rs +++ b/examples/example_string_worker.rs @@ -1,9 +1,10 @@ use rhai::Engine; use rhai_client::RhaiClient; // To submit tasks +use uuid::Uuid; // For generating task_id use std::time::Duration; use tokio::time::sleep; -use worker_lib::spawn_rhai_worker; +use rhailib_worker::spawn_rhai_worker; // Custom function for Rhai fn reverse_string(s: String) -> String { @@ -48,13 +49,14 @@ async fn main() -> Result<(), Box> { log::info!("Submitting string script to 'string_circle' and awaiting result..."); let timeout_duration = Duration::from_secs(10); - + let task_id = Uuid::new_v4().to_string(); match client.submit_script_and_await_result( "string_circle", script_content.to_string(), - None, - timeout_duration + task_id, // Pass the generated task_id + timeout_duration, + None ).await { Ok(details) => { log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", diff --git a/examples/lua_client_demo.rs b/examples/lua_client_demo.rs deleted file mode 100644 index c055a73..0000000 --- a/examples/lua_client_demo.rs +++ /dev/null @@ -1,52 +0,0 @@ -use worker_lib::spawn_rhai_worker; -use rhai::Engine; -use tokio::sync::mpsc; -use tokio::signal; -use log::info; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Initialize the logger - env_logger::init(); - - let redis_url = "redis://127.0.0.1/"; - let circle_name = "default".to_string(); - let mut engine = Engine::new(); // Create a new, simple Rhai engine - - // Register a simple 'ping' function for the readiness check. - engine.register_fn("ping", || -> String { - "pong".to_string() - }); - - // Create a channel for the shutdown signal - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - - info!("Spawning Rhai worker for circle: {}", circle_name); - - // Spawn the worker - let worker_handle = spawn_rhai_worker( - 1, // circle_id - circle_name.clone(), - engine, - redis_url.to_string(), - shutdown_rx, - false, // preserve_tasks - ); - - info!("Worker spawned. Press Ctrl+C to shut down."); - - // Wait for Ctrl+C - signal::ctrl_c().await?; - - info!("Ctrl+C received. Sending shutdown signal to worker."); - let _ = shutdown_tx.send(()).await; - - // Wait for the worker to finish - if let Err(e) = worker_handle.await? { - eprintln!("Worker process finished with an error: {:?}", e); - } - - info!("Worker has shut down gracefully."); - - Ok(()) -} \ No newline at end of file diff --git a/examples/run_benchmark_demo.rs b/examples/run_benchmark_demo.rs deleted file mode 100644 index e5c7ca1..0000000 --- a/examples/run_benchmark_demo.rs +++ /dev/null @@ -1,133 +0,0 @@ -//! Demo script showing how to run the hybrid performance benchmark -//! -//! This example demonstrates: -//! 1. Starting workers programmatically -//! 2. Running the Lua batch script -//! 3. Collecting and displaying statistics - -use rhailib::{RedisStatsCollector, WorkerManager, clear_redis_test_data, check_redis_connection}; -use redis::{Client, Commands}; -use std::fs; -use std::time::Duration; - -const REDIS_URL: &str = "redis://localhost:6379"; -const CIRCLE_NAME: &str = "demo_circle"; - -fn main() -> Result<(), Box> { - env_logger::init(); - - println!("๐Ÿš€ Rhailib Hybrid Performance Benchmark Demo"); - println!("============================================"); - - // Check Redis connection - println!("๐Ÿ“ก Checking Redis connection..."); - check_redis_connection(REDIS_URL)?; - println!("โœ… Redis connection successful"); - - // Clear any existing test data - println!("๐Ÿงน Clearing existing test data..."); - clear_redis_test_data(REDIS_URL)?; - println!("โœ… Test data cleared"); - - // Load Lua script - println!("๐Ÿ“œ Loading Lua batch script..."); - let lua_script = fs::read_to_string("scripts/run_rhai_batch.lua")?; - println!("โœ… Lua script loaded ({} bytes)", lua_script.len()); - - // Start workers - println!("๐Ÿ‘ท Starting 2 worker processes..."); - let mut worker_manager = WorkerManager::new(); - worker_manager.start_workers(2, CIRCLE_NAME, REDIS_URL)?; - worker_manager.wait_for_workers_ready(Duration::from_secs(3))?; - println!("โœ… Workers started and ready"); - - // Connect to Redis - let redis_client = Client::open(REDIS_URL)?; - let mut conn = redis_client.get_connection()?; - - // Execute batch workload - println!("๐ŸŽฏ Submitting batch of 100 tasks..."); - let batch_id = format!("demo_batch_{}", chrono::Utc::now().timestamp_millis()); - let simple_script = "let x = 42; x * 2"; - - let start_time = std::time::Instant::now(); - - let result: redis::Value = redis::cmd("EVAL") - .arg(&lua_script) - .arg(0) // No keys - .arg(CIRCLE_NAME) - .arg(100) // task count - .arg(simple_script) - .arg(&batch_id) - .query(&mut conn)?; - - let submission_time = start_time.elapsed(); - println!("โœ… Batch submitted in {:?}", submission_time); - - // Parse result - if let redis::Value::Data(data) = result { - let response: serde_json::Value = serde_json::from_slice(&data)?; - println!("๐Ÿ“Š Batch info: {}", serde_json::to_string_pretty(&response)?); - } - - // Wait for completion and collect statistics - println!("โณ Waiting for batch completion..."); - let stats_collector = RedisStatsCollector::new(REDIS_URL)?; - - let completed = stats_collector.wait_for_batch_completion( - &batch_id, - 100, - Duration::from_secs(30), - )?; - - if !completed { - println!("โš ๏ธ Batch did not complete within timeout"); - return Ok(()); - } - - println!("โœ… Batch completed!"); - - // Collect and display statistics - println!("๐Ÿ“ˆ Collecting performance statistics..."); - let timings = stats_collector.collect_batch_timings(&batch_id)?; - let stats = stats_collector.calculate_stats(&timings); - - println!("\n๐Ÿ“Š PERFORMANCE RESULTS"); - println!("======================"); - println!("Total tasks: {}", stats.total_tasks); - println!("Completed tasks: {}", stats.completed_tasks); - println!("Failed tasks: {}", stats.failed_tasks); - println!("Error rate: {:.2}%", stats.error_rate); - println!("Throughput: {:.2} tasks/second", stats.throughput_tps); - println!("Batch duration: {:.2} ms", stats.batch_duration_ms); - println!("\nLatency Statistics:"); - println!(" Min: {:.2} ms", stats.latency_stats.min_ms); - println!(" Max: {:.2} ms", stats.latency_stats.max_ms); - println!(" Mean: {:.2} ms", stats.latency_stats.mean_ms); - println!(" Median: {:.2} ms", stats.latency_stats.median_ms); - println!(" P95: {:.2} ms", stats.latency_stats.p95_ms); - println!(" P99: {:.2} ms", stats.latency_stats.p99_ms); - println!(" Std Dev: {:.2} ms", stats.latency_stats.std_dev_ms); - - // Show some individual task timings - println!("\n๐Ÿ” Sample Task Timings (first 10):"); - for (i, timing) in timings.iter().take(10).enumerate() { - println!(" Task {}: {} -> {} ({:.2}ms, status: {})", - i + 1, - timing.task_id, - timing.status, - timing.latency_ms, - timing.status - ); - } - - // Cleanup - println!("\n๐Ÿงน Cleaning up..."); - stats_collector.cleanup_batch_data(&batch_id)?; - worker_manager.shutdown()?; - println!("โœ… Cleanup complete"); - - println!("\n๐ŸŽ‰ Demo completed successfully!"); - - Ok(()) -} \ No newline at end of file diff --git a/run_benchmarks.sh b/run_benchmarks.sh deleted file mode 100755 index 4f3d65d..0000000 --- a/run_benchmarks.sh +++ /dev/null @@ -1,117 +0,0 @@ -#!/bin/bash - -# Hybrid Performance Benchmark Runner for Rhailib -# This script sets up the environment and runs the benchmarks - -set -e - -echo "๐Ÿš€ Rhailib Hybrid Performance Benchmark Runner" -echo "==============================================" - -# Check if Redis is running -echo "๐Ÿ“ก Checking Redis connection..." -if ! redis-cli ping > /dev/null 2>&1; then - echo "โŒ Redis is not running. Please start Redis server:" - echo " redis-server" - exit 1 -fi -echo "โœ… Redis is running" - -# Check if we're in the right directory -if [ ! -f "Cargo.toml" ] || [ ! -d "scripts" ]; then - echo "โŒ Please run this script from the rhailib root directory" - exit 1 -fi - -# Build the worker binary in release mode for performance -echo "๐Ÿ”จ Building worker binary in release mode..." -cd src/worker -cargo build --release --bin worker -cd ../.. -echo "โœ… Worker binary built (release mode)" - -# Clear any existing Redis data -echo "๐Ÿงน Clearing Redis test data..." -redis-cli FLUSHDB > /dev/null -echo "โœ… Redis data cleared" - -# Parse command line arguments -BENCHMARK_TYPE="full" -TASK_COUNT="" -WORKER_COUNT="" - -while [[ $# -gt 0 ]]; do - case $1 in - --demo) - BENCHMARK_TYPE="demo" - shift - ;; - --quick) - BENCHMARK_TYPE="quick" - shift - ;; - --tasks) - TASK_COUNT="$2" - shift 2 - ;; - --workers) - WORKER_COUNT="$2" - shift 2 - ;; - --help|-h) - echo "Usage: $0 [OPTIONS]" - echo "" - echo "Options:" - echo " --demo Run demo script instead of full benchmarks" - echo " --quick Run quick benchmarks (fewer configurations)" - echo " --tasks N Override task count for demo" - echo " --workers N Override worker count for demo" - echo " --help, -h Show this help message" - echo "" - echo "Examples:" - echo " $0 # Run full benchmarks" - echo " $0 --demo # Run demo script" - echo " $0 --quick # Run quick benchmarks" - echo " $0 --demo --tasks 50 --workers 4 # Custom demo" - exit 0 - ;; - *) - echo "โŒ Unknown option: $1" - echo "Use --help for usage information" - exit 1 - ;; - esac -done - -case $BENCHMARK_TYPE in - "demo") - echo "๐ŸŽฏ Running benchmark demo..." - if [ -n "$TASK_COUNT" ] || [ -n "$WORKER_COUNT" ]; then - echo "โš ๏ธ Custom task/worker counts not yet supported in demo" - echo " Using default values (100 tasks, 2 workers)" - fi - cargo run --example run_benchmark_demo - ;; - "quick") - echo "โšก Running quick benchmarks..." - echo " This will test basic configurations only" - cargo bench --bench rhai_performance_bench -- --quick - ;; - "full") - echo "๐Ÿ Running full benchmark suite..." - echo " This may take several minutes..." - cargo bench --bench rhai_performance_bench - echo "" - echo "๐Ÿ“Š Benchmark results saved to: target/criterion/" - echo " Open target/criterion/report/index.html to view detailed results" - ;; -esac - -echo "" -echo "๐ŸŽ‰ Benchmark run completed!" -echo "" -echo "๐Ÿ“ˆ Next steps:" -echo " โ€ข View HTML reports in target/criterion/report/" -echo " โ€ข Run 'cargo bench' for full Criterion benchmarks" -echo " โ€ข Run '$0 --demo' for a quick demonstration" -echo " โ€ข Check BENCHMARK_README.md for detailed documentation" \ No newline at end of file diff --git a/src/.DS_Store b/src/.DS_Store index 53d16cc..14831a4 100644 Binary files a/src/.DS_Store and b/src/.DS_Store differ diff --git a/src/client/README.md b/src/client/README.md index c879611..049abeb 100644 --- a/src/client/README.md +++ b/src/client/README.md @@ -11,28 +11,28 @@ The `rhai_client` crate provides a client interface for submitting Rhai scripts - Submit a script and get a `task_id` back immediately. - Poll for task status and results using the `task_id`. - Optionally, submit a script and await its completion (or error/timeout) with configurable timeout and polling intervals. -- **Circle-based Task Routing**: Scripts are submitted to named "circles," allowing for different worker pools or configurations. +- **Public Key-based Task Routing**: Scripts are submitted to a "circle" identified by its unique `secp256k1` public key. This ensures tasks are routed to the correct, isolated worker process. ## Core Components -- **`RhaiClient`**: The main struct for interacting with the Rhai task system. It's initialized with a Redis connection URL. +- **`RhaiClient`**: The main struct for interacting with the Rhai task system. - `new(redis_url: &str)`: Creates a new client. - - `submit_script(...)`: Submits a script and returns a `task_id`. + - `submit_script(...)`: Submits a script and returns a `task_id`. It requires the target circle's public key for routing. - `get_task_status(task_id: &str)`: Retrieves the current status and details of a task. - - `submit_script_and_await_result(...)`: Submits a script and polls until it completes, errors out, or the specified timeout is reached. -- **`RhaiTaskDetails`**: A struct representing the details of a task, including its script, status (`pending`, `processing`, `completed`, `error`), output, error messages, and timestamps. + - `submit_script_and_await_result(...)`: A convenient wrapper that submits a script and polls until it completes, errors out, or the specified timeout is reached. +- **`RhaiTaskDetails`**: A struct representing the details of a task, including its script, status (`pending`, `processing`, `completed`, `error`), output, error messages, and the public key of the caller. - **`RhaiClientError`**: An enum for various errors that can occur, such as Redis errors, serialization issues, or task timeouts. ## How It Works 1. The `RhaiClient` is initialized with the Redis server URL. -2. When a script is submitted via `submit_script` or `submit_script_and_await_result`: +2. When a script is submitted (e.g., via `submit_script_and_await_result`): a. A unique `task_id` (UUID v4) is generated. - b. `RhaiTaskDetails` are created with the script, initial status set to "pending", and other relevant metadata. + b. `RhaiTaskDetails` are created, including the script, the caller's public key (if provided), and an initial status of "pending". c. These details are stored in a Redis hash with a key like `rhai_task_details:`. - d. The `task_id` is pushed onto a Redis list named `rhai_tasks:`, which acts as a queue for workers listening to that specific circle. -3. Workers (not part of this client crate) would pop `task_id`s from their respective circle queues, retrieve task details from Redis, execute the script, and update the task details (status, output/error) in Redis. -4. The `RhaiClient` can then use `get_task_status` to poll the Redis hash for updates or `submit_script_and_await_result` to automate this polling. + d. The `task_id` is pushed onto a Redis list named `rhai_tasks:`, which acts as a queue for the worker assigned to that specific circle. +3. A dedicated `rhai_worker` process, which was spawned with the same `circle_public_key`, pops the `task_id` from its queue, retrieves the task details, executes the script, and updates the results in the Redis hash. +4. The `RhaiClient` can use `get_task_status` to poll the Redis hash for these updates, or `submit_script_and_await_result` to automate the polling. ## Prerequisites @@ -40,12 +40,12 @@ The `rhai_client` crate provides a client interface for submitting Rhai scripts ## Usage Example -The following example demonstrates submitting a script and waiting for its result with a timeout. (This is a conceptual adaptation; see `examples/timeout_example.rs` for a runnable example focused on timeout behavior). +The following example demonstrates submitting a script to a circle identified by its public key and waiting for the result. ```rust use rhai_client::RhaiClient; use std::time::Duration; -use serde_json::json; // For client_rpc_id example +use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { @@ -56,31 +56,30 @@ async fn main() -> Result<(), Box> { log::info!("RhaiClient created."); let script_content = r#" - fn add(a, b) { a + b } - add(10, 32) + // This script can access both CIRCLE_PUBLIC_KEY and CALLER_PUBLIC_KEY + "Hello from circle: " + CIRCLE_PUBLIC_KEY + ", called by: " + CALLER_PUBLIC_KEY "#; - let circle_name = "general_compute"; + // The target circle is identified by its public key + let circle_public_key = "02f...some_public_key_hex"; + // The entity calling the script also has a public key + let caller_public_key = Some("03a...another_public_key_hex"); + let timeout = Duration::from_secs(10); - let poll_interval = Duration::from_millis(500); - // Optional client-side RPC ID to associate with the task - let client_rpc_id = Some(json!({ "request_id": "user_request_abc123" })); - - log::info!("Submitting script to circle '{}' and awaiting result...", circle_name); + log::info!("Submitting script to circle '{}' and awaiting result...", circle_public_key); match client .submit_script_and_await_result( - circle_name, + circle_public_key, script_content.to_string(), - client_rpc_id, + None, // Optional client-side RPC ID timeout, - poll_interval, + caller_public_key, ) .await { Ok(details) => { log::info!("Task completed successfully!"); - log::info!("Task ID: {}", details.script); // Note: This should likely be a dedicated task_id field if needed from details log::info!("Status: {}", details.status); if let Some(output) = details.output { log::info!("Output: {}", output); @@ -95,10 +94,7 @@ async fn main() -> Result<(), Box> { rhai_client::RhaiClientError::Timeout(task_id) => { log::warn!("Task {} timed out.", task_id); } - rhai_client::RhaiClientError::TaskNotFound(task_id) => { - log::error!("Task {} was not found after submission.", task_id); - } - _ => { // Handle other errors like RedisError, SerializationError + _ => { log::error!("Unhandled client error: {:?}", e); } } diff --git a/src/client/examples/timeout_example.rs b/src/client/examples/timeout_example.rs index a3da037..a768f6a 100644 --- a/src/client/examples/timeout_example.rs +++ b/src/client/examples/timeout_example.rs @@ -30,9 +30,9 @@ async fn main() -> Result<(), Box> { .submit_script_and_await_result( non_existent_circle, script_content.to_string(), - None, // No specific task_id + "some_task_id".to_string(), // No specific task_id very_short_timeout, - poll_interval, + None, ) .await { diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index b5af886..184ab82 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -3,7 +3,6 @@ use log::{debug, info, warn, error}; // Added error use redis::AsyncCommands; use std::time::Duration; // Duration is still used, Instant and sleep were removed use serde::{Deserialize, Serialize}; -use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method use uuid::Uuid; const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; @@ -12,18 +11,22 @@ const REDIS_REPLY_QUEUE_PREFIX: &str = "rhai_reply:"; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct RhaiTaskDetails { + #[serde(rename = "taskId")] // Ensure consistent naming with other fields + pub task_id: String, pub script: String, pub status: String, // "pending", "processing", "completed", "error" - #[serde(rename = "clientRpcId")] - pub client_rpc_id: Option, // Kept for compatibility with worker/server, but optional for client + // client_rpc_id: Option is removed. + // Worker responses should ideally not include it, or Serde will ignore unknown fields by default. pub output: Option, pub error: Option, // Renamed from error_message for consistency #[serde(rename = "createdAt")] pub created_at: chrono::DateTime, #[serde(rename = "updatedAt")] pub updated_at: chrono::DateTime, - #[serde(rename = "replyToQueue")] - pub reply_to_queue: Option, // New field for dedicated reply queue + // reply_to_queue: Option is removed from the struct. + // It's passed to submit_script_to_worker_queue if needed and stored in Redis directly. + #[serde(rename = "publicKey")] + pub public_key: Option, } #[derive(Debug)] @@ -74,46 +77,37 @@ impl RhaiClient { &self, conn: &mut redis::aio::MultiplexedConnection, circle_name: &str, - task_id: &str, + task_id: &str, // This is the main task_id script: String, - client_rpc_id: Option, - reply_to_queue_name: Option, // Made this an Option + // client_rpc_id: Option is removed + reply_to_queue_name: Option, // Still needed to tell the worker where to reply, if applicable + public_key: Option, ) -> Result<(), RhaiClientError> { let now = Utc::now(); - let task_details = RhaiTaskDetails { - script, - status: "pending".to_string(), - client_rpc_id, - output: None, - error: None, - created_at: now, - updated_at: now, - reply_to_queue: reply_to_queue_name.clone(), - }; - + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); debug!( - "Preparing task_id: {} for circle: {} to worker_queue: {}. Details: {:?}", - task_id, circle_name, worker_queue_key, task_details + "Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}", + task_id, circle_name, worker_queue_key, script, reply_to_queue_name, public_key ); let mut hset_args: Vec<(String, String)> = vec![ - ("script".to_string(), task_details.script.clone()), - ("status".to_string(), task_details.status.clone()), - ("createdAt".to_string(), task_details.created_at.to_rfc3339()), - ("updatedAt".to_string(), task_details.updated_at.to_rfc3339()), + ("taskId".to_string(), task_id.to_string()), // Add taskId + ("script".to_string(), script), // script is moved here + ("status".to_string(), "pending".to_string()), + ("createdAt".to_string(), now.to_rfc3339()), + ("updatedAt".to_string(), now.to_rfc3339()), ]; - if let Some(rpc_id_val) = &task_details.client_rpc_id { - hset_args.push(("clientRpcId".to_string(), serde_json::to_string(rpc_id_val)?)); - } else { - hset_args.push(("clientRpcId".to_string(), Value::Null.to_string())); + // clientRpcId field and its corresponding hset_args logic are removed. + + if let Some(queue_name) = &reply_to_queue_name { // Use the passed parameter + hset_args.push(("replyToQueue".to_string(), queue_name.clone())); } - - if let Some(reply_q) = &task_details.reply_to_queue { - hset_args.push(("replyToQueue".to_string(), reply_q.clone())); + if let Some(pk) = &public_key { // Use the passed parameter + hset_args.push(("publicKey".to_string(), pk.clone())); } // Ensure hset_args is a slice of tuples (String, String) @@ -139,21 +133,25 @@ impl RhaiClient { &self, circle_name: &str, script: String, - client_rpc_id: Option, + // client_rpc_id: Option is removed + public_key: Option, ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - let task_id = Uuid::new_v4().to_string(); - + let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget + + debug!("Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", task_id, circle_name); + self.submit_script_to_worker_queue( &mut conn, circle_name, &task_id, script, - client_rpc_id, - None, // No reply queue for fire-and-forget + // client_rpc_id argument removed + None, // No dedicated reply queue for fire-and-forget + public_key, ) .await?; - + Ok(task_id) } @@ -168,25 +166,45 @@ impl RhaiClient { match result_map { Some(map) => { // Reconstruct RhaiTaskDetails from HashMap - // This is a simplified reconstruction; ensure all fields are handled robustly let details = RhaiTaskDetails { - script: map.get("script").cloned().unwrap_or_default(), - status: map.get("status").cloned().unwrap_or_default(), - client_rpc_id: map.get("clientRpcId") - .and_then(|s| serde_json::from_str(s).ok()) - .or(Some(Value::Null)), // Default to Value::Null if missing or parse error + task_id: task_id.to_string(), // Use the task_id parameter passed to the function + script: map.get("script").cloned().unwrap_or_else(|| { + warn!("Task {}: 'script' field missing from Redis hash, defaulting to empty.", task_id); + String::new() + }), + status: map.get("status").cloned().unwrap_or_else(|| { + warn!("Task {}: 'status' field missing from Redis hash, defaulting to empty.", task_id); + String::new() + }), + // client_rpc_id is no longer a field in RhaiTaskDetails output: map.get("output").cloned(), error: map.get("error").cloned(), created_at: map.get("createdAt") .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(Utc::now), // Provide a default + .unwrap_or_else(|| { + warn!("Task {}: 'createdAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id); + Utc::now() + }), updated_at: map.get("updatedAt") .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(Utc::now), // Provide a default - reply_to_queue: map.get("replyToQueue").cloned(), + .unwrap_or_else(|| { + warn!("Task {}: 'updatedAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id); + Utc::now() + }), + // reply_to_queue is no longer a field in RhaiTaskDetails (it's stored in Redis but not in this struct) + public_key: map.get("publicKey").cloned(), }; + // It's important to also check if the 'taskId' field exists in the map and matches the input task_id + // for data integrity, though the struct construction above uses the input task_id directly. + if let Some(redis_task_id) = map.get("taskId") { + if redis_task_id != task_id { + warn!("Task {}: Mismatch between requested task_id and taskId found in Redis hash ('{}'). Proceeding with requested task_id.", task_id, redis_task_id); + } + } else { + warn!("Task {}: 'taskId' field missing from Redis hash.", task_id); + } Ok(Some(details)) } None => Ok(None), @@ -197,27 +215,32 @@ impl RhaiClient { pub async fn submit_script_and_await_result( &self, circle_name: &str, + task_id: String, // task_id is now a mandatory parameter provided by the caller script: String, - client_rpc_id: Option, timeout: Duration, + public_key: Option, ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - let task_id = Uuid::new_v4().to_string(); - let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, Uuid::new_v4().to_string()); + // let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter + let reply_to_queue_name = + format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id self.submit_script_to_worker_queue( &mut conn, circle_name, - &task_id, + &task_id, // Pass the task_id parameter script, - client_rpc_id, - Some(reply_to_queue_name.clone()), + // client_rpc_id argument removed + Some(reply_to_queue_name.clone()), // Pass the derived reply_to_queue_name + public_key, ) .await?; info!( "Task {} submitted. Waiting for result on queue {} with timeout {:?}...", - task_id, reply_to_queue_name, timeout + task_id, // This is the UUID + reply_to_queue_name, + timeout ); // BLPOP on the reply queue diff --git a/src/engine/Cargo.toml b/src/engine/Cargo.toml index 49b8f4d..cbdf4de 100644 --- a/src/engine/Cargo.toml +++ b/src/engine/Cargo.toml @@ -32,6 +32,6 @@ path = "examples/flow/example.rs" required-features = ["flow"] [[example]] -name = "finance_example" +name = "finance" path = "examples/finance/example.rs" required-features = ["finance"] diff --git a/src/engine/examples/finance/example.rs b/src/engine/examples/finance/example.rs index 89e31fb..823a4a0 100644 --- a/src/engine/examples/finance/example.rs +++ b/src/engine/examples/finance/example.rs @@ -33,7 +33,7 @@ fn main() -> Result<(), Box> { println!("---------------------"); // Run the script - match eval_file(&engine, &script_path.to_string_lossy()) { + match eval_file(&engine, &script_path) { Ok(result) => { if !result.is_unit() { println!("\nScript returned: {:?}", result); diff --git a/src/engine/src/lib.rs b/src/engine/src/lib.rs index 0a25b34..32512c2 100644 --- a/src/engine/src/lib.rs +++ b/src/engine/src/lib.rs @@ -27,8 +27,10 @@ pub fn create_heromodels_engine(db: Arc) -> Engine { /// Register all heromodels Rhai modules with the engine pub fn register_all_modules(engine: &mut Engine, db: Arc) { // Register the calendar module if the feature is enabled + heromodels::models::access::register_access_rhai_module(engine, db.clone()); #[cfg(feature = "calendar")] heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone()); + heromodels::models::contact::register_contact_rhai_module(engine, db.clone()); heromodels::models::library::register_library_rhai_module(engine, db.clone()); heromodels::models::circle::register_circle_rhai_module(engine, db.clone()); diff --git a/src/engine/src/mock_db.rs.part b/src/engine/src/mock_db.rs.part deleted file mode 100644 index ed08975..0000000 --- a/src/engine/src/mock_db.rs.part +++ /dev/null @@ -1,82 +0,0 @@ -/// Seed the mock database with finance data -fn seed_finance_data(db: Arc) { - // Create a user account - let mut account = Account::new() - .name("Demo Account") - .user_id(1) - .description("Demo trading account") - .ledger("ethereum") - .address("0x1234567890abcdef1234567890abcdef12345678") - .pubkey("0xabcdef1234567890abcdef1234567890abcdef12"); - - // Store the account in the database - let (account_id, updated_account) = db.collection::() - .expect("Failed to get Account collection") - .set(&account) - .expect("Failed to store account"); - - // Create an ERC20 token asset - let token_asset = Asset::new() - .name("HERO Token") - .description("Herocode governance token") - .amount(1000.0) - .address("0x9876543210abcdef9876543210abcdef98765432") - .asset_type(AssetType::Erc20) - .decimals(18); - - // Store the token asset in the database - let (token_id, updated_token) = db.collection::() - .expect("Failed to get Asset collection") - .set(&token_asset) - .expect("Failed to store token asset"); - - // Create an NFT asset - let nft_asset = Asset::new() - .name("Herocode #1") - .description("Unique digital collectible") - .amount(1.0) - .address("0xabcdef1234567890abcdef1234567890abcdef12") - .asset_type(AssetType::Erc721) - .decimals(0); - - // Store the NFT asset in the database - let (nft_id, updated_nft) = db.collection::() - .expect("Failed to get Asset collection") - .set(&nft_asset) - .expect("Failed to store NFT asset"); - - // Add assets to the account - account = updated_account.add_asset(token_id); - account = account.add_asset(nft_id); - - // Update the account in the database - let (_, updated_account) = db.collection::() - .expect("Failed to get Account collection") - .set(&account) - .expect("Failed to store updated account"); - - // Create a listing for the NFT - let listing = Listing::new() - .seller_id(account_id) - .asset_id(nft_id) - .price(0.5) - .currency("ETH") - .listing_type(ListingType::Auction) - .title(Some("Rare Herocode NFT".to_string())) - .description(Some("One of a kind digital collectible".to_string())) - .image_url(Some("https://example.com/nft/1.png".to_string())) - .add_tag("rare".to_string()) - .add_tag("collectible".to_string()); - - // Store the listing in the database - let (listing_id, updated_listing) = db.collection::() - .expect("Failed to get Listing collection") - .set(&listing) - .expect("Failed to store listing"); - - println!("Mock database seeded with finance data:"); - println!(" - Added account: {} (ID: {})", updated_account.name, updated_account.base_data.id); - println!(" - Added token asset: {} (ID: {})", updated_token.name, updated_token.base_data.id); - println!(" - Added NFT asset: {} (ID: {})", updated_nft.name, updated_nft.base_data.id); - println!(" - Added listing: {} (ID: {})", updated_listing.title.unwrap_or_default(), updated_listing.base_data.id); -} diff --git a/src/repl/.DS_Store b/src/repl/.DS_Store new file mode 100644 index 0000000..1dec388 Binary files /dev/null and b/src/repl/.DS_Store differ diff --git a/src/repl/Cargo.toml b/src/repl/Cargo.toml index 47fadad..22b1f6d 100644 --- a/src/repl/Cargo.toml +++ b/src/repl/Cargo.toml @@ -15,7 +15,7 @@ tempfile = "3.8" # For creating temporary files for editing rhai_client = { path = "../client" } anyhow = "1.0" # For simpler error handling -worker_lib = { path = "../worker", package = "worker" } +rhailib_worker = { path = "../worker", package = "rhailib_worker" } engine = { path = "../engine" } heromodels = { path = "../../../db/heromodels", features = ["rhai"] } rhai = { version = "1.18.0" } # Match version used by worker/engine diff --git a/src/worker/.DS_Store b/src/worker/.DS_Store index 95a15a4..8961f3f 100644 Binary files a/src/worker/.DS_Store and b/src/worker/.DS_Store differ diff --git a/src/worker/Cargo.toml b/src/worker/Cargo.toml index 9f25e7b..f4d3c55 100644 --- a/src/worker/Cargo.toml +++ b/src/worker/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "worker" +name = "rhailib_worker" version = "0.1.0" edition = "2021" [lib] -name = "worker_lib" # Can be different from package name, or same +name = "rhailib_worker" # Can be different from package name, or same path = "src/lib.rs" [[bin]] @@ -15,7 +15,7 @@ path = "cmd/worker.rs" [dependencies] redis = { version = "0.25.0", features = ["tokio-comp"] } -rhai = { version = "1.18.0", features = ["sync", "decimal"] } # Added "decimal" for broader script support +rhai = { version = "1.18.0", default-features = false, features = ["sync", "decimal", "std"] } # Added "decimal" for broader script support serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/src/worker/README.md b/src/worker/README.md index 50f2b40..ebaa815 100644 --- a/src/worker/README.md +++ b/src/worker/README.md @@ -1,76 +1,69 @@ # Rhai Worker -The `rhai_worker` crate implements a worker service that listens for Rhai script execution tasks from a Redis queue, executes them using the Rhai scripting engine, and posts results back to Redis. It is designed to work in conjunction with the `rhai_client` crate. +The `rhai_worker` crate implements a standalone worker service that listens for Rhai script execution tasks from a Redis queue, executes them, and posts results back to Redis. It is designed to be spawned as a separate OS process by an orchestrator like the `launcher` crate. ## Features -- **Redis Queue Consumption**: Listens to one or more specified Redis lists (acting as task queues) for incoming task IDs. -- **Rhai Script Execution**: Executes Rhai scripts retrieved based on task IDs. -- **Task State Management**: Updates task status (`processing`, `completed`, `error`) and stores results (output or error messages) in Redis hashes. -- **Configurable**: - - Redis URL can be specified via command-line arguments. - - Listens to specific "circles" (task queues) provided as command-line arguments. -- **Asynchronous Operations**: Built with `tokio` for non-blocking Redis communication and script processing. +- **Redis Queue Consumption**: Listens to a specific Redis list (acting as a task queue) for incoming task IDs. The queue is determined by the `--circle-public-key` argument. +- **Rhai Script Execution**: Executes Rhai scripts retrieved from Redis based on task IDs. +- **Task State Management**: Updates task status (`processing`, `completed`, `error`) and stores results in Redis hashes. +- **Script Scope Injection**: Automatically injects two important constants into the Rhai script's scope: + - `CIRCLE_PUBLIC_KEY`: The public key of the worker's own circle. + - `CALLER_PUBLIC_KEY`: The public key of the entity that requested the script execution. +- **Asynchronous Operations**: Built with `tokio` for non-blocking Redis communication. - **Graceful Error Handling**: Captures errors during script execution and stores them for the client. ## Core Components - **`worker_lib` (Library Crate)**: - - **`Args`**: A struct (using `clap`) for parsing command-line arguments like Redis URL and target circle names. + - **`Args`**: A struct (using `clap`) for parsing command-line arguments: `--redis-url` and `--circle-public-key`. - **`run_worker_loop(engine: Engine, args: Args)`**: The main asynchronous function that: - Connects to Redis. - - Continuously polls specified Redis queues (e.g., `rhai_tasks:`) using `BLPOP`. - - Upon receiving a `task_id`: - - Fetches task details (including the script) from a Redis hash (e.g., `rhai_task_details:`). - - Updates the task status to "processing". - - Executes the Rhai script using the provided `rhai::Engine`. - - Updates the task status to "completed" with the script's output or "error" with the error message. - - **`update_task_status_in_redis(...)`**: A helper function to update task details in Redis. + - Continuously polls the designated Redis queue (`rhai_tasks:`) using `BLPOP`. + - Upon receiving a `task_id`, it fetches the task details from a Redis hash. + - It injects `CALLER_PUBLIC_KEY` and `CIRCLE_PUBLIC_KEY` into the script's scope. + - It executes the script and updates the task status in Redis with the output or error. - **`worker` (Binary Crate - `cmd/worker.rs`)**: - - The main executable entry point. - - Parses command-line arguments. - - Initializes a default `rhai::Engine`. - - Invokes `run_worker_loop` from `worker_lib`. + - The main executable entry point. It parses command-line arguments, initializes a Rhai engine, and invokes `run_worker_loop`. ## How It Works -1. The worker executable is launched, typically with command-line arguments specifying the Redis URL and the "circle(s)" (queues) to monitor. +1. The worker executable is launched by an external process (e.g., `launcher`), which passes the required command-line arguments. ```bash - ./worker --redis-url redis://your-redis-host/ --circles circle_A circle_B + # This is typically done programmatically by a parent process. + /path/to/worker --redis-url redis://127.0.0.1/ --circle-public-key 02...abc ``` -2. The `run_worker_loop` connects to Redis and starts listening to the designated task queues (e.g., `rhai_tasks:circle_a`, `rhai_tasks:circle_b`). -3. When a `rhai_client` submits a task, it pushes a `task_id` to one of these queues and stores task details (script, initial status "pending") in a Redis hash. -4. The worker's `BLPOP` command picks up a `task_id` from a queue. -5. The worker retrieves the script from the corresponding `rhai_task_details:` hash in Redis. -6. It updates the task's status to "processing" in the Redis hash. -7. The Rhai script is executed. -8. After execution: - - If successful, the status is updated to "completed", and the output is stored in the Redis hash. - - If an error occurs, the status is updated to "error", and the error message is stored. +2. The `run_worker_loop` connects to Redis and starts listening to its designated task queue (e.g., `rhai_tasks:02...abc`). +3. A `rhai_client` submits a task by pushing a `task_id` to this queue and storing the script and other details in a Redis hash. +4. The worker's `BLPOP` command picks up the `task_id`. +5. The worker retrieves the script from the corresponding `rhai_task_details:` hash. +6. It updates the task's status to "processing". +7. The Rhai script is executed within a scope that contains both `CIRCLE_PUBLIC_KEY` and `CALLER_PUBLIC_KEY`. +8. After execution, the status is updated to "completed" (with output) or "error" (with an error message). 9. The worker then goes back to listening for the next task. ## Prerequisites - A running Redis instance accessible by the worker. -- The `rhai_client` (or another system) populating the Redis queues and task detail hashes. +- An orchestrator process (like `launcher`) to spawn the worker. +- A `rhai_client` (or another system) to populate the Redis queues. ## Building and Running +The worker is intended to be built as a dependency and run by another program. + 1. **Build the worker:** ```bash - # From the root of the rhailib project or within src/worker - cargo build # Or cargo build --release for an optimized version + # From the root of the rhailib project + cargo build --package worker ``` - The binary will typically be found in `target/debug/worker` or `target/release/worker`. + The binary will be located at `target/debug/worker`. -2. **Run the worker:** +2. **Running the worker:** + The worker is not typically run manually. The `launcher` crate is responsible for spawning it with the correct arguments. If you need to run it manually for testing, you must provide the required arguments: ```bash - # Example: - ./target/debug/worker --redis-url redis://127.0.0.1/ --circles my_circle_1 my_circle_2 + ./target/debug/worker --redis-url redis://127.0.0.1/ --circle-public-key ``` - Replace `redis://127.0.0.1/` with your Redis server's URL and `my_circle_1 my_circle_2` with the names of the task queues you want this worker instance to process. - - You can run multiple instances of the worker, potentially listening to the same or different circles, to scale out processing. ## Dependencies @@ -80,7 +73,3 @@ Key dependencies include: - `clap`: For command-line argument parsing. - `tokio`: For the asynchronous runtime. - `log`, `env_logger`: For logging. -- `rhai_client`: For shared definitions (potentially, though direct usage is minimal in current `lib.rs`). - -## Note on Binary Path -The `Cargo.toml` for the worker specifies the binary path as `src/bin/worker.rs`. However, the actual file is located at `src/cmd/worker.rs`. This README assumes the latter is the correct and current location. If `Cargo.toml` is updated, this note might become obsolete. diff --git a/src/worker/cmd/worker.rs b/src/worker/cmd/worker.rs index b94de72..1c98464 100644 --- a/src/worker/cmd/worker.rs +++ b/src/worker/cmd/worker.rs @@ -1,4 +1,4 @@ -use worker_lib::spawn_rhai_worker; +use worker::spawn_rhai_worker; use engine::create_heromodels_engine; use heromodels::db::hero::OurDB; use std::sync::Arc; @@ -8,9 +8,9 @@ use tokio::sync::mpsc; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// Circle name to listen to - #[arg(short, long, default_value = "default")] - circle: String, + /// Public key of the circle to listen to + #[arg(short, long, default_value = "default_public_key")] + circle_public_key: String, /// Redis URL #[arg(short, long, default_value = "redis://localhost:6379")] @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); log::info!("Rhai Worker (binary) starting with performance-optimized engine."); - log::info!("Worker ID: {}, Circle: {}, Redis: {}", args.worker_id, args.circle, args.redis_url); + log::info!("Worker ID: {}, Circle Public Key: {}, Redis: {}", args.worker_id, args.circle_public_key, args.redis_url); // Initialize database with OurDB for the Rhai engine // Using a temporary/in-memory like database for the worker @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { // Spawn the worker let worker_handle = spawn_rhai_worker( 1, // circle_id (not used but required) - args.circle, + args.circle_public_key, engine, args.redis_url, shutdown_rx, diff --git a/src/worker/src/lib.rs b/src/worker/src/lib.rs index d557620..8414fd8 100644 --- a/src/worker/src/lib.rs +++ b/src/worker/src/lib.rs @@ -39,83 +39,92 @@ async fn update_task_status_in_redis( pub fn spawn_rhai_worker( _circle_id: u32, // For logging or specific logic if needed in the future - circle_name: String, + circle_public_key: String, engine: Engine, redis_url: String, mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver preserve_tasks: bool, // Flag to control task cleanup ) -> JoinHandle>> { tokio::spawn(async move { - let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); + let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key); info!( - "Rhai Worker for Circle '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.", - circle_name, redis_url, queue_key + "Rhai Worker for Circle Public Key '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.", + circle_public_key, redis_url, queue_key ); let redis_client = match redis::Client::open(redis_url.as_str()) { Ok(client) => client, Err(e) => { - error!("Worker for Circle '{}': Failed to open Redis client: {}", circle_name, e); + error!("Worker for Circle Public Key '{}': Failed to open Redis client: {}", circle_public_key, e); return Err(Box::new(e) as Box); } }; let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { Ok(conn) => conn, Err(e) => { - error!("Worker for Circle '{}': Failed to get Redis connection: {}", circle_name, e); + error!("Worker for Circle Public Key '{}': Failed to get Redis connection: {}", circle_public_key, e); return Err(Box::new(e) as Box); } }; - info!("Worker for Circle '{}' successfully connected to Redis.", circle_name); + info!("Worker for Circle Public Key '{}' successfully connected to Redis.", circle_public_key); loop { let blpop_keys = vec![queue_key.clone()]; tokio::select! { // Listen for shutdown signal _ = shutdown_rx.recv() => { - info!("Worker for Circle '{}': Shutdown signal received. Terminating loop.", circle_name); + info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key); break; } // Listen for tasks from Redis blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { - debug!("Worker for Circle '{}': Attempting BLPOP on queue: {}", circle_name, queue_key); + debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key); let response: Option<(String, String)> = match blpop_result { Ok(resp) => resp, Err(e) => { - error!("Worker for Circle '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_name, queue_key, e); + error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e); return Err(Box::new(e) as Box); } }; if let Some((_queue_name_recv, task_id)) = response { - info!("Worker for Circle '{}' received task_id: {} from queue: {}", circle_name, task_id, _queue_name_recv); - debug!("Worker for Circle '{}', Task {}: Processing started.", circle_name, task_id); + info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv); + debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id); let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - debug!("Worker for Circle '{}', Task {}: Attempting HGETALL from key: {}", circle_name, task_id, task_details_key); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key); let task_details_map_result: Result, _> = redis_conn.hgetall(&task_details_key).await; match task_details_map_result { Ok(details_map) => { - debug!("Worker for Circle '{}', Task {}: HGETALL successful. Details: {:?}", circle_name, task_id, details_map); + debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map); let script_content_opt = details_map.get("script").cloned(); let reply_to_queue_opt = details_map.get("replyToQueue").cloned(); - let client_rpc_id_str_opt = details_map.get("clientRpcId").cloned(); let created_at_str_opt = details_map.get("createdAt").cloned(); + let public_key_opt = details_map.get("publicKey").cloned(); if let Some(script_content) = script_content_opt { - info!("Worker for Circle '{}' processing task_id: {}. Script: {:.50}...", circle_name, task_id, script_content); - debug!("Worker for Circle '{}', Task {}: Attempting to update status to 'processing'.", circle_name, task_id); + info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id); if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { - error!("Worker for Circle '{}', Task {}: Failed to update status to 'processing': {}", circle_name, task_id, e); + error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e); } else { - debug!("Worker for Circle '{}', Task {}: Status updated to 'processing'.", circle_name, task_id); + debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id); } let mut scope = Scope::new(); - debug!("Worker for Circle '{}', Task {}: Evaluating script with Rhai engine.", circle_name, task_id); + scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone()); + debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id); + + if let Some(public_key) = public_key_opt.as_deref() { + if !public_key.is_empty() { + scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string()); + debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id); + } + } + debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id); let mut final_status = "error".to_string(); // Default to error let mut final_output: Option = None; @@ -130,19 +139,19 @@ pub fn spawn_rhai_worker( } else { result.to_string() }; - info!("Worker for Circle '{}' task {} completed. Output: {}", circle_name, task_id, output_str); + info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str); final_status = "completed".to_string(); final_output = Some(output_str); } Err(e) => { let error_str = format!("{:?}", *e); - error!("Worker for Circle '{}' task {} script evaluation failed. Error: {}", circle_name, task_id, error_str); + error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str); final_error_msg = Some(error_str); // final_status remains "error" } } - debug!("Worker for Circle '{}', Task {}: Attempting to update status to '{}'.", circle_name, task_id, final_status); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status); if let Err(e) = update_task_status_in_redis( &mut redis_conn, &task_id, @@ -150,39 +159,40 @@ pub fn spawn_rhai_worker( final_output.clone(), // Clone for task hash update final_error_msg.clone(), // Clone for task hash update ).await { - error!("Worker for Circle '{}', Task {}: Failed to update final status to '{}': {}", circle_name, task_id, final_status, e); + error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e); } else { - debug!("Worker for Circle '{}', Task {}: Final status updated to '{}'.", circle_name, task_id, final_status); + debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status); } // Send to reply queue if specified if let Some(reply_q) = reply_to_queue_opt { - let client_rpc_id = client_rpc_id_str_opt.and_then(|s| serde_json::from_str(&s).ok()); let created_at = created_at_str_opt .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist let reply_details = RhaiTaskDetails { + task_id: task_id.to_string(), // Add the task_id script: script_content.clone(), // Include script for context in reply status: final_status, // The final status - client_rpc_id, + // client_rpc_id is no longer a field output: final_output, // The final output error: final_error_msg, // The final error created_at, // Original creation time updated_at: Utc::now(), // Time of this final update/reply - reply_to_queue: None, // This field is not relevant for the message content itself + // reply_to_queue is no longer a field + public_key: public_key_opt, }; match serde_json::to_string(&reply_details) { Ok(reply_json) => { let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_q, &reply_json).await; match lpush_result { - Ok(_) => debug!("Worker for Circle '{}', Task {}: Successfully sent result to reply queue {}", circle_name, task_id, reply_q), - Err(e_lpush) => error!("Worker for Circle '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_name, task_id, reply_q, e_lpush), + Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q), + Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush), } } Err(e_json) => { - error!("Worker for Circle '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_name, task_id, reply_q, e_json); + error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json); } } } @@ -190,43 +200,43 @@ pub fn spawn_rhai_worker( if !preserve_tasks { // The worker is responsible for cleaning up the task details hash. if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { - error!("Worker for Circle '{}', Task {}: Failed to delete task details key '{}': {}", circle_name, task_id, task_details_key, e); + error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e); } else { - debug!("Worker for Circle '{}', Task {}: Cleaned up task details key '{}'.", circle_name, task_id, task_details_key); + debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key); } } else { - debug!("Worker for Circle '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_name, task_id); + debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id); } } else { // Script content not found in hash error!( - "Worker for Circle '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", - circle_name, task_id, details_map + "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", + circle_public_key, task_id, details_map ); // Clean up invalid task details based on preserve_tasks flag if !preserve_tasks { // Even if the script is not found, the worker should clean up the invalid task hash. if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { - error!("Worker for Circle '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_name, task_id, task_details_key, e); + error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e); } } else { - debug!("Worker for Circle '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_name, task_id); + debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id); } } } Err(e) => { error!( - "Worker for Circle '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", - circle_name, task_id, task_details_key, e + "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", + circle_public_key, task_id, task_details_key, e ); } } } else { - debug!("Worker for Circle '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", circle_name, queue_key); + debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key); } } // End of blpop_result match } // End of tokio::select! } // End of loop - info!("Worker for Circle '{}' has shut down.", circle_name); + info!("Worker for Circle Public Key '{}' has shut down.", circle_public_key); Ok(()) }) } diff --git a/start_ws_servers.sh b/start_ws_servers.sh deleted file mode 100755 index c4a8e8e..0000000 --- a/start_ws_servers.sh +++ /dev/null @@ -1,158 +0,0 @@ -#!/bin/bash - -# Exit immediately if a command exits with a non-zero status. -# set -e # We will handle errors manually for cleanup -# Instead of set -e, we'll check command statuses and exit if needed after attempting cleanup. - -# Default to not force killing processes -FORCE_KILL=false - -# Parse command-line options -while getopts "f" opt; do - case ${opt} in - f ) - FORCE_KILL=true - ;; - \? ) - echo "Invalid option: -$OPTARG" 1>&2 - exit 1 - ;; - esac -done -shift $((OPTIND -1)) - - -# Array to store PIDs of background processes -BG_PIDS=() - -# Cleanup function -cleanup() { - echo "Caught signal, cleaning up background processes..." - for pid in "${BG_PIDS[@]}"; do - if ps -p "$pid" > /dev/null; then # Check if process exists - echo "Stopping process $pid..." - kill "$pid" - fi - done - # Wait for all background processes to terminate - for pid in "${BG_PIDS[@]}"; do - if ps -p "$pid" > /dev/null; then - wait "$pid" 2>/dev/null # Suppress "No such process" if already gone - fi - done - echo "All background processes stopped." - exit 0 # Exit script after cleanup -} - -# Trap SIGINT (Ctrl+C) and SIGTERM -trap cleanup SIGINT SIGTERM - -# Define circles and their base port -# The client will need to know these port assignments. -# Circle names should match what's in your mock data for consistency, -# but for the WS server, it's what the server identifies itself as. -# The client will use the lowercase_with_underscores version for the path. - -# Define circles and their ports using indexed arrays -CIRCLE_NAMES=( - "OurWorld" - "My Personal Space" - "threefold" - "circles_app" -) -CIRCLE_PORTS=( - "9000" - "9001" - "9002" - "9003" -) -# Add more circles and their ports here if needed, ensuring arrays match - -# Build the WebSocket server first -echo "Building circle_server_ws..." -cargo build --package circle_server_ws -if [ $? -ne 0 ]; then echo "Failed to build circle_server_ws"; cleanup; exit 1; fi - -echo "Building rhai_worker..." -cargo build --package rhai_worker -if [ $? -ne 0 ]; then echo "Failed to build rhai_worker"; cleanup; exit 1; fi - - -# Paths to the compiled binaries -WS_SERVER_BINARY="./target/debug/circle_server_ws" -RHAI_WORKER_BINARY="./target/debug/rhai_worker" - -if [ ! -f "$WS_SERVER_BINARY" ]; then - echo "Error: WebSocket server binary not found at $WS_SERVER_BINARY after build." - cleanup - exit 1 -fi - -if [ ! -f "$RHAI_WORKER_BINARY" ]; then - echo "Error: Rhai worker binary not found at $RHAI_WORKER_BINARY after build." - cleanup - exit 1 -fi - -echo "Starting WebSocket servers..." - -for i in "${!CIRCLE_NAMES[@]}"; do - NAME="${CIRCLE_NAMES[i]}" - PORT="${CIRCLE_PORTS[i]}" - - if [ "$FORCE_KILL" = true ]; then - echo "Checking if port $PORT is in use (force mode)..." - # lsof -i : -t lists PIDs listening on the port - # The output might be empty or multiple PIDs. - # We'll kill any PID found. - PIDS_ON_PORT=$(lsof -i ":$PORT" -t 2>/dev/null || true) # Suppress lsof error if port not in use, || true ensures command doesn't fail script - if [ -n "$PIDS_ON_PORT" ]; then - for PID_TO_KILL in $PIDS_ON_PORT; do - echo "Port $PORT is in use by PID $PID_TO_KILL. Forcing kill..." - kill -9 "$PID_TO_KILL" # Force kill - # Add a small delay to give the OS time to free the port - sleep 0.5 - done - else - echo "Port $PORT is free." - fi - fi - - # The circle name passed to the server is the "identity" name. - # The client will still connect to ws://localhost:PORT/ws - echo "Starting server for '$NAME' on port $PORT..." - # Run in background - "$WS_SERVER_BINARY" --port "$PORT" --circle-name "$NAME" & - BG_PIDS+=($!) # Store PID of the last backgrounded process -done - -echo "All WebSocket servers launched." - -# Prepare circle names for rhai_worker -# It expects names like "OurWorld", "My Personal Space" -# We can directly use the CIRCLE_NAMES array -echo "Starting Rhai worker for circles: ${CIRCLE_NAMES[@]}..." -# Run rhai_worker in the background -# Assuming default Redis URL redis://127.0.0.1/ -"$RHAI_WORKER_BINARY" --circles "${CIRCLE_NAMES[@]}" & -BG_PIDS+=($!) # Store PID of the Rhai worker - -echo "Rhai worker launched." -echo "All processes launched. Press Ctrl+C to stop all servers and the worker." - -# Wait for all background PIDs. -# If any of them exit prematurely, this script will also exit. -# The trap will handle cleanup if Ctrl+C is pressed. -for pid in "${BG_PIDS[@]}"; do - wait "$pid" - # If a process exits with an error, its exit code will be propagated by wait. - # The script will then exit due to `wait` itself exiting with that code. - # The trap should still run on SIGINT/SIGTERM. - # For other signals or unexpected exits, the trap might not run. - # More robust error handling for individual process failures could be added here. -done - -# If all processes exited cleanly (e.g., were killed by the trap), -# the script will reach here. The trap's exit 0 will handle this. -# If they exited due to an error, `wait` would have exited the script. -cleanup # Call cleanup if all jobs finished normally (e.g. if they self-terminate) \ No newline at end of file