use anyhow::Context; use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherError, RhaiTaskDetails}; use std::env; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tracing_subscriber::EnvFilter; use engine::create_heromodels_engine; use heromodels::db::hero::OurDB; use std::path::PathBuf; use worker_lib::spawn_rhai_worker; #[tokio::main] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_env_filter( EnvFilter::from_default_env() .add_directive("connect_and_play=info".parse().unwrap()) .add_directive("rhai_dispatcher=info".parse().unwrap()), ) .init(); let args: Vec = env::args().collect(); let redis_url = args.get(1).cloned().unwrap_or_else(|| { let default_url = "redis://127.0.0.1/".to_string(); println!("No Redis URL provided. Defaulting to: {}", default_url); default_url }); let worker_name = args.get(2).cloned().unwrap_or_else(|| { let default_worker = "default_worker".to_string(); println!("No worker name provided. Defaulting to: {}", default_worker); default_worker }); // Define DB path for the worker let db_path_str = format!("./temp_db_for_example_worker_{}", worker_name); let db_path = PathBuf::from(&db_path_str); // Create shutdown channel for the worker let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); // Spawn a worker in the background let worker_redis_url = redis_url.clone(); let worker_circle_name_for_task = worker_name.clone(); let db_path_for_task = db_path_str.clone(); log::info!( "[Main] Spawning worker for circle '{}' with DB path '{}'", worker_circle_name_for_task, db_path_for_task ); let worker_join_handle = tokio::spawn(async move { log::info!( "[BG Worker] Starting for circle '{}' on Redis '{}'", worker_circle_name_for_task, worker_redis_url ); // The `reset: true` in OurDB::new handles pre-cleanup if the directory exists. let db = Arc::new( OurDB::new(&db_path_for_task, true) .expect("Failed to create temp DB for example worker"), ); let mut engine = create_heromodels_engine(db); engine.set_max_operations(0); engine.set_max_expr_depths(0, 0); engine.set_optimization_level(rhai::OptimizationLevel::Full); if let Err(e) = spawn_rhai_worker( 1, // dummy circle_id worker_circle_name_for_task.clone(), engine, worker_redis_url.clone(), shutdown_rx, // Pass the receiver from main false, // preserve_tasks ) .await { log::error!( "[BG Worker] Failed to spawn or worker error for circle '{}': {}", worker_circle_name_for_task, e ); } else { log::info!( "[BG Worker] Worker for circle '{}' shut down gracefully.", worker_circle_name_for_task ); } }); // Give the worker a moment to start up tokio::time::sleep(Duration::from_secs(1)).await; println!( "Initializing RhaiDispatcher for Redis at {} to target worker '{}'...", redis_url, worker_name ); let client = RhaiDispatcher::new(&redis_url) .with_context(|| format!("Failed to create RhaiDispatcher for Redis URL: {}", redis_url))?; println!("RhaiDispatcher initialized."); let script = "let a = 10; let b = 32; let message = \"Hello from example script!\"; message + \" Result: \" + (a + b)"; println!("\nSending script:\n```rhai\n{}\n```", script); let timeout = Duration::from_secs(30); match client .submit_script_and_await_result(&worker_name, script.to_string(), None, timeout) .await { Ok(task_details) => { println!("\nWorker response:"); if let Some(ref output) = task_details.output { println!("Output: {}", output); } if let Some(ref error_msg) = task_details.error { eprintln!("Error: {}", error_msg); } if task_details.output.is_none() && task_details.error.is_none() { println!( "Worker finished with no explicit output or error. Status: {}", task_details.status ); } } Err(e) => match e { RhaiDispatcherError::Timeout(task_id) => { eprintln!( "\nError: Script execution timed out for task_id: {}.", task_id ); } RhaiDispatcherError::RedisError(redis_err) => { eprintln!( "\nError: Redis communication failed: {}. Check Redis connection and server status.", redis_err ); } RhaiDispatcherError::SerializationError(serde_err) => { eprintln!( "\nError: Failed to serialize/deserialize task data: {}.", serde_err ); } RhaiDispatcherError::TaskNotFound(task_id) => { eprintln!("\nError: Task {} not found after submission.", task_id); } /* All RhaiDispatcherError variants are handled, so _ arm is not strictly needed unless RhaiDispatcherError becomes non-exhaustive in the future. */ }, } println!("\nExample client operations finished. Shutting down worker..."); // Send shutdown signal to the worker if let Err(e) = shutdown_tx.send(()).await { eprintln!( "[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)", e ); } // Wait for the worker to finish log::info!("[Main] Waiting for worker task to join..."); if let Err(e) = worker_join_handle.await { eprintln!("[Main] Error waiting for worker task to join: {:?}", e); } else { log::info!("[Main] Worker task joined successfully."); } // Clean up the database directory log::info!( "[Main] Cleaning up database directory: {}", db_path.display() ); if db_path.exists() { if let Err(e) = std::fs::remove_dir_all(&db_path) { eprintln!( "[Main] Failed to remove database directory '{}': {}", db_path.display(), e ); } else { log::info!( "[Main] Successfully removed database directory: {}", db_path.display() ); } } else { log::info!( "[Main] Database directory '{}' not found, no cleanup needed.", db_path.display() ); } println!("Example fully completed and cleaned up."); Ok(()) }