rename worker to actor
This commit is contained in:
		
							
								
								
									
										331
									
								
								core/actor/src/actor_trait.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										331
									
								
								core/actor/src/actor_trait.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,331 @@
 | 
			
		||||
//! # Actor Trait Abstraction
 | 
			
		||||
//!
 | 
			
		||||
//! This module provides a trait-based abstraction for Rhai actors that eliminates
 | 
			
		||||
//! code duplication between synchronous and asynchronous actor implementations.
 | 
			
		||||
//! 
 | 
			
		||||
//! The `Actor` trait defines the common interface and behavior, while specific
 | 
			
		||||
//! implementations handle job processing differently (sync vs async).
 | 
			
		||||
//!
 | 
			
		||||
//! ## Architecture
 | 
			
		||||
//!
 | 
			
		||||
//! ```text
 | 
			
		||||
//! ┌─────────────────┐    ┌─────────────────┐
 | 
			
		||||
//! │   SyncActor    │    │  AsyncActor    │
 | 
			
		||||
//! │                 │    │                 │
 | 
			
		||||
//! │ process_job()   │    │ process_job()   │
 | 
			
		||||
//! │ (sequential)    │    │ (concurrent)    │
 | 
			
		||||
//! └─────────────────┘    └─────────────────┘
 | 
			
		||||
//!          │                       │
 | 
			
		||||
//!          └───────┬───────────────┘
 | 
			
		||||
//!                  │
 | 
			
		||||
//!          ┌───────▼───────┐
 | 
			
		||||
//!          │ Actor Trait  │
 | 
			
		||||
//!          │               │
 | 
			
		||||
//!          │ spawn()       │
 | 
			
		||||
//!          │ config        │
 | 
			
		||||
//!          │ common loop   │
 | 
			
		||||
//!          └───────────────┘
 | 
			
		||||
//! ```
 | 
			
		||||
 | 
			
		||||
use hero_job::Job;
 | 
			
		||||
use log::{debug, error, info};
 | 
			
		||||
use redis::AsyncCommands;
 | 
			
		||||
use rhai::Engine;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use tokio::sync::mpsc;
 | 
			
		||||
use tokio::task::JoinHandle;
 | 
			
		||||
 | 
			
		||||
use crate::{initialize_redis_connection, NAMESPACE_PREFIX, BLPOP_TIMEOUT_SECONDS};
 | 
			
		||||
 | 
			
		||||
/// Configuration for actor instances
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct ActorConfig {
 | 
			
		||||
    pub actor_id: String,
 | 
			
		||||
    pub db_path: String,
 | 
			
		||||
    pub redis_url: String,
 | 
			
		||||
    pub preserve_tasks: bool,
 | 
			
		||||
    pub default_timeout: Option<Duration>, // Only used by async actors
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ActorConfig {
 | 
			
		||||
    /// Create a new actor configuration
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        actor_id: String,
 | 
			
		||||
        db_path: String,
 | 
			
		||||
        redis_url: String,
 | 
			
		||||
        preserve_tasks: bool,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            actor_id,
 | 
			
		||||
            db_path,
 | 
			
		||||
            redis_url,
 | 
			
		||||
            preserve_tasks,
 | 
			
		||||
            default_timeout: None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set default timeout for async actors
 | 
			
		||||
    pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
 | 
			
		||||
        self.default_timeout = Some(timeout);
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Trait defining the common interface for Rhai actors
 | 
			
		||||
/// 
 | 
			
		||||
/// This trait abstracts the common functionality between synchronous and
 | 
			
		||||
/// asynchronous actors, allowing them to share the same spawn logic and
 | 
			
		||||
/// Redis polling loop while implementing different job processing strategies.
 | 
			
		||||
#[async_trait::async_trait]
 | 
			
		||||
pub trait Actor: Send + Sync + 'static {
 | 
			
		||||
    /// Process a single job
 | 
			
		||||
    /// 
 | 
			
		||||
    /// This is the core method that differentiates actor implementations:
 | 
			
		||||
    /// - Sync actors process jobs sequentially, one at a time
 | 
			
		||||
    /// - Async actors spawn concurrent tasks for each job
 | 
			
		||||
    /// 
 | 
			
		||||
    /// # Arguments
 | 
			
		||||
    /// 
 | 
			
		||||
    /// * `job` - The job to process
 | 
			
		||||
    /// * `redis_conn` - Redis connection for status updates
 | 
			
		||||
    /// 
 | 
			
		||||
    /// Note: The engine is now owned by the actor implementation as a field
 | 
			
		||||
    async fn process_job(
 | 
			
		||||
        &self,
 | 
			
		||||
        job: Job,
 | 
			
		||||
        redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    /// Get the actor type name for logging
 | 
			
		||||
    fn actor_type(&self) -> &'static str;
 | 
			
		||||
    
 | 
			
		||||
    /// Get actor ID for this actor instance
 | 
			
		||||
    fn actor_id(&self) -> &str;
 | 
			
		||||
    
 | 
			
		||||
    /// Get Redis URL for this actor instance
 | 
			
		||||
    fn redis_url(&self) -> &str;
 | 
			
		||||
 | 
			
		||||
    /// Spawn the actor
 | 
			
		||||
    /// 
 | 
			
		||||
    /// This method provides the common actor loop implementation that both
 | 
			
		||||
    /// sync and async actors can use. It handles:
 | 
			
		||||
    /// - Redis connection setup
 | 
			
		||||
    /// - Job polling from Redis queue
 | 
			
		||||
    /// - Shutdown signal handling
 | 
			
		||||
    /// - Delegating job processing to the implementation
 | 
			
		||||
    /// 
 | 
			
		||||
    /// Note: The engine is now owned by the actor implementation as a field
 | 
			
		||||
    fn spawn(
 | 
			
		||||
        self: Arc<Self>,
 | 
			
		||||
        mut shutdown_rx: mpsc::Receiver<()>,
 | 
			
		||||
    ) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
 | 
			
		||||
        tokio::spawn(async move {
 | 
			
		||||
            let actor_id = self.actor_id();
 | 
			
		||||
            let redis_url = self.redis_url();
 | 
			
		||||
            let queue_key = format!("{}{}", NAMESPACE_PREFIX, actor_id);
 | 
			
		||||
            info!(
 | 
			
		||||
                "{} Actor '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
 | 
			
		||||
                self.actor_type(),
 | 
			
		||||
                actor_id,
 | 
			
		||||
                redis_url,
 | 
			
		||||
                queue_key
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            let mut redis_conn = initialize_redis_connection(actor_id, redis_url).await?;
 | 
			
		||||
 | 
			
		||||
            loop {
 | 
			
		||||
                let blpop_keys = vec![queue_key.clone()];
 | 
			
		||||
                tokio::select! {
 | 
			
		||||
                    // Listen for shutdown signal
 | 
			
		||||
                    _ = shutdown_rx.recv() => {
 | 
			
		||||
                        info!("{} Actor '{}': Shutdown signal received. Terminating loop.", 
 | 
			
		||||
                              self.actor_type(), actor_id);
 | 
			
		||||
                        break;
 | 
			
		||||
                    }
 | 
			
		||||
                    // Listen for tasks from Redis
 | 
			
		||||
                    blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
 | 
			
		||||
                        debug!("{} Actor '{}': Attempting BLPOP on queue: {}", 
 | 
			
		||||
                               self.actor_type(), actor_id, queue_key);
 | 
			
		||||
                        
 | 
			
		||||
                        let response: Option<(String, String)> = match blpop_result {
 | 
			
		||||
                            Ok(resp) => resp,
 | 
			
		||||
                            Err(e) => {
 | 
			
		||||
                                error!("{} Actor '{}': Redis BLPOP error on queue {}: {}. Actor for this circle might stop.", 
 | 
			
		||||
                                       self.actor_type(), actor_id, queue_key, e);
 | 
			
		||||
                                return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
 | 
			
		||||
                            }
 | 
			
		||||
                        };
 | 
			
		||||
 | 
			
		||||
                        if let Some((_queue_name_recv, job_id)) = response {
 | 
			
		||||
                            info!("{} Actor '{}' received job_id: {} from queue: {}", 
 | 
			
		||||
                                  self.actor_type(), actor_id, job_id, _queue_name_recv);
 | 
			
		||||
                            
 | 
			
		||||
                            // Load the job from Redis
 | 
			
		||||
                            match crate::load_job_from_redis(&mut redis_conn, &job_id, actor_id).await {
 | 
			
		||||
                                Ok(job) => {
 | 
			
		||||
                                    // Check for ping job and handle it directly
 | 
			
		||||
                                    if job.script.trim() == "ping" {
 | 
			
		||||
                                        info!("{} Actor '{}': Received ping job '{}', responding with pong", 
 | 
			
		||||
                                              self.actor_type(), actor_id, job_id);
 | 
			
		||||
                                        
 | 
			
		||||
                                        // Update job status to started
 | 
			
		||||
                                        if let Err(e) = hero_job::Job::update_status(&mut redis_conn, &job_id, hero_job::JobStatus::Started).await {
 | 
			
		||||
                                            error!("{} Actor '{}': Failed to update ping job '{}' status to Started: {}", 
 | 
			
		||||
                                                   self.actor_type(), actor_id, job_id, e);
 | 
			
		||||
                                        }
 | 
			
		||||
                                        
 | 
			
		||||
                                        // Set result to "pong" and mark as finished
 | 
			
		||||
                                        if let Err(e) = hero_job::Job::set_result(&mut redis_conn, &job_id, "pong").await {
 | 
			
		||||
                                            error!("{} Actor '{}': Failed to set ping job '{}' result: {}", 
 | 
			
		||||
                                                   self.actor_type(), actor_id, job_id, e);
 | 
			
		||||
                                        }
 | 
			
		||||
                                        
 | 
			
		||||
                                        info!("{} Actor '{}': Successfully responded to ping job '{}' with pong", 
 | 
			
		||||
                                              self.actor_type(), actor_id, job_id);
 | 
			
		||||
                                    } else {
 | 
			
		||||
                                        // Delegate job processing to the implementation
 | 
			
		||||
                                        // The engine is now owned by the actor implementation
 | 
			
		||||
                                        self.process_job(job, &mut redis_conn).await;
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                Err(e) => {
 | 
			
		||||
                                    error!("{} Actor '{}': Failed to load job '{}': {}", 
 | 
			
		||||
                                           self.actor_type(), actor_id, job_id, e);
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                        } else {
 | 
			
		||||
                            debug!("{} Actor '{}': BLPOP timed out on queue {}. No new tasks.", 
 | 
			
		||||
                                   self.actor_type(), actor_id, queue_key);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            info!("{} Actor '{}' has shut down.", self.actor_type(), actor_id);
 | 
			
		||||
            Ok(())
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Convenience function to spawn a actor with the trait-based interface
 | 
			
		||||
/// 
 | 
			
		||||
/// This function provides a unified interface for spawning any actor implementation
 | 
			
		||||
/// that implements the Actor trait.
 | 
			
		||||
/// 
 | 
			
		||||
/// # Arguments
 | 
			
		||||
/// 
 | 
			
		||||
/// * `actor` - The actor implementation to spawn
 | 
			
		||||
/// * `config` - Actor configuration
 | 
			
		||||
/// * `engine` - Rhai engine for script execution
 | 
			
		||||
/// * `shutdown_rx` - Channel receiver for shutdown signals
 | 
			
		||||
/// 
 | 
			
		||||
/// # Returns
 | 
			
		||||
/// 
 | 
			
		||||
/// Returns a `JoinHandle` that can be awaited to wait for actor shutdown.
 | 
			
		||||
/// 
 | 
			
		||||
/// # Example
 | 
			
		||||
/// 
 | 
			
		||||
/// ```rust
 | 
			
		||||
/// use std::sync::Arc;
 | 
			
		||||
/// use std::time::Duration;
 | 
			
		||||
/// 
 | 
			
		||||
/// let config = ActorConfig::new(
 | 
			
		||||
///     "actor_1".to_string(),
 | 
			
		||||
///     "/path/to/db".to_string(),
 | 
			
		||||
///     "redis://localhost:6379".to_string(),
 | 
			
		||||
///     false,
 | 
			
		||||
/// );
 | 
			
		||||
/// 
 | 
			
		||||
/// let actor = Arc::new(SyncActor::new());
 | 
			
		||||
/// let engine = create_heromodels_engine();
 | 
			
		||||
/// let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
 | 
			
		||||
/// 
 | 
			
		||||
/// let handle = spawn_actor(actor, config, engine, shutdown_rx);
 | 
			
		||||
/// 
 | 
			
		||||
/// // Later, shutdown the actor
 | 
			
		||||
/// shutdown_tx.send(()).await.unwrap();
 | 
			
		||||
/// handle.await.unwrap().unwrap();
 | 
			
		||||
/// ```
 | 
			
		||||
pub fn spawn_actor<W: Actor>(
 | 
			
		||||
    actor: Arc<W>,
 | 
			
		||||
    shutdown_rx: mpsc::Receiver<()>,
 | 
			
		||||
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
 | 
			
		||||
    actor.spawn(shutdown_rx)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use crate::engine::create_heromodels_engine;
 | 
			
		||||
 | 
			
		||||
    // Mock actor for testing
 | 
			
		||||
    struct MockActor;
 | 
			
		||||
 | 
			
		||||
    #[async_trait::async_trait]
 | 
			
		||||
    impl Actor for MockActor {
 | 
			
		||||
        async fn process_job(
 | 
			
		||||
            &self,
 | 
			
		||||
            _job: Job,
 | 
			
		||||
            _redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
        ) {
 | 
			
		||||
            // Mock implementation - do nothing
 | 
			
		||||
            // Engine would be owned by the actor implementation as a field
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        fn actor_type(&self) -> &'static str {
 | 
			
		||||
            "Mock"
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        fn actor_id(&self) -> &str {
 | 
			
		||||
            "mock_actor"
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        fn redis_url(&self) -> &str {
 | 
			
		||||
            "redis://localhost:6379"
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[tokio::test]
 | 
			
		||||
    async fn test_actor_config_creation() {
 | 
			
		||||
        let config = ActorConfig::new(
 | 
			
		||||
            "test_actor".to_string(),
 | 
			
		||||
            "/tmp".to_string(),
 | 
			
		||||
            "redis://localhost:6379".to_string(),
 | 
			
		||||
            false,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        assert_eq!(config.actor_id, "test_actor");
 | 
			
		||||
        assert_eq!(config.db_path, "/tmp");
 | 
			
		||||
        assert_eq!(config.redis_url, "redis://localhost:6379");
 | 
			
		||||
        assert!(!config.preserve_tasks);
 | 
			
		||||
        assert!(config.default_timeout.is_none());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[tokio::test]
 | 
			
		||||
    async fn test_actor_config_with_timeout() {
 | 
			
		||||
        let timeout = Duration::from_secs(300);
 | 
			
		||||
        let config = ActorConfig::new(
 | 
			
		||||
            "test_actor".to_string(),
 | 
			
		||||
            "/tmp".to_string(),
 | 
			
		||||
            "redis://localhost:6379".to_string(),
 | 
			
		||||
            false,
 | 
			
		||||
        ).with_default_timeout(timeout);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(config.default_timeout, Some(timeout));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[tokio::test]
 | 
			
		||||
    async fn test_spawn_actor_function() {
 | 
			
		||||
        let (_shutdown_tx, shutdown_rx) = mpsc::channel(1);
 | 
			
		||||
        let actor = Arc::new(MockActor);
 | 
			
		||||
 | 
			
		||||
        let handle = spawn_actor(actor, shutdown_rx);
 | 
			
		||||
        
 | 
			
		||||
        // The actor should be created successfully
 | 
			
		||||
        assert!(!handle.is_finished());
 | 
			
		||||
        
 | 
			
		||||
        // Abort the actor for cleanup
 | 
			
		||||
        handle.abort();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										238
									
								
								core/actor/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										238
									
								
								core/actor/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,238 @@
 | 
			
		||||
use hero_job::{Job, JobStatus};
 | 
			
		||||
use log::{debug, error, info};
 | 
			
		||||
use redis::AsyncCommands;
 | 
			
		||||
use rhai::{Dynamic, Engine};
 | 
			
		||||
use tokio::sync::mpsc; // For shutdown signal
 | 
			
		||||
use tokio::task::JoinHandle;
 | 
			
		||||
 | 
			
		||||
/// Actor trait abstraction for unified actor interface
 | 
			
		||||
pub mod actor_trait;
 | 
			
		||||
 | 
			
		||||
const NAMESPACE_PREFIX: &str = "hero:job:";
 | 
			
		||||
const BLPOP_TIMEOUT_SECONDS: usize = 5;
 | 
			
		||||
 | 
			
		||||
/// Initialize Redis connection for the actor
 | 
			
		||||
pub(crate) async fn initialize_redis_connection(
 | 
			
		||||
    actor_id: &str,
 | 
			
		||||
    redis_url: &str,
 | 
			
		||||
) -> Result<redis::aio::MultiplexedConnection, Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    let redis_client = redis::Client::open(redis_url)
 | 
			
		||||
        .map_err(|e| {
 | 
			
		||||
            error!("Actor for Actor ID '{}': Failed to open Redis client: {}", actor_id, e);
 | 
			
		||||
            e
 | 
			
		||||
        })?;
 | 
			
		||||
    
 | 
			
		||||
    let redis_conn = redis_client.get_multiplexed_async_connection().await
 | 
			
		||||
        .map_err(|e| {
 | 
			
		||||
            error!("Actor for Actor ID '{}': Failed to get Redis connection: {}", actor_id, e);
 | 
			
		||||
            e
 | 
			
		||||
        })?;
 | 
			
		||||
    
 | 
			
		||||
    info!("Actor for Actor ID '{}' successfully connected to Redis.", actor_id);
 | 
			
		||||
    Ok(redis_conn)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Load job from Redis using Job struct
 | 
			
		||||
pub(crate) async fn load_job_from_redis(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    job_id: &str,
 | 
			
		||||
    actor_id: &str,
 | 
			
		||||
) -> Result<Job, Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    debug!("Actor '{}', Job {}: Loading job from Redis", actor_id, job_id);
 | 
			
		||||
    
 | 
			
		||||
    match Job::load_from_redis(redis_conn, job_id).await {
 | 
			
		||||
        Ok(job) => {
 | 
			
		||||
            debug!("Actor '{}', Job {}: Successfully loaded job", actor_id, job_id);
 | 
			
		||||
            Ok(job)
 | 
			
		||||
        }
 | 
			
		||||
        Err(e) => {
 | 
			
		||||
            error!("Actor '{}', Job {}: Failed to load job from Redis: {}", actor_id, job_id, e);
 | 
			
		||||
            Err(Box::new(e))
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Execute the Rhai script and update job status in Redis
 | 
			
		||||
async fn execute_script_and_update_status(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    engine: &mut Engine,
 | 
			
		||||
    job: &Job,
 | 
			
		||||
    db_path: &str,
 | 
			
		||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    let mut db_config = rhai::Map::new();
 | 
			
		||||
    db_config.insert("DB_PATH".into(), db_path.to_string().into());
 | 
			
		||||
    db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
 | 
			
		||||
    db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
 | 
			
		||||
    engine.set_default_tag(Dynamic::from(db_config));
 | 
			
		||||
    
 | 
			
		||||
    debug!("Actor for Context ID '{}': Evaluating script with Rhai engine.", job.context_id);
 | 
			
		||||
    
 | 
			
		||||
    match engine.eval::<rhai::Dynamic>(&job.script) {
 | 
			
		||||
        Ok(result) => {
 | 
			
		||||
            let output_str = if result.is::<String>() {
 | 
			
		||||
                result.into_string().unwrap()
 | 
			
		||||
            } else {
 | 
			
		||||
                result.to_string()
 | 
			
		||||
            };
 | 
			
		||||
            info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
 | 
			
		||||
            
 | 
			
		||||
            // Update job status to finished and set result
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to finished: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Job::set_result(redis_conn, &job.id, &output_str).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to set job {} result: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Ok(())
 | 
			
		||||
        }
 | 
			
		||||
        Err(e) => {
 | 
			
		||||
            let error_str = format!("{:?}", *e);
 | 
			
		||||
            error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
 | 
			
		||||
            
 | 
			
		||||
            // Update job status to error and set error message
 | 
			
		||||
            Job::update_status(redis_conn, &job.id, JobStatus::Error).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to update job {} status to error: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Job::set_error(redis_conn, &job.id, &error_str).await
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Failed to set job {} error: {}", job.id, e);
 | 
			
		||||
                    e
 | 
			
		||||
                })?;
 | 
			
		||||
            
 | 
			
		||||
            Ok(())
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Clean up job from Redis if preserve_tasks is false
 | 
			
		||||
async fn cleanup_job(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    job_id: &str,
 | 
			
		||||
    context_id: &str,
 | 
			
		||||
    preserve_tasks: bool,
 | 
			
		||||
) {
 | 
			
		||||
    if !preserve_tasks {
 | 
			
		||||
        if let Err(e) = Job::delete_from_redis(redis_conn, job_id).await {
 | 
			
		||||
            error!("Actor for Context ID '{}', Job {}: Failed to delete job: {}", context_id, job_id, e);
 | 
			
		||||
        } else {
 | 
			
		||||
            debug!("Actor for Context ID '{}', Job {}: Cleaned up job.", context_id, job_id);
 | 
			
		||||
        }
 | 
			
		||||
    } else {
 | 
			
		||||
        debug!("Actor for Context ID '{}', Job {}: Preserving job (preserve_tasks=true)", context_id, job_id);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Process a single job from the queue
 | 
			
		||||
async fn process_job(
 | 
			
		||||
    redis_conn: &mut redis::aio::MultiplexedConnection,
 | 
			
		||||
    job_id: &str,
 | 
			
		||||
    actor_id: &str,
 | 
			
		||||
    db_path: &str,
 | 
			
		||||
    engine: &mut Engine,
 | 
			
		||||
    preserve_tasks: bool,
 | 
			
		||||
) {
 | 
			
		||||
    debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
 | 
			
		||||
    
 | 
			
		||||
    // Load job from Redis
 | 
			
		||||
    match load_job_from_redis(redis_conn, job_id, actor_id).await {
 | 
			
		||||
        Ok(job) => {
 | 
			
		||||
            info!("Actor '{}' processing job_id: {}. Script: {:.50}...", job.context_id, job_id, job.script);
 | 
			
		||||
            
 | 
			
		||||
            // Update status to started
 | 
			
		||||
            debug!("Actor for Context ID '{}', Job {}: Attempting to update status to 'started'.", job.context_id, job_id);
 | 
			
		||||
            if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await {
 | 
			
		||||
                error!("Actor for Context ID '{}', Job {}: Failed to update status to 'started': {}", job.context_id, job_id, e);
 | 
			
		||||
            } else {
 | 
			
		||||
                debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
 | 
			
		||||
            }
 | 
			
		||||
            
 | 
			
		||||
            // Execute the script and update status
 | 
			
		||||
            if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
 | 
			
		||||
                error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
 | 
			
		||||
                
 | 
			
		||||
                // Ensure job status is set to error if execution failed
 | 
			
		||||
                if let Err(status_err) = Job::update_status(redis_conn, job_id, JobStatus::Error).await {
 | 
			
		||||
                    error!("Actor for Context ID '{}', Job {}: Failed to update status to error after execution failure: {}", job.context_id, job_id, status_err);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            
 | 
			
		||||
            // Clean up job if needed
 | 
			
		||||
            cleanup_job(redis_conn, job_id, &job.context_id, preserve_tasks).await;
 | 
			
		||||
        }
 | 
			
		||||
        Err(e) => {
 | 
			
		||||
            error!("Actor '{}', Job {}: Failed to load job: {}", actor_id, job_id, e);
 | 
			
		||||
            // Clean up invalid job if needed
 | 
			
		||||
            if !preserve_tasks {
 | 
			
		||||
                if let Err(del_err) = Job::delete_from_redis(redis_conn, job_id).await {
 | 
			
		||||
                    error!("Actor '{}', Job {}: Failed to delete invalid job: {}", actor_id, job_id, del_err);
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                debug!("Actor '{}', Job {}: Preserving invalid job (preserve_tasks=true)", actor_id, job_id);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn spawn_rhai_actor(
 | 
			
		||||
    actor_id: String,
 | 
			
		||||
    db_path: String,
 | 
			
		||||
    mut engine: Engine,
 | 
			
		||||
    redis_url: String,
 | 
			
		||||
    mut shutdown_rx: mpsc::Receiver<()>,
 | 
			
		||||
    preserve_tasks: bool,
 | 
			
		||||
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let queue_key = format!("{}{}", NAMESPACE_PREFIX, actor_id);
 | 
			
		||||
        info!(
 | 
			
		||||
            "Rhai Actor for Actor ID '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
 | 
			
		||||
            actor_id, redis_url, queue_key
 | 
			
		||||
        );
 | 
			
		||||
        
 | 
			
		||||
        let mut redis_conn = initialize_redis_connection(&actor_id, &redis_url).await?;
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            let blpop_keys = vec![queue_key.clone()];
 | 
			
		||||
            tokio::select! {
 | 
			
		||||
                // Listen for shutdown signal
 | 
			
		||||
                _ = shutdown_rx.recv() => {
 | 
			
		||||
                    info!("Actor for Actor ID '{}': Shutdown signal received. Terminating loop.", actor_id);
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
                // Listen for tasks from Redis
 | 
			
		||||
                blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
 | 
			
		||||
                    debug!("Actor for Actor ID '{}': Attempting BLPOP on queue: {}", actor_id, queue_key);
 | 
			
		||||
                    let response: Option<(String, String)> = match blpop_result {
 | 
			
		||||
                        Ok(resp) => resp,
 | 
			
		||||
                        Err(e) => {
 | 
			
		||||
                            error!("Actor '{}': Redis BLPOP error on queue {}: {}. Actor for this circle might stop.", actor_id, queue_key, e);
 | 
			
		||||
                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
 | 
			
		||||
                        }
 | 
			
		||||
                    };
 | 
			
		||||
                    
 | 
			
		||||
                    if let Some((_queue_name_recv, job_id)) = response {
 | 
			
		||||
                        info!("Actor '{}' received job_id: {} from queue: {}", actor_id, job_id, _queue_name_recv);
 | 
			
		||||
                        process_job(&mut redis_conn, &job_id, &actor_id, &db_path, &mut engine, preserve_tasks).await;
 | 
			
		||||
                    } else {
 | 
			
		||||
                        debug!("Actor '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", actor_id, queue_key);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        info!("Actor '{}' has shut down.", actor_id);
 | 
			
		||||
        Ok(())
 | 
			
		||||
    })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Re-export the main trait-based interface for convenience
 | 
			
		||||
pub use actor_trait::{Actor, ActorConfig, spawn_actor};
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user