use std::fs; use std::path::Path; use std::time::Duration; use tokio; use tokio::sync::mpsc; use tokio::time::{sleep, timeout}; use redis::AsyncCommands; use actor_osis::spawn_osis_actor; use hero_job::{Job, JobStatus, ScriptType}; #[tokio::main] async fn main() -> Result<(), Box> { // Initialize logging // env_logger::init(); hero_logger::init_system_logger("logs", &["osis_actor".to_string()]).unwrap(); println!("=== OSIS Actor Redis Dispatch Example ==="); // Find all Rhai scripts in examples/scripts directory let scripts_dir = Path::new("examples/scripts"); if !scripts_dir.exists() { eprintln!("Scripts directory not found: {}", scripts_dir.display()); return Ok(()); } let mut script_files = Vec::new(); for entry in fs::read_dir(scripts_dir)? { let entry = entry?; let path = entry.path(); if path.extension().and_then(|s| s.to_str()) == Some("rhai") { script_files.push(path); } } script_files.sort(); println!("Found {} Rhai scripts in {}", script_files.len(), scripts_dir.display()); if script_files.is_empty() { println!("No Rhai scripts found. Exiting."); return Ok(()); } // Create temporary database path let db_path = "temp_osis_actor_example_db"; // Clean up previous database if it exists if Path::new(db_path).exists() { fs::remove_dir_all(db_path)?; } // Redis configuration let redis_url = "redis://127.0.0.1:6379"; // Try to connect to Redis let redis_client = redis::Client::open(redis_url)?; let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { Ok(conn) => { println!("Connected to Redis at {}", redis_url); conn } Err(e) => { println!("Failed to connect to Redis: {}", e); println!("Please ensure Redis is running on localhost:6379"); println!("You can start Redis with: redis-server"); return Err(e.into()); } }; // Create shutdown channel for the actor let (shutdown_tx, shutdown_rx) = mpsc::channel(1); // Spawn the OSIS actor println!("\n--- Spawning OSIS Actor ---"); let actor_handle = spawn_osis_actor( db_path.to_string(), redis_url.to_string(), shutdown_rx, ); println!("OSIS actor spawned and listening for jobs"); // Process each script for (i, script_path) in script_files.iter().enumerate() { println!("\n=== Processing Script {}/{}: {} ===", i + 1, script_files.len(), script_path.file_name().unwrap().to_string_lossy()); let script_content = match fs::read_to_string(script_path) { Ok(content) => content, Err(e) => { println!("Failed to read script {}: {}", script_path.display(), e); continue; } }; // Create a job for this script let job = Job::new( "example_caller".to_string(), format!("script_{}", script_path.file_stem().unwrap().to_string_lossy()), script_content, ScriptType::OSIS, ); println!("Created job with ID: {}", job.id); // Store the job in Redis let job_key = format!("job:{}", job.id); job.store_in_redis(&mut redis_conn).await?; // Set initial status to Dispatched (since store_in_redis sets it to "pending" which isn't in the enum) Job::update_status(&mut redis_conn, &job.id, JobStatus::Dispatched).await?; println!("Stored job in Redis with key: {} and status: Dispatched", job_key); // Add the job to the OSIS queue for processing // Note: The supervisor uses "actor_queue:" prefix, so the correct queue is: let queue_key = "hero:job:actor_queue:osis"; let _: () = redis_conn.lpush(&queue_key, &job.id).await?; println!("Dispatched job {} to OSIS queue: {}", job.id, queue_key); println!("\n--- Waiting for Job Result ---"); // Wait for result or error from Redis queues with timeout let result_key = format!("hero:job:{}:result", job.id); let error_key = format!("hero:job:{}:error", job.id); let timeout_secs = 10.0; // Use BLPOP to block and wait for either result or error let keys = vec![result_key.clone(), error_key.clone()]; match redis_conn.blpop::<_, Option<(String, String)>>(&keys, timeout_secs).await { Ok(Some((queue_name, value))) => { if queue_name == result_key { println!("✓ Job completed successfully!"); println!(" Result: {}", value); } else if queue_name == error_key { println!("✗ Job failed with error!"); println!(" Error: {}", value); } } Ok(None) => { println!("⏱ Job timed out after {} seconds", timeout_secs); } Err(e) => { println!("❌ Failed to wait for job result: {}", e); } } } // Shutdown the actor println!("\n--- Shutting Down Actor ---"); if let Err(e) = shutdown_tx.send(()).await { println!("Failed to send shutdown signal: {}", e); } // Wait for actor to shutdown with timeout match timeout(Duration::from_secs(5), actor_handle).await { Ok(result) => { match result { Ok(Ok(())) => println!("OSIS actor shut down successfully"), Ok(Err(e)) => println!("OSIS actor shut down with error: {}", e), Err(e) => println!("OSIS actor panicked: {}", e), } } Err(_) => { println!("OSIS actor shutdown timed out"); } } // Clean up the temporary database if Path::new(db_path).exists() { fs::remove_dir_all(db_path)?; println!("Cleaned up temporary database: {}", db_path); } println!("=== Actor Example Complete ==="); Ok(()) }