implement osis actor
This commit is contained in:
174
examples/actor.rs
Normal file
174
examples/actor.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tokio;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{sleep, timeout};
|
||||
use redis::AsyncCommands;
|
||||
|
||||
use actor_osis::spawn_osis_actor;
|
||||
use hero_job::{Job, JobStatus, ScriptType};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Initialize logging
|
||||
env_logger::init();
|
||||
|
||||
println!("=== OSIS Actor Redis Dispatch Example ===");
|
||||
|
||||
// Find all Rhai scripts in examples/scripts directory
|
||||
let scripts_dir = Path::new("examples/scripts");
|
||||
if !scripts_dir.exists() {
|
||||
eprintln!("Scripts directory not found: {}", scripts_dir.display());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut script_files = Vec::new();
|
||||
for entry in fs::read_dir(scripts_dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|s| s.to_str()) == Some("rhai") {
|
||||
script_files.push(path);
|
||||
}
|
||||
}
|
||||
|
||||
script_files.sort();
|
||||
println!("Found {} Rhai scripts in {}", script_files.len(), scripts_dir.display());
|
||||
|
||||
if script_files.is_empty() {
|
||||
println!("No Rhai scripts found. Exiting.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Create temporary database path
|
||||
let db_path = "temp_osis_actor_example_db";
|
||||
|
||||
// Clean up previous database if it exists
|
||||
if Path::new(db_path).exists() {
|
||||
fs::remove_dir_all(db_path)?;
|
||||
}
|
||||
|
||||
// Redis configuration
|
||||
let redis_url = "redis://127.0.0.1:6379";
|
||||
|
||||
// Try to connect to Redis
|
||||
let redis_client = redis::Client::open(redis_url)?;
|
||||
let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
|
||||
Ok(conn) => {
|
||||
println!("Connected to Redis at {}", redis_url);
|
||||
conn
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Failed to connect to Redis: {}", e);
|
||||
println!("Please ensure Redis is running on localhost:6379");
|
||||
println!("You can start Redis with: redis-server");
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
// Create shutdown channel for the actor
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
|
||||
// Spawn the OSIS actor
|
||||
println!("\n--- Spawning OSIS Actor ---");
|
||||
let actor_handle = spawn_osis_actor(
|
||||
db_path.to_string(),
|
||||
redis_url.to_string(),
|
||||
shutdown_rx,
|
||||
);
|
||||
|
||||
println!("OSIS actor spawned and listening for jobs");
|
||||
|
||||
// Process each script
|
||||
for (i, script_path) in script_files.iter().enumerate() {
|
||||
println!("\n=== Processing Script {}/{}: {} ===", i + 1, script_files.len(), script_path.file_name().unwrap().to_string_lossy());
|
||||
|
||||
let script_content = match fs::read_to_string(script_path) {
|
||||
Ok(content) => content,
|
||||
Err(e) => {
|
||||
println!("Failed to read script {}: {}", script_path.display(), e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Create a job for this script
|
||||
let job = Job::new(
|
||||
"example_caller".to_string(),
|
||||
format!("script_{}", script_path.file_stem().unwrap().to_string_lossy()),
|
||||
script_content,
|
||||
ScriptType::OSIS,
|
||||
);
|
||||
|
||||
println!("Created job with ID: {}", job.id);
|
||||
|
||||
// Store the job in Redis
|
||||
let job_key = format!("job:{}", job.id);
|
||||
job.store_in_redis(&mut redis_conn).await?;
|
||||
|
||||
// Set initial status to Dispatched (since store_in_redis sets it to "pending" which isn't in the enum)
|
||||
Job::update_status(&mut redis_conn, &job.id, JobStatus::Dispatched).await?;
|
||||
println!("Stored job in Redis with key: {} and status: Dispatched", job_key);
|
||||
|
||||
// Add the job to the OSIS queue for processing
|
||||
// Note: The supervisor uses "actor_queue:" prefix, so the correct queue is:
|
||||
let queue_key = "hero:job:actor_queue:osis";
|
||||
let _: () = redis_conn.lpush(&queue_key, &job.id).await?;
|
||||
println!("Dispatched job {} to OSIS queue: {}", job.id, queue_key);
|
||||
|
||||
println!("\n--- Waiting for Job Result ---");
|
||||
|
||||
// Wait for result or error from Redis queues with timeout
|
||||
let result_key = format!("hero:job:{}:result", job.id);
|
||||
let error_key = format!("hero:job:{}:error", job.id);
|
||||
let timeout_secs = 10.0;
|
||||
|
||||
// Use BLPOP to block and wait for either result or error
|
||||
let keys = vec![result_key.clone(), error_key.clone()];
|
||||
match redis_conn.blpop::<_, Option<(String, String)>>(&keys, timeout_secs).await {
|
||||
Ok(Some((queue_name, value))) => {
|
||||
if queue_name == result_key {
|
||||
println!("✓ Job completed successfully!");
|
||||
println!(" Result: {}", value);
|
||||
} else if queue_name == error_key {
|
||||
println!("✗ Job failed with error!");
|
||||
println!(" Error: {}", value);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
println!("⏱ Job timed out after {} seconds", timeout_secs);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("❌ Failed to wait for job result: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the actor
|
||||
println!("\n--- Shutting Down Actor ---");
|
||||
if let Err(e) = shutdown_tx.send(()).await {
|
||||
println!("Failed to send shutdown signal: {}", e);
|
||||
}
|
||||
|
||||
// Wait for actor to shutdown with timeout
|
||||
match timeout(Duration::from_secs(5), actor_handle).await {
|
||||
Ok(result) => {
|
||||
match result {
|
||||
Ok(Ok(())) => println!("OSIS actor shut down successfully"),
|
||||
Ok(Err(e)) => println!("OSIS actor shut down with error: {}", e),
|
||||
Err(e) => println!("OSIS actor panicked: {}", e),
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
println!("OSIS actor shutdown timed out");
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up the temporary database
|
||||
if Path::new(db_path).exists() {
|
||||
fs::remove_dir_all(db_path)?;
|
||||
println!("Cleaned up temporary database: {}", db_path);
|
||||
}
|
||||
|
||||
println!("=== Actor Example Complete ===");
|
||||
Ok(())
|
||||
}
|
Reference in New Issue
Block a user