176 lines
6.1 KiB
Rust
176 lines
6.1 KiB
Rust
use std::fs;
|
|
use std::path::Path;
|
|
use std::time::Duration;
|
|
use tokio;
|
|
use tokio::sync::mpsc;
|
|
use tokio::time::{sleep, timeout};
|
|
use redis::AsyncCommands;
|
|
|
|
use actor_osis::spawn_osis_actor;
|
|
use hero_job::{Job, JobStatus, ScriptType};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
// Initialize logging
|
|
// env_logger::init();
|
|
hero_logger::init_system_logger("logs", &["osis_actor".to_string()]).unwrap();
|
|
|
|
println!("=== OSIS Actor Redis Dispatch Example ===");
|
|
|
|
// Find all Rhai scripts in examples/scripts directory
|
|
let scripts_dir = Path::new("examples/scripts");
|
|
if !scripts_dir.exists() {
|
|
eprintln!("Scripts directory not found: {}", scripts_dir.display());
|
|
return Ok(());
|
|
}
|
|
|
|
let mut script_files = Vec::new();
|
|
for entry in fs::read_dir(scripts_dir)? {
|
|
let entry = entry?;
|
|
let path = entry.path();
|
|
if path.extension().and_then(|s| s.to_str()) == Some("rhai") {
|
|
script_files.push(path);
|
|
}
|
|
}
|
|
|
|
script_files.sort();
|
|
println!("Found {} Rhai scripts in {}", script_files.len(), scripts_dir.display());
|
|
|
|
if script_files.is_empty() {
|
|
println!("No Rhai scripts found. Exiting.");
|
|
return Ok(());
|
|
}
|
|
|
|
// Create temporary database path
|
|
let db_path = "temp_osis_actor_example_db";
|
|
|
|
// Clean up previous database if it exists
|
|
if Path::new(db_path).exists() {
|
|
fs::remove_dir_all(db_path)?;
|
|
}
|
|
|
|
// Redis configuration
|
|
let redis_url = "redis://127.0.0.1:6379";
|
|
|
|
// Try to connect to Redis
|
|
let redis_client = redis::Client::open(redis_url)?;
|
|
let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
|
|
Ok(conn) => {
|
|
println!("Connected to Redis at {}", redis_url);
|
|
conn
|
|
}
|
|
Err(e) => {
|
|
println!("Failed to connect to Redis: {}", e);
|
|
println!("Please ensure Redis is running on localhost:6379");
|
|
println!("You can start Redis with: redis-server");
|
|
return Err(e.into());
|
|
}
|
|
};
|
|
|
|
// Create shutdown channel for the actor
|
|
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
|
|
|
// Spawn the OSIS actor
|
|
println!("\n--- Spawning OSIS Actor ---");
|
|
let actor_handle = spawn_osis_actor(
|
|
db_path.to_string(),
|
|
redis_url.to_string(),
|
|
shutdown_rx,
|
|
);
|
|
|
|
println!("OSIS actor spawned and listening for jobs");
|
|
|
|
// Process each script
|
|
for (i, script_path) in script_files.iter().enumerate() {
|
|
println!("\n=== Processing Script {}/{}: {} ===", i + 1, script_files.len(), script_path.file_name().unwrap().to_string_lossy());
|
|
|
|
let script_content = match fs::read_to_string(script_path) {
|
|
Ok(content) => content,
|
|
Err(e) => {
|
|
println!("Failed to read script {}: {}", script_path.display(), e);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
// Create a job for this script
|
|
let job = Job::new(
|
|
"example_caller".to_string(),
|
|
format!("script_{}", script_path.file_stem().unwrap().to_string_lossy()),
|
|
script_content,
|
|
ScriptType::OSIS,
|
|
);
|
|
|
|
println!("Created job with ID: {}", job.id);
|
|
|
|
// Store the job in Redis
|
|
let job_key = format!("job:{}", job.id);
|
|
job.store_in_redis(&mut redis_conn).await?;
|
|
|
|
// Set initial status to Dispatched (since store_in_redis sets it to "pending" which isn't in the enum)
|
|
Job::update_status(&mut redis_conn, &job.id, JobStatus::Dispatched).await?;
|
|
println!("Stored job in Redis with key: {} and status: Dispatched", job_key);
|
|
|
|
// Add the job to the OSIS queue for processing
|
|
// Note: The supervisor uses "actor_queue:" prefix, so the correct queue is:
|
|
let queue_key = "hero:job:actor_queue:osis";
|
|
let _: () = redis_conn.lpush(&queue_key, &job.id).await?;
|
|
println!("Dispatched job {} to OSIS queue: {}", job.id, queue_key);
|
|
|
|
println!("\n--- Waiting for Job Result ---");
|
|
|
|
// Wait for result or error from Redis queues with timeout
|
|
let result_key = format!("hero:job:{}:result", job.id);
|
|
let error_key = format!("hero:job:{}:error", job.id);
|
|
let timeout_secs = 10.0;
|
|
|
|
// Use BLPOP to block and wait for either result or error
|
|
let keys = vec![result_key.clone(), error_key.clone()];
|
|
match redis_conn.blpop::<_, Option<(String, String)>>(&keys, timeout_secs).await {
|
|
Ok(Some((queue_name, value))) => {
|
|
if queue_name == result_key {
|
|
println!("✓ Job completed successfully!");
|
|
println!(" Result: {}", value);
|
|
} else if queue_name == error_key {
|
|
println!("✗ Job failed with error!");
|
|
println!(" Error: {}", value);
|
|
}
|
|
}
|
|
Ok(None) => {
|
|
println!("⏱ Job timed out after {} seconds", timeout_secs);
|
|
}
|
|
Err(e) => {
|
|
println!("❌ Failed to wait for job result: {}", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown the actor
|
|
println!("\n--- Shutting Down Actor ---");
|
|
if let Err(e) = shutdown_tx.send(()).await {
|
|
println!("Failed to send shutdown signal: {}", e);
|
|
}
|
|
|
|
// Wait for actor to shutdown with timeout
|
|
match timeout(Duration::from_secs(5), actor_handle).await {
|
|
Ok(result) => {
|
|
match result {
|
|
Ok(Ok(())) => println!("OSIS actor shut down successfully"),
|
|
Ok(Err(e)) => println!("OSIS actor shut down with error: {}", e),
|
|
Err(e) => println!("OSIS actor panicked: {}", e),
|
|
}
|
|
}
|
|
Err(_) => {
|
|
println!("OSIS actor shutdown timed out");
|
|
}
|
|
}
|
|
|
|
// Clean up the temporary database
|
|
if Path::new(db_path).exists() {
|
|
fs::remove_dir_all(db_path)?;
|
|
println!("Cleaned up temporary database: {}", db_path);
|
|
}
|
|
|
|
println!("=== Actor Example Complete ===");
|
|
Ok(())
|
|
}
|