actor_osis/examples/actor.rs
2025-08-06 20:56:38 +02:00

176 lines
6.1 KiB
Rust

use std::fs;
use std::path::Path;
use std::time::Duration;
use tokio;
use tokio::sync::mpsc;
use tokio::time::{sleep, timeout};
use redis::AsyncCommands;
use actor_osis::spawn_osis_actor;
use hero_job::{Job, JobStatus, ScriptType};
/// Runs the OSIS actor Redis dispatch example end to end.
///
/// Steps:
/// 1. Collect every `*.rhai` file in `examples/scripts` (sorted by path).
/// 2. Reset the temporary database directory and connect to the local Redis.
/// 3. Spawn the OSIS actor and dispatch each script to it as a job, waiting
///    for a result or an error per job.
/// 4. Signal the actor to shut down, wait for it, and remove the temporary
///    database.
///
/// # Errors
/// Returns an error when the scripts directory cannot be listed, the stale
/// database cannot be removed, Redis is unreachable, or a job cannot be
/// stored/dispatched. A dispatch failure is only returned *after* the actor
/// has been asked to shut down and the temporary database has been cleaned up.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging. The returned value is bound for the whole run instead
    // of being dropped on the spot.
    // NOTE(review): presumably this carries logging guards that must stay alive;
    // confirm against hero_logger's API.
    let _logger_state = hero_logger::init_system_logger("logs", &["osis_actor".to_string()])
        .expect("failed to initialize the system logger");

    println!("=== OSIS Actor Redis Dispatch Example ===");

    // Find all Rhai scripts in examples/scripts directory
    let scripts_dir = Path::new("examples/scripts");
    if !scripts_dir.exists() {
        eprintln!("Scripts directory not found: {}", scripts_dir.display());
        return Ok(());
    }

    let mut script_files = Vec::new();
    for entry in fs::read_dir(scripts_dir)? {
        let path = entry?.path();
        if path.extension().and_then(|s| s.to_str()) == Some("rhai") {
            script_files.push(path);
        }
    }
    // Sort so the scripts run in a deterministic order.
    script_files.sort();

    println!("Found {} Rhai scripts in {}", script_files.len(), scripts_dir.display());
    if script_files.is_empty() {
        println!("No Rhai scripts found. Exiting.");
        return Ok(());
    }

    // Create temporary database path
    let db_path = "temp_osis_actor_example_db";

    // Clean up previous database if it exists
    if Path::new(db_path).exists() {
        fs::remove_dir_all(db_path)?;
    }

    // Redis configuration
    let redis_url = "redis://127.0.0.1:6379";

    // Try to connect to Redis
    let redis_client = redis::Client::open(redis_url)?;
    let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
        Ok(conn) => {
            println!("Connected to Redis at {}", redis_url);
            conn
        }
        Err(e) => {
            println!("Failed to connect to Redis: {}", e);
            println!("Please ensure Redis is running on localhost:6379");
            println!("You can start Redis with: redis-server");
            return Err(e.into());
        }
    };

    // Create shutdown channel for the actor
    let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

    // Spawn the OSIS actor
    println!("\n--- Spawning OSIS Actor ---");
    let actor_handle = spawn_osis_actor(db_path.to_string(), redis_url.to_string(), shutdown_rx);
    println!("OSIS actor spawned and listening for jobs");

    // Process each script, stopping at the first dispatch failure. The outcome
    // is held rather than returned immediately so that the actor is always shut
    // down and the temporary database always removed.
    let total = script_files.len();
    let mut dispatch_outcome: Result<(), Box<dyn std::error::Error>> = Ok(());
    for (i, script_path) in script_files.iter().enumerate() {
        if let Err(e) = run_script(&mut redis_conn, script_path, i + 1, total).await {
            dispatch_outcome = Err(e);
            break;
        }
    }

    // Shutdown the actor
    println!("\n--- Shutting Down Actor ---");
    if let Err(e) = shutdown_tx.send(()).await {
        println!("Failed to send shutdown signal: {}", e);
    }

    // Wait for actor to shutdown with timeout
    match timeout(Duration::from_secs(5), actor_handle).await {
        Ok(Ok(Ok(()))) => println!("OSIS actor shut down successfully"),
        Ok(Ok(Err(e))) => println!("OSIS actor shut down with error: {}", e),
        Ok(Err(e)) => println!("OSIS actor panicked: {}", e),
        Err(_) => println!("OSIS actor shutdown timed out"),
    }

    // Clean up the temporary database
    let cleanup_outcome = if Path::new(db_path).exists() {
        fs::remove_dir_all(db_path)
            .map(|()| println!("Cleaned up temporary database: {}", db_path))
    } else {
        Ok(())
    };

    // Report a dispatch failure first; a cleanup failure only matters otherwise.
    dispatch_outcome?;
    cleanup_outcome?;

    println!("=== Actor Example Complete ===");
    Ok(())
}

/// Dispatches a single Rhai script to the OSIS actor as a job and prints the
/// outcome.
///
/// `position` (1-based) and `total` are only used for the progress banner.
///
/// An unreadable script, a timeout, or a failure while waiting for the result
/// is reported on stdout and treated as non-fatal.
///
/// # Errors
/// Returns an error when the job cannot be stored in Redis, its status cannot
/// be updated, or its id cannot be pushed onto the OSIS queue.
async fn run_script(
    redis_conn: &mut redis::aio::MultiplexedConnection,
    script_path: &Path,
    position: usize,
    total: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    // Fall back to the full path instead of panicking on a path without a name.
    let file_name = script_path
        .file_name()
        .map(|name| name.to_string_lossy())
        .unwrap_or_else(|| script_path.to_string_lossy());
    println!("\n=== Processing Script {}/{}: {} ===", position, total, file_name);

    let script_content = match fs::read_to_string(script_path) {
        Ok(content) => content,
        Err(e) => {
            println!("Failed to read script {}: {}", script_path.display(), e);
            return Ok(());
        }
    };

    // Create a job for this script
    let script_stem = script_path
        .file_stem()
        .map(|stem| stem.to_string_lossy())
        .unwrap_or_default();
    let job = Job::new(
        "example_caller".to_string(),
        format!("script_{}", script_stem),
        script_content,
        ScriptType::OSIS,
    );
    println!("Created job with ID: {}", job.id);

    // Store the job in Redis
    // NOTE(review): `job_key` is only printed below; the key actually written is
    // chosen by `store_in_redis` — confirm the two agree.
    let job_key = format!("job:{}", job.id);
    job.store_in_redis(&mut *redis_conn).await?;

    // Set initial status to Dispatched (since store_in_redis sets it to "pending" which isn't in the enum)
    Job::update_status(&mut *redis_conn, &job.id, JobStatus::Dispatched).await?;
    println!("Stored job in Redis with key: {} and status: Dispatched", job_key);

    // Add the job to the OSIS queue for processing
    // Note: The supervisor uses "actor_queue:" prefix, so the correct queue is:
    let queue_key = "hero:job:actor_queue:osis";
    let _: () = redis_conn.lpush(queue_key, &job.id).await?;
    println!("Dispatched job {} to OSIS queue: {}", job.id, queue_key);

    println!("\n--- Waiting for Job Result ---");

    // Wait for result or error from Redis queues with timeout
    let result_key = format!("hero:job:{}:result", job.id);
    let error_key = format!("hero:job:{}:error", job.id);
    let timeout_secs = 10.0;

    // Use BLPOP to block and wait for either result or error
    let keys = vec![result_key.clone(), error_key.clone()];
    match redis_conn
        .blpop::<_, Option<(String, String)>>(&keys, timeout_secs)
        .await
    {
        Ok(Some((queue_name, value))) => {
            // BLPOP reports which list produced the value.
            if queue_name == result_key {
                println!("✓ Job completed successfully!");
                println!(" Result: {}", value);
            } else if queue_name == error_key {
                println!("✗ Job failed with error!");
                println!(" Error: {}", value);
            }
        }
        Ok(None) => {
            println!("⏱ Job timed out after {} seconds", timeout_secs);
        }
        Err(e) => {
            println!("❌ Failed to wait for job result: {}", e);
        }
    }

    Ok(())
}