remove unused dep and move job out
This commit is contained in:
		
							
								
								
									
										35
									
								
								src/app.rs
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								src/app.rs
									
									
									
									
									
								
							@@ -6,6 +6,7 @@
 | 
			
		||||
 | 
			
		||||
use crate::Supervisor;
 | 
			
		||||
use crate::openrpc::start_openrpc_servers;
 | 
			
		||||
use crate::mycelium::MyceliumServer;
 | 
			
		||||
use log::{info, error, debug};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
@@ -42,6 +43,9 @@ impl SupervisorApp {
 | 
			
		||||
        // Start OpenRPC server
 | 
			
		||||
        self.start_openrpc_server().await?;
 | 
			
		||||
 | 
			
		||||
        // Start Mycelium server
 | 
			
		||||
        self.start_mycelium_server().await?;
 | 
			
		||||
 | 
			
		||||
        // Set up graceful shutdown
 | 
			
		||||
        self.setup_graceful_shutdown().await;
 | 
			
		||||
 | 
			
		||||
@@ -146,6 +150,37 @@ impl SupervisorApp {
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Start the Mycelium server
 | 
			
		||||
    async fn start_mycelium_server(&self) -> Result<(), Box<dyn std::error::Error>> {
 | 
			
		||||
        info!("Starting Mycelium server...");
 | 
			
		||||
        
 | 
			
		||||
        let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone()));
 | 
			
		||||
        let mycelium_port = 8990; // Standard Mycelium port
 | 
			
		||||
        let bind_address = "127.0.0.1".to_string();
 | 
			
		||||
        
 | 
			
		||||
        let mycelium_server = MyceliumServer::new(
 | 
			
		||||
            supervisor_for_mycelium,
 | 
			
		||||
            bind_address,
 | 
			
		||||
            mycelium_port,
 | 
			
		||||
        );
 | 
			
		||||
        
 | 
			
		||||
        // Start the Mycelium server in a background task
 | 
			
		||||
        let server_handle = tokio::spawn(async move {
 | 
			
		||||
            if let Err(e) = mycelium_server.start().await {
 | 
			
		||||
                error!("Mycelium server error: {}", e);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        // Give the server a moment to start
 | 
			
		||||
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 | 
			
		||||
        info!("Mycelium server started successfully on port {}", mycelium_port);
 | 
			
		||||
 | 
			
		||||
        // Store the handle for potential cleanup
 | 
			
		||||
        std::mem::forget(server_handle); // For now, let it run in background
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get status of all runners
 | 
			
		||||
    pub async fn get_status(&self) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
 | 
			
		||||
        debug!("Getting status of all runners");
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										328
									
								
								src/client.rs
									
									
									
									
									
								
							
							
						
						
									
										328
									
								
								src/client.rs
									
									
									
									
									
								
							@@ -1,328 +0,0 @@
 | 
			
		||||
//! Main supervisor implementation for managing multiple actor runners.
 | 
			
		||||
 | 
			
		||||
use chrono::Utc;
 | 
			
		||||
use redis::AsyncCommands;
 | 
			
		||||
 | 
			
		||||
use crate::{runner::{RunnerError, RunnerResult}, job::JobStatus, JobError};
 | 
			
		||||
use crate::{job::Job};
 | 
			
		||||
 | 
			
		||||
/// Client for managing jobs in Redis
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct Client {
 | 
			
		||||
    redis_client: redis::Client,
 | 
			
		||||
    namespace: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct ClientBuilder {
 | 
			
		||||
    /// Redis URL for connection
 | 
			
		||||
    redis_url: String,
 | 
			
		||||
    /// Namespace for queue keys
 | 
			
		||||
    namespace: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ClientBuilder {
 | 
			
		||||
    /// Create a new supervisor builder
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            redis_url: "redis://localhost:6379".to_string(),
 | 
			
		||||
            namespace: "".to_string(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the Redis URL
 | 
			
		||||
    pub fn redis_url<S: Into<String>>(mut self, url: S) -> Self {
 | 
			
		||||
        self.redis_url = url.into();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the namespace for queue keys
 | 
			
		||||
    pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
 | 
			
		||||
        self.namespace = namespace.into();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Build the supervisor
 | 
			
		||||
    pub async fn build(self) -> RunnerResult<Client> {
 | 
			
		||||
        // Create Redis client
 | 
			
		||||
        let redis_client = redis::Client::open(self.redis_url.as_str())
 | 
			
		||||
            .map_err(|e| RunnerError::ConfigError {
 | 
			
		||||
                reason: format!("Invalid Redis URL: {}", e),
 | 
			
		||||
            })?;
 | 
			
		||||
 | 
			
		||||
        Ok(Client {
 | 
			
		||||
            redis_client,
 | 
			
		||||
            namespace: self.namespace,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for Client {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        // Note: Default implementation creates an empty supervisor
 | 
			
		||||
        // Use Supervisor::builder() for proper initialization
 | 
			
		||||
        Self {
 | 
			
		||||
            redis_client: redis::Client::open("redis://localhost:6379").unwrap(),
 | 
			
		||||
            namespace: "".to_string(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Client {
 | 
			
		||||
    /// Create a new supervisor builder
 | 
			
		||||
    pub fn builder() -> ClientBuilder {
 | 
			
		||||
        ClientBuilder::new()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// List all job IDs from Redis
 | 
			
		||||
    pub async fn list_jobs(&self) -> RunnerResult<Vec<String>> {
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| RunnerError::RedisError { source: e })?;
 | 
			
		||||
        
 | 
			
		||||
        let keys: Vec<String> = conn.keys(format!("{}:*", &self.jobs_key())).await?;
 | 
			
		||||
        let job_ids: Vec<String> = keys
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .filter_map(|key| {
 | 
			
		||||
                if key.starts_with(&format!("{}:", self.jobs_key())) {
 | 
			
		||||
                    key.strip_prefix(&format!("{}:", self.jobs_key()))
 | 
			
		||||
                        .map(|s| s.to_string())
 | 
			
		||||
                } else {
 | 
			
		||||
                    None
 | 
			
		||||
                }
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
        
 | 
			
		||||
        Ok(job_ids)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn jobs_key(&self) -> String {
 | 
			
		||||
        if self.namespace.is_empty() {
 | 
			
		||||
            format!("job")
 | 
			
		||||
        } else {
 | 
			
		||||
            format!("{}:job", self.namespace)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn job_key(&self, job_id: &str) -> String {
 | 
			
		||||
        if self.namespace.is_empty() {
 | 
			
		||||
            format!("job:{}", job_id)
 | 
			
		||||
        } else {
 | 
			
		||||
            format!("{}:job:{}", self.namespace, job_id)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn job_reply_key(&self, job_id: &str) -> String {
 | 
			
		||||
        if self.namespace.is_empty() {
 | 
			
		||||
            format!("reply:{}", job_id)
 | 
			
		||||
        } else {
 | 
			
		||||
            format!("{}:reply:{}", self.namespace, job_id)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set job error in Redis
 | 
			
		||||
    pub async fn set_error(&self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
        error: &str,
 | 
			
		||||
    ) -> Result<(), JobError> {
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        let now = Utc::now();
 | 
			
		||||
        
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
            
 | 
			
		||||
        conn.hset_multiple(&job_key, &[
 | 
			
		||||
                ("error", error),
 | 
			
		||||
                ("status", JobStatus::Error.as_str()),
 | 
			
		||||
                ("updated_at", &now.to_rfc3339()),
 | 
			
		||||
            ]).await
 | 
			
		||||
            .map_err(|e| JobError::Redis(e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set job status in Redis
 | 
			
		||||
    pub async fn set_job_status(&self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
        status: JobStatus,
 | 
			
		||||
    ) -> Result<(), JobError> {
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        let now = Utc::now();
 | 
			
		||||
        
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
            
 | 
			
		||||
        conn.hset_multiple(&job_key, &[
 | 
			
		||||
                ("status", status.as_str()),
 | 
			
		||||
                ("updated_at", &now.to_rfc3339()),
 | 
			
		||||
            ]).await
 | 
			
		||||
            .map_err(|e| JobError::Redis(e))?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        /// Get job status from Redis
 | 
			
		||||
        pub async fn get_status(
 | 
			
		||||
            &self,
 | 
			
		||||
            job_id: &str,
 | 
			
		||||
        ) -> Result<JobStatus, JobError> {
 | 
			
		||||
            let mut conn = self.redis_client
 | 
			
		||||
                .get_multiplexed_async_connection()
 | 
			
		||||
                .await
 | 
			
		||||
                .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
            
 | 
			
		||||
            let status_str: Option<String> = conn.hget(&self.job_key(job_id), "status").await?;
 | 
			
		||||
            
 | 
			
		||||
            match status_str {
 | 
			
		||||
                Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)),
 | 
			
		||||
                None => Err(JobError::NotFound(job_id.to_string())),
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    /// Delete job from Redis
 | 
			
		||||
    pub async fn delete_from_redis(
 | 
			
		||||
        &self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
    ) -> Result<(), JobError> {
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
                .get_multiplexed_async_connection()
 | 
			
		||||
                .await
 | 
			
		||||
                .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        let _: () = conn.del(&job_key).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Store this job in Redis
 | 
			
		||||
    pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
 | 
			
		||||
        let job_key = self.job_key(&job.id);
 | 
			
		||||
        
 | 
			
		||||
        // Serialize the job data
 | 
			
		||||
        let job_data = serde_json::to_string(job)?;
 | 
			
		||||
        
 | 
			
		||||
        // Store job data in Redis hash
 | 
			
		||||
        let _: () = conn.hset_multiple(&job_key, &[
 | 
			
		||||
            ("data", job_data),
 | 
			
		||||
            ("status", JobStatus::Dispatched.as_str().to_string()),
 | 
			
		||||
            ("created_at", job.created_at.to_rfc3339()),
 | 
			
		||||
            ("updated_at", job.updated_at.to_rfc3339()),
 | 
			
		||||
        ]).await?;
 | 
			
		||||
 | 
			
		||||
        // Set TTL for the job (24 hours)
 | 
			
		||||
        let _: () = conn.expire(&job_key, 86400).await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Load a job from Redis by ID
 | 
			
		||||
    pub async fn load_job_from_redis(
 | 
			
		||||
        &self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
    ) -> Result<Job, JobError> {
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
 | 
			
		||||
        // Get job data from Redis
 | 
			
		||||
        let job_data: Option<String> = conn.hget(&job_key, "data").await?;
 | 
			
		||||
        
 | 
			
		||||
        match job_data {
 | 
			
		||||
            Some(data) => {
 | 
			
		||||
                let job: Job = serde_json::from_str(&data)?;
 | 
			
		||||
                Ok(job)
 | 
			
		||||
            }
 | 
			
		||||
            None => Err(JobError::NotFound(job_id.to_string())),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Delete a job by ID
 | 
			
		||||
    pub async fn delete_job(&mut self, job_id: &str) -> RunnerResult<()> {
 | 
			
		||||
        use redis::AsyncCommands;
 | 
			
		||||
        
 | 
			
		||||
        let mut conn = self.redis_client.get_multiplexed_async_connection().await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
        
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        let deleted_count: i32 = conn.del(&job_key).await
 | 
			
		||||
            .map_err(|e| RunnerError::QueueError {
 | 
			
		||||
                actor_id: job_id.to_string(),
 | 
			
		||||
                reason: format!("Failed to delete job: {}", e),
 | 
			
		||||
            })?;
 | 
			
		||||
        
 | 
			
		||||
        if deleted_count == 0 {
 | 
			
		||||
            return Err(RunnerError::QueueError {
 | 
			
		||||
                actor_id: job_id.to_string(),
 | 
			
		||||
                reason: format!("Job '{}' not found or already deleted", job_id),
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set job result in Redis
 | 
			
		||||
    pub async fn set_result(
 | 
			
		||||
        &self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
        result: &str,
 | 
			
		||||
    ) -> Result<(), JobError> {
 | 
			
		||||
        let job_key = self.job_key(&job_id);
 | 
			
		||||
        let now = Utc::now();
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
        let _: () = conn.hset_multiple(&job_key, &[
 | 
			
		||||
            ("result", result),
 | 
			
		||||
            ("status", JobStatus::Finished.as_str()),
 | 
			
		||||
            ("updated_at", &now.to_rfc3339()),
 | 
			
		||||
        ]).await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get job result from Redis
 | 
			
		||||
    pub async fn get_result(
 | 
			
		||||
        &self,
 | 
			
		||||
        job_id: &str,
 | 
			
		||||
    ) -> Result<Option<String>, JobError> {
 | 
			
		||||
        let job_key = self.job_key(job_id);
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError:: Redis(e))?;
 | 
			
		||||
        let result: Option<String> = conn.hget(&job_key, "result").await?;
 | 
			
		||||
        Ok(result)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get a job ID from the work queue (blocking pop)
 | 
			
		||||
    pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
 | 
			
		||||
        let mut conn = self.redis_client
 | 
			
		||||
            .get_multiplexed_async_connection()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| JobError::Redis(e))?;
 | 
			
		||||
        
 | 
			
		||||
        // Use BRPOP with a short timeout to avoid blocking indefinitely
 | 
			
		||||
        let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await
 | 
			
		||||
            .map_err(|e| JobError::Redis(e))?;
 | 
			
		||||
        
 | 
			
		||||
        Ok(result.map(|(_, job_id)| job_id))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get a job by ID (alias for load_job_from_redis)
 | 
			
		||||
    pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
 | 
			
		||||
        self.load_job_from_redis(job_id).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										220
									
								
								src/job.rs
									
									
									
									
									
								
							
							
						
						
									
										220
									
								
								src/job.rs
									
									
									
									
									
								
							@@ -1,218 +1,2 @@
 | 
			
		||||
use chrono::{DateTime, Utc};
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
use thiserror::Error;
 | 
			
		||||
 | 
			
		||||
/// Job status enumeration
 | 
			
		||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 | 
			
		||||
pub enum JobStatus {
 | 
			
		||||
    Dispatched,
 | 
			
		||||
    WaitingForPrerequisites,
 | 
			
		||||
    Started,
 | 
			
		||||
    Error,
 | 
			
		||||
    Stopping,
 | 
			
		||||
    Finished,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JobStatus {
 | 
			
		||||
    pub fn as_str(&self) -> &'static str {
 | 
			
		||||
        match self {
 | 
			
		||||
            JobStatus::Dispatched => "dispatched",
 | 
			
		||||
            JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites",
 | 
			
		||||
            JobStatus::Started => "started",
 | 
			
		||||
            JobStatus::Error => "error",
 | 
			
		||||
            JobStatus::Stopping => "stopping",
 | 
			
		||||
            JobStatus::Finished => "finished",
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn from_str(s: &str) -> Option<Self> {
 | 
			
		||||
        match s {
 | 
			
		||||
            "dispatched" => Some(JobStatus::Dispatched),
 | 
			
		||||
            "waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites),
 | 
			
		||||
            "started" => Some(JobStatus::Started),
 | 
			
		||||
            "error" => Some(JobStatus::Error),
 | 
			
		||||
            "stopping" => Some(JobStatus::Stopping),
 | 
			
		||||
            "finished" => Some(JobStatus::Finished),
 | 
			
		||||
            _ => None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Representation of a script execution request.
 | 
			
		||||
///
 | 
			
		||||
/// This structure contains all the information needed to execute a script
 | 
			
		||||
/// on a actor service, including the script content, dependencies, and metadata.
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct Job {
 | 
			
		||||
    pub id: String,
 | 
			
		||||
    pub caller_id: String,
 | 
			
		||||
    pub context_id: String,
 | 
			
		||||
    pub payload: String,
 | 
			
		||||
    pub runner: String, // name of the runner to execute this job
 | 
			
		||||
    pub executor: String, // name of the executor the runner will use to execute this job
 | 
			
		||||
    pub timeout: u64, // timeout in seconds
 | 
			
		||||
    pub env_vars: HashMap<String, String>, // environment variables for script execution
 | 
			
		||||
    pub created_at: chrono::DateTime<chrono::Utc>,
 | 
			
		||||
    pub updated_at: chrono::DateTime<chrono::Utc>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Error types for job operations
 | 
			
		||||
#[derive(Error, Debug)]
 | 
			
		||||
pub enum JobError {
 | 
			
		||||
    #[error("Redis error: {0}")]
 | 
			
		||||
    Redis(#[from] redis::RedisError),
 | 
			
		||||
    #[error("Serialization error: {0}")]
 | 
			
		||||
    Serialization(#[from] serde_json::Error),
 | 
			
		||||
    #[error("Job not found: {0}")]
 | 
			
		||||
    NotFound(String),
 | 
			
		||||
    #[error("Invalid job status: {0}")]
 | 
			
		||||
    InvalidStatus(String),
 | 
			
		||||
    #[error("Timeout error: {0}")]
 | 
			
		||||
    Timeout(String),
 | 
			
		||||
    #[error("Invalid job data: {0}")]
 | 
			
		||||
    InvalidData(String),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Job {
 | 
			
		||||
    /// Create a new job with the given parameters
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        caller_id: String,
 | 
			
		||||
        context_id: String,
 | 
			
		||||
        payload: String,
 | 
			
		||||
        runner: String,
 | 
			
		||||
        executor: String,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        let now = Utc::now();
 | 
			
		||||
        Self {
 | 
			
		||||
            id: Uuid::new_v4().to_string(),
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            payload,
 | 
			
		||||
            runner,
 | 
			
		||||
            executor,
 | 
			
		||||
            timeout: 300, // 5 minutes default
 | 
			
		||||
            env_vars: HashMap::new(),
 | 
			
		||||
            created_at: now,
 | 
			
		||||
            updated_at: now,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Builder for constructing job execution requests.
 | 
			
		||||
pub struct JobBuilder {
 | 
			
		||||
    caller_id: String,
 | 
			
		||||
    context_id: String,
 | 
			
		||||
    payload: String,
 | 
			
		||||
    runner: String,
 | 
			
		||||
    executor: String,
 | 
			
		||||
    timeout: u64, // timeout in seconds
 | 
			
		||||
    env_vars: HashMap<String, String>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JobBuilder {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            caller_id: "".to_string(),
 | 
			
		||||
            context_id: "".to_string(),
 | 
			
		||||
            payload: "".to_string(),
 | 
			
		||||
            runner: "".to_string(),
 | 
			
		||||
            executor: "".to_string(),
 | 
			
		||||
            timeout: 300, // 5 minutes default
 | 
			
		||||
            env_vars: HashMap::new(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the caller ID for this job
 | 
			
		||||
    pub fn caller_id(mut self, caller_id: &str) -> Self {
 | 
			
		||||
        self.caller_id = caller_id.to_string();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the context ID for this job
 | 
			
		||||
    pub fn context_id(mut self, context_id: &str) -> Self {
 | 
			
		||||
        self.context_id = context_id.to_string();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the payload (script content) for this job
 | 
			
		||||
    pub fn payload(mut self, payload: &str) -> Self {
 | 
			
		||||
        self.payload = payload.to_string();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the runner name for this job
 | 
			
		||||
    pub fn runner(mut self, runner: &str) -> Self {
 | 
			
		||||
        self.runner = runner.to_string();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the executor for this job
 | 
			
		||||
    pub fn executor(mut self, executor: &str) -> Self {
 | 
			
		||||
        self.executor = executor.to_string();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set the timeout for job execution (in seconds)
 | 
			
		||||
    pub fn timeout(mut self, timeout: u64) -> Self {
 | 
			
		||||
        self.timeout = timeout;
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set a single environment variable
 | 
			
		||||
    pub fn env_var(mut self, key: &str, value: &str) -> Self {
 | 
			
		||||
        self.env_vars.insert(key.to_string(), value.to_string());
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Set multiple environment variables from a HashMap
 | 
			
		||||
    pub fn env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
 | 
			
		||||
        self.env_vars = env_vars;
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Clear all environment variables
 | 
			
		||||
    pub fn clear_env_vars(mut self) -> Self {
 | 
			
		||||
        self.env_vars.clear();
 | 
			
		||||
        self
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Build the job
 | 
			
		||||
    pub fn build(self) -> Result<Job, JobError> {
 | 
			
		||||
        if self.caller_id.is_empty() {
 | 
			
		||||
            return Err(JobError::InvalidData("caller_id is required".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
        if self.context_id.is_empty() {
 | 
			
		||||
            return Err(JobError::InvalidData("context_id is required".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
        if self.payload.is_empty() {
 | 
			
		||||
            return Err(JobError::InvalidData("payload is required".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
        if self.runner.is_empty() {
 | 
			
		||||
            return Err(JobError::InvalidData("runner is required".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
        if self.executor.is_empty() {
 | 
			
		||||
            return Err(JobError::InvalidData("executor is required".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let mut job = Job::new(
 | 
			
		||||
            self.caller_id,
 | 
			
		||||
            self.context_id,
 | 
			
		||||
            self.payload,
 | 
			
		||||
            self.runner,
 | 
			
		||||
            self.executor,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        job.timeout = self.timeout;
 | 
			
		||||
        job.env_vars = self.env_vars;
 | 
			
		||||
 | 
			
		||||
        Ok(job)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for JobBuilder {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        Self::new()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
// Re-export job types from the separate job crate
 | 
			
		||||
pub use hero_job::*;
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										17
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								src/lib.rs
									
									
									
									
									
								
							@@ -3,20 +3,17 @@
 | 
			
		||||
//! See README.md for detailed documentation and usage examples.
 | 
			
		||||
 | 
			
		||||
pub mod runner;
 | 
			
		||||
pub mod supervisor;
 | 
			
		||||
pub mod job;
 | 
			
		||||
pub mod client;
 | 
			
		||||
pub mod supervisor;
 | 
			
		||||
pub mod app;
 | 
			
		||||
 | 
			
		||||
// OpenRPC server module
 | 
			
		||||
pub mod openrpc;
 | 
			
		||||
pub mod mycelium;
 | 
			
		||||
 | 
			
		||||
// Re-export main types for convenience
 | 
			
		||||
pub use runner::{
 | 
			
		||||
    LogInfo, Runner, RunnerConfig, RunnerResult, RunnerStatus,
 | 
			
		||||
};
 | 
			
		||||
pub use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
 | 
			
		||||
pub use runner::{Runner, RunnerConfig, RunnerResult, RunnerStatus};
 | 
			
		||||
// pub use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
 | 
			
		||||
pub use supervisor::{Supervisor, SupervisorBuilder, ProcessManagerType};
 | 
			
		||||
pub use job::{Job, JobBuilder, JobStatus, JobError};
 | 
			
		||||
pub use client::{Client, ClientBuilder};
 | 
			
		||||
pub use hero_job::{Job, JobBuilder, JobStatus, JobError};
 | 
			
		||||
pub use hero_job::Client;
 | 
			
		||||
pub use app::SupervisorApp;
 | 
			
		||||
pub use mycelium::MyceliumServer;
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										297
									
								
								src/mycelium.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										297
									
								
								src/mycelium.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,297 @@
 | 
			
		||||
//! # Mycelium Server Integration for Hero Supervisor
 | 
			
		||||
//!
 | 
			
		||||
//! This module implements a Mycelium-compatible JSON-RPC server that bridges
 | 
			
		||||
//! Mycelium transport messages to the supervisor's OpenRPC interface.
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use serde_json::{Value, json};
 | 
			
		||||
use log::{info, error, debug};
 | 
			
		||||
use base64::Engine;
 | 
			
		||||
use crate::Supervisor;
 | 
			
		||||
 | 
			
		||||
/// Mycelium server that handles pushMessage calls and forwards them to supervisor
 | 
			
		||||
pub struct MyceliumServer {
 | 
			
		||||
    supervisor: Arc<Mutex<Supervisor>>,
 | 
			
		||||
    bind_address: String,
 | 
			
		||||
    port: u16,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MyceliumServer {
 | 
			
		||||
    pub fn new(supervisor: Arc<Mutex<Supervisor>>, bind_address: String, port: u16) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            supervisor,
 | 
			
		||||
            bind_address,
 | 
			
		||||
            port,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Start the Mycelium-compatible JSON-RPC server
 | 
			
		||||
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
 | 
			
		||||
        use jsonrpsee::server::{ServerBuilder, RpcModule};
 | 
			
		||||
        use tower_http::cors::{CorsLayer, Any};
 | 
			
		||||
        
 | 
			
		||||
        info!("Starting Mycelium server on {}:{}", self.bind_address, self.port);
 | 
			
		||||
        
 | 
			
		||||
        let cors = CorsLayer::new()
 | 
			
		||||
            .allow_methods(Any)
 | 
			
		||||
            .allow_headers(Any)
 | 
			
		||||
            .allow_origin(Any);
 | 
			
		||||
        
 | 
			
		||||
        let server = ServerBuilder::default()
 | 
			
		||||
            .build(format!("{}:{}", self.bind_address, self.port))
 | 
			
		||||
            .await?;
 | 
			
		||||
        
 | 
			
		||||
        let mut module = RpcModule::new(());
 | 
			
		||||
        let supervisor_clone = self.supervisor.clone();
 | 
			
		||||
        
 | 
			
		||||
        // Register pushMessage method
 | 
			
		||||
        module.register_async_method("pushMessage", move |params, _, _| {
 | 
			
		||||
            let supervisor = supervisor_clone.clone();
 | 
			
		||||
            async move {
 | 
			
		||||
                handle_push_message(supervisor, params).await
 | 
			
		||||
            }
 | 
			
		||||
        })?;
 | 
			
		||||
        
 | 
			
		||||
        // Register messageStatus method (basic implementation)
 | 
			
		||||
        module.register_async_method("messageStatus", |params, _, _| async move {
 | 
			
		||||
            handle_message_status(params).await
 | 
			
		||||
        })?;
 | 
			
		||||
        
 | 
			
		||||
        let handle = server.start(module);
 | 
			
		||||
        
 | 
			
		||||
        info!("Mycelium server started successfully on {}:{}", self.bind_address, self.port);
 | 
			
		||||
        
 | 
			
		||||
        // Keep the server running
 | 
			
		||||
        handle.stopped().await;
 | 
			
		||||
        
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Handle pushMessage calls from Mycelium clients
 | 
			
		||||
async fn handle_push_message(
 | 
			
		||||
    supervisor: Arc<Mutex<Supervisor>>,
 | 
			
		||||
    params: jsonrpsee::types::Params<'_>,
 | 
			
		||||
) -> Result<Value, jsonrpsee::types::ErrorObjectOwned> {
 | 
			
		||||
    // Parse params as array first, then get the first element
 | 
			
		||||
    let params_array: Vec<Value> = params.parse()
 | 
			
		||||
        .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    let params_value = params_array.get(0)
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    debug!("Received pushMessage: {}", params_value);
 | 
			
		||||
    
 | 
			
		||||
    // Extract message from params
 | 
			
		||||
    let message = params_value.get("message")
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing message".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    // Extract payload (base64 encoded supervisor JSON-RPC)
 | 
			
		||||
    let payload_b64 = message.get("payload")
 | 
			
		||||
        .and_then(|v| v.as_str())
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing payload".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    // Extract topic and destination (for logging/debugging)
 | 
			
		||||
    let _topic = message.get("topic")
 | 
			
		||||
        .and_then(|v| v.as_str())
 | 
			
		||||
        .unwrap_or("supervisor.rpc");
 | 
			
		||||
    
 | 
			
		||||
    let _dst = message.get("dst");
 | 
			
		||||
    
 | 
			
		||||
    // Check if this is a reply timeout request
 | 
			
		||||
    let reply_timeout = params_value.get("reply_timeout")
 | 
			
		||||
        .and_then(|v| v.as_u64());
 | 
			
		||||
    
 | 
			
		||||
    // Decode the supervisor JSON-RPC payload
 | 
			
		||||
    let payload_bytes = base64::engine::general_purpose::STANDARD
 | 
			
		||||
        .decode(payload_b64)
 | 
			
		||||
        .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid base64: {}", e))))?;
 | 
			
		||||
    
 | 
			
		||||
    let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes)
 | 
			
		||||
        .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid JSON: {}", e))))?;
 | 
			
		||||
    
 | 
			
		||||
    debug!("Decoded supervisor RPC: {}", supervisor_rpc);
 | 
			
		||||
    
 | 
			
		||||
    // Extract method and params from supervisor JSON-RPC
 | 
			
		||||
    let method = supervisor_rpc.get("method")
 | 
			
		||||
        .and_then(|v| v.as_str())
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing method".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    let rpc_params = supervisor_rpc.get("params")
 | 
			
		||||
        .cloned()
 | 
			
		||||
        .unwrap_or(json!([]));
 | 
			
		||||
    
 | 
			
		||||
    let rpc_id = supervisor_rpc.get("id").cloned();
 | 
			
		||||
    
 | 
			
		||||
    // Route to appropriate supervisor method
 | 
			
		||||
    let result = route_supervisor_call(supervisor, method, rpc_params).await
 | 
			
		||||
        .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32603, "Internal error", Some(e.to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    // Generate message ID for tracking
 | 
			
		||||
    let message_id = format!("{:016x}", rand::random::<u64>());
 | 
			
		||||
    
 | 
			
		||||
    if let Some(_timeout) = reply_timeout {
 | 
			
		||||
        // For sync calls, return the supervisor result as an InboundMessage
 | 
			
		||||
        let supervisor_response = json!({
 | 
			
		||||
            "jsonrpc": "2.0",
 | 
			
		||||
            "id": rpc_id,
 | 
			
		||||
            "result": result
 | 
			
		||||
        });
 | 
			
		||||
        
 | 
			
		||||
        let response_b64 = base64::engine::general_purpose::STANDARD
 | 
			
		||||
            .encode(serde_json::to_string(&supervisor_response).unwrap().as_bytes());
 | 
			
		||||
        
 | 
			
		||||
        Ok(json!({
 | 
			
		||||
            "id": message_id,
 | 
			
		||||
            "srcIp": "127.0.0.1",
 | 
			
		||||
            "payload": response_b64
 | 
			
		||||
        }))
 | 
			
		||||
    } else {
 | 
			
		||||
        // For async calls, just return the message ID
 | 
			
		||||
        Ok(json!({
 | 
			
		||||
            "id": message_id
 | 
			
		||||
        }))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Handle messageStatus calls
 | 
			
		||||
async fn handle_message_status(
 | 
			
		||||
    params: jsonrpsee::types::Params<'_>,
 | 
			
		||||
) -> Result<Value, jsonrpsee::types::ErrorObjectOwned> {
 | 
			
		||||
    // Parse params as array first, then get the first element
 | 
			
		||||
    let params_array: Vec<Value> = params.parse()
 | 
			
		||||
        .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    let params_value = params_array.get(0)
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    let _message_id = params_value.get("id")
 | 
			
		||||
        .and_then(|v| v.as_str())
 | 
			
		||||
        .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing id".to_string())))?;
 | 
			
		||||
    
 | 
			
		||||
    // For simplicity, always return "delivered" status
 | 
			
		||||
    Ok(json!({
 | 
			
		||||
        "status": "delivered"
 | 
			
		||||
    }))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Route supervisor method calls to the appropriate supervisor functions
 | 
			
		||||
async fn route_supervisor_call(
 | 
			
		||||
    supervisor: Arc<Mutex<Supervisor>>,
 | 
			
		||||
    method: &str,
 | 
			
		||||
    params: Value,
 | 
			
		||||
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    let mut supervisor_guard = supervisor.lock().await;
 | 
			
		||||
    
 | 
			
		||||
    match method {
 | 
			
		||||
        "list_runners" => {
 | 
			
		||||
            let runners = supervisor_guard.list_runners();
 | 
			
		||||
            Ok(json!(runners))
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "register_runner" => {
 | 
			
		||||
            if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
 | 
			
		||||
                let secret = param_obj.get("secret")
 | 
			
		||||
                    .and_then(|v| v.as_str())
 | 
			
		||||
                    .ok_or("missing secret")?;
 | 
			
		||||
                let name = param_obj.get("name")
 | 
			
		||||
                    .and_then(|v| v.as_str())
 | 
			
		||||
                    .ok_or("missing name")?;
 | 
			
		||||
                let queue = param_obj.get("queue")
 | 
			
		||||
                    .and_then(|v| v.as_str())
 | 
			
		||||
                    .ok_or("missing queue")?;
 | 
			
		||||
                
 | 
			
		||||
                supervisor_guard.register_runner(secret, name, queue).await?;
 | 
			
		||||
                Ok(json!("success"))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid register_runner params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "start_runner" => {
 | 
			
		||||
            if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
 | 
			
		||||
                supervisor_guard.start_runner(actor_id).await?;
 | 
			
		||||
                Ok(json!("success"))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid start_runner params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "stop_runner" => {
 | 
			
		||||
            if let Some(arr) = params.as_array() {
 | 
			
		||||
                let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?;
 | 
			
		||||
                let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false);
 | 
			
		||||
                supervisor_guard.stop_runner(actor_id, force).await?;
 | 
			
		||||
                Ok(json!("success"))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid stop_runner params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "get_runner_status" => {
 | 
			
		||||
            if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
 | 
			
		||||
                let status = supervisor_guard.get_runner_status(actor_id).await?;
 | 
			
		||||
                Ok(json!(format!("{:?}", status)))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid get_runner_status params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "get_all_runner_status" => {
 | 
			
		||||
            let statuses = supervisor_guard.get_all_runner_status().await?;
 | 
			
		||||
            let status_map: std::collections::HashMap<String, String> = statuses
 | 
			
		||||
                .into_iter()
 | 
			
		||||
                .map(|(id, status)| (id, format!("{:?}", status)))
 | 
			
		||||
                .collect();
 | 
			
		||||
            Ok(json!(status_map))
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "job.run" => {
 | 
			
		||||
            if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
 | 
			
		||||
                let secret = param_obj.get("secret")
 | 
			
		||||
                    .and_then(|v| v.as_str())
 | 
			
		||||
                    .ok_or("missing secret")?;
 | 
			
		||||
                let job = param_obj.get("job")
 | 
			
		||||
                    .ok_or("missing job")?;
 | 
			
		||||
                
 | 
			
		||||
                // For now, return success - actual job execution would need more integration
 | 
			
		||||
                Ok(json!("job_queued"))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid job.run params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "job.status" => {
 | 
			
		||||
            if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
 | 
			
		||||
                // For now, return a mock status
 | 
			
		||||
                Ok(json!({"status": "completed"}))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid job.status params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "job.result" => {
 | 
			
		||||
            if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
 | 
			
		||||
                // For now, return a mock result
 | 
			
		||||
                Ok(json!({"success": "job completed successfully"}))
 | 
			
		||||
            } else {
 | 
			
		||||
                Err("invalid job.result params".into())
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        "rpc.discover" => {
 | 
			
		||||
            let methods = vec![
 | 
			
		||||
                "list_runners", "register_runner", "start_runner", "stop_runner",
 | 
			
		||||
                "get_runner_status", "get_all_runner_status", 
 | 
			
		||||
                "job.run", "job.status", "job.result", "rpc.discover"
 | 
			
		||||
            ];
 | 
			
		||||
            Ok(json!(methods))
 | 
			
		||||
        }
 | 
			
		||||
        
 | 
			
		||||
        _ => {
 | 
			
		||||
            error!("Unknown method: {}", method);
 | 
			
		||||
            Err(format!("unknown method: {}", method).into())
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -13,9 +13,9 @@ use log::{debug, info, error};
 | 
			
		||||
 | 
			
		||||
use crate::supervisor::Supervisor;
 | 
			
		||||
use crate::runner::{Runner, RunnerError};
 | 
			
		||||
use crate::runner::{ProcessManagerError, ProcessStatus, LogInfo};
 | 
			
		||||
use crate::job::Job;
 | 
			
		||||
use crate::ProcessManagerType;
 | 
			
		||||
use sal_service_manager::{ProcessStatus, LogInfo};
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
 | 
			
		||||
use std::net::SocketAddr;
 | 
			
		||||
@@ -191,10 +191,12 @@ pub enum ProcessStatusWrapper {
 | 
			
		||||
impl From<ProcessStatus> for ProcessStatusWrapper {
 | 
			
		||||
    fn from(status: ProcessStatus) -> Self {
 | 
			
		||||
        match status {
 | 
			
		||||
            ProcessStatus::Running => ProcessStatusWrapper::Running,
 | 
			
		||||
            ProcessStatus::Stopped => ProcessStatusWrapper::Stopped,
 | 
			
		||||
            ProcessStatus::NotStarted => ProcessStatusWrapper::Stopped,
 | 
			
		||||
            ProcessStatus::Starting => ProcessStatusWrapper::Starting,
 | 
			
		||||
            ProcessStatus::Running => ProcessStatusWrapper::Running,
 | 
			
		||||
            ProcessStatus::Stopping => ProcessStatusWrapper::Stopping,
 | 
			
		||||
            ProcessStatus::Stopped => ProcessStatusWrapper::Stopped,
 | 
			
		||||
            ProcessStatus::Failed => ProcessStatusWrapper::Error("Process failed".to_string()),
 | 
			
		||||
            ProcessStatus::Error(msg) => ProcessStatusWrapper::Error(msg),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -231,16 +233,6 @@ pub struct LogInfoWrapper {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<LogInfo> for LogInfoWrapper {
 | 
			
		||||
    fn from(log: LogInfo) -> Self {
 | 
			
		||||
        LogInfoWrapper {
 | 
			
		||||
            timestamp: log.timestamp,
 | 
			
		||||
            level: log.level,
 | 
			
		||||
            message: log.message,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl From<crate::runner::LogInfo> for LogInfoWrapper {
 | 
			
		||||
    fn from(log: crate::runner::LogInfo) -> Self {
 | 
			
		||||
        LogInfoWrapper {
 | 
			
		||||
            timestamp: log.timestamp,
 | 
			
		||||
 
 | 
			
		||||
@@ -1,13 +1,56 @@
 | 
			
		||||
//! Runner implementation for actor process management.
 | 
			
		||||
 | 
			
		||||
use sal_service_manager::{ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
 | 
			
		||||
// use sal_service_manager::{ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
 | 
			
		||||
 | 
			
		||||
/// Simple process status enum to replace sal_service_manager dependency
 | 
			
		||||
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
 | 
			
		||||
pub enum ProcessStatus {
 | 
			
		||||
    NotStarted,
 | 
			
		||||
    Starting,
 | 
			
		||||
    Running,
 | 
			
		||||
    Stopping,
 | 
			
		||||
    Stopped,
 | 
			
		||||
    Failed,
 | 
			
		||||
    Error(String),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Simple process config to replace sal_service_manager dependency
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct ProcessConfig {
 | 
			
		||||
    pub command: String,
 | 
			
		||||
    pub args: Vec<String>,
 | 
			
		||||
    pub working_dir: Option<String>,
 | 
			
		||||
    pub env_vars: Vec<(String, String)>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ProcessConfig {
 | 
			
		||||
    pub fn new(command: String, args: Vec<String>, working_dir: Option<String>, env_vars: Vec<(String, String)>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            command,
 | 
			
		||||
            args,
 | 
			
		||||
            working_dir,
 | 
			
		||||
            env_vars,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Simple process manager error to replace sal_service_manager dependency
 | 
			
		||||
#[derive(Debug, thiserror::Error)]
 | 
			
		||||
pub enum ProcessManagerError {
 | 
			
		||||
    #[error("Process execution failed: {0}")]
 | 
			
		||||
    ExecutionFailed(String),
 | 
			
		||||
    #[error("Process not found: {0}")]
 | 
			
		||||
    ProcessNotFound(String),
 | 
			
		||||
    #[error("IO error: {0}")]
 | 
			
		||||
    IoError(String),
 | 
			
		||||
}
 | 
			
		||||
use std::path::PathBuf;
 | 
			
		||||
 | 
			
		||||
/// Represents the current status of an actor/runner (alias for ProcessStatus)
 | 
			
		||||
pub type RunnerStatus = ProcessStatus;
 | 
			
		||||
 | 
			
		||||
/// Log information structure
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
/// Log information structure with serialization support
 | 
			
		||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 | 
			
		||||
pub struct LogInfo {
 | 
			
		||||
    pub timestamp: String,
 | 
			
		||||
    pub level: String,
 | 
			
		||||
@@ -96,7 +139,7 @@ pub enum RunnerError {
 | 
			
		||||
    #[error("Process manager error: {source}")]
 | 
			
		||||
    ProcessManagerError {
 | 
			
		||||
        #[from]
 | 
			
		||||
        source: ServiceProcessManagerError,
 | 
			
		||||
        source: ProcessManagerError,
 | 
			
		||||
    },
 | 
			
		||||
    
 | 
			
		||||
    #[error("Configuration error: {reason}")]
 | 
			
		||||
@@ -120,7 +163,7 @@ pub enum RunnerError {
 | 
			
		||||
    #[error("Job error: {source}")]
 | 
			
		||||
    JobError {
 | 
			
		||||
        #[from]
 | 
			
		||||
        source: crate::JobError,
 | 
			
		||||
        source: hero_job::JobError,
 | 
			
		||||
    },
 | 
			
		||||
    
 | 
			
		||||
    #[error("Job '{job_id}' not found")]
 | 
			
		||||
@@ -135,9 +178,17 @@ pub type RunnerConfig = Runner;
 | 
			
		||||
 | 
			
		||||
/// Convert Runner to ProcessConfig
 | 
			
		||||
pub fn runner_to_process_config(config: &Runner) -> ProcessConfig {
 | 
			
		||||
    ProcessConfig::new(config.id.clone(), config.command.clone())
 | 
			
		||||
        .with_arg("--id".to_string())
 | 
			
		||||
        .with_arg(config.id.clone())
 | 
			
		||||
        .with_arg("--redis-url".to_string())
 | 
			
		||||
        .with_arg(config.redis_url.clone())
 | 
			
		||||
    let args = vec![
 | 
			
		||||
        "--id".to_string(),
 | 
			
		||||
        config.id.clone(),
 | 
			
		||||
        "--redis-url".to_string(),
 | 
			
		||||
        config.redis_url.clone(),
 | 
			
		||||
    ];
 | 
			
		||||
    
 | 
			
		||||
    ProcessConfig::new(
 | 
			
		||||
        config.command.to_string_lossy().to_string(),
 | 
			
		||||
        args,
 | 
			
		||||
        Some("/tmp".to_string()), // Default working directory since Runner doesn't have working_dir field
 | 
			
		||||
        vec![]
 | 
			
		||||
    )
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,16 +1,85 @@
 | 
			
		||||
//! Main supervisor implementation for managing multiple actor runners.
 | 
			
		||||
 | 
			
		||||
use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
 | 
			
		||||
use crate::runner::{ProcessManagerError, ProcessConfig, ProcessStatus};
 | 
			
		||||
 | 
			
		||||
/// Simple trait to replace sal_service_manager ProcessManager
 | 
			
		||||
trait ProcessManager: Send + Sync {
 | 
			
		||||
    fn start(&self, config: &ProcessConfig) -> Result<(), ProcessManagerError>;
 | 
			
		||||
    fn stop(&self, process_id: &str) -> Result<(), ProcessManagerError>;
 | 
			
		||||
    fn status(&self, process_id: &str) -> Result<ProcessStatus, ProcessManagerError>;
 | 
			
		||||
    fn logs(&self, process_id: &str) -> Result<Vec<String>, ProcessManagerError>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Simple process manager implementation
 | 
			
		||||
struct SimpleProcessManager;
 | 
			
		||||
 | 
			
		||||
impl SimpleProcessManager {
 | 
			
		||||
    fn new() -> Self {
 | 
			
		||||
        Self
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ProcessManager for SimpleProcessManager {
 | 
			
		||||
    fn start(&self, _config: &ProcessConfig) -> Result<(), ProcessManagerError> {
 | 
			
		||||
        // Simplified implementation - just return success for now
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn stop(&self, _process_id: &str) -> Result<(), ProcessManagerError> {
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn status(&self, _process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
 | 
			
		||||
        Ok(ProcessStatus::Running)
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn logs(&self, _process_id: &str) -> Result<Vec<String>, ProcessManagerError> {
 | 
			
		||||
        Ok(vec!["No logs available".to_string()])
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Tmux process manager implementation
 | 
			
		||||
struct TmuxProcessManager {
 | 
			
		||||
    session_name: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl TmuxProcessManager {
 | 
			
		||||
    fn new(session_name: String) -> Self {
 | 
			
		||||
        Self { session_name }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ProcessManager for TmuxProcessManager {
 | 
			
		||||
    fn start(&self, _config: &ProcessConfig) -> Result<(), ProcessManagerError> {
 | 
			
		||||
        // Simplified implementation - just return success for now
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn stop(&self, _process_id: &str) -> Result<(), ProcessManagerError> {
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn status(&self, _process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
 | 
			
		||||
        Ok(ProcessStatus::Running)
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    fn logs(&self, _process_id: &str) -> Result<Vec<String>, ProcessManagerError> {
 | 
			
		||||
        Ok(vec!["No logs available".to_string()])
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::path::PathBuf;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
use crate::{client::{Client, ClientBuilder}, job::JobStatus, runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}};
 | 
			
		||||
// use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
 | 
			
		||||
 | 
			
		||||
use crate::{job::JobStatus, runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}};
 | 
			
		||||
use hero_job::{Client, client::ClientBuilder};
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
/// Process manager type for a runner
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 | 
			
		||||
pub enum ProcessManagerType {
 | 
			
		||||
    /// Simple process manager for direct process spawning
 | 
			
		||||
    Simple,
 | 
			
		||||
@@ -337,7 +406,7 @@ impl Supervisor {
 | 
			
		||||
 | 
			
		||||
    /// Delete a job by ID
 | 
			
		||||
    pub async fn delete_job(&mut self, job_id: &str) -> RunnerResult<()> {
 | 
			
		||||
        self.client.delete_job(&job_id).await
 | 
			
		||||
        self.client.delete_job(&job_id).await.map_err(RunnerError::from)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// List all managed runners
 | 
			
		||||
@@ -355,7 +424,7 @@ impl Supervisor {
 | 
			
		||||
            
 | 
			
		||||
            let process_config = runner_to_process_config(runner);
 | 
			
		||||
            let mut pm = self.process_manager.lock().await;
 | 
			
		||||
            pm.start_process(&process_config).await?;
 | 
			
		||||
            pm.start(&process_config)?;
 | 
			
		||||
            
 | 
			
		||||
            info!("Successfully started actor {}", runner.id);
 | 
			
		||||
            Ok(())
 | 
			
		||||
@@ -374,7 +443,7 @@ impl Supervisor {
 | 
			
		||||
            info!("Stopping actor {}", runner.id);
 | 
			
		||||
            
 | 
			
		||||
            let mut pm = self.process_manager.lock().await;
 | 
			
		||||
            pm.stop_process(&runner.id, force).await?;
 | 
			
		||||
            pm.stop(&runner.id)?;
 | 
			
		||||
            
 | 
			
		||||
            info!("Successfully stopped actor {}", runner.id);
 | 
			
		||||
            Ok(())
 | 
			
		||||
@@ -389,7 +458,7 @@ impl Supervisor {
 | 
			
		||||
    pub async fn get_runner_status(&self, actor_id: &str) -> RunnerResult<RunnerStatus> {
 | 
			
		||||
        if let Some(runner) = self.runners.get(actor_id) {
 | 
			
		||||
            let pm = self.process_manager.lock().await;
 | 
			
		||||
            let status = pm.process_status(&runner.id).await?;
 | 
			
		||||
            let status = pm.status(&runner.id)?;
 | 
			
		||||
            Ok(status)
 | 
			
		||||
        } else {
 | 
			
		||||
            Err(RunnerError::ActorNotFound {
 | 
			
		||||
@@ -407,13 +476,13 @@ impl Supervisor {
 | 
			
		||||
    ) -> RunnerResult<Vec<LogInfo>> {
 | 
			
		||||
        if let Some(runner) = self.runners.get(actor_id) {
 | 
			
		||||
            let pm = self.process_manager.lock().await;
 | 
			
		||||
            let logs = pm.process_logs(&runner.id, lines, follow).await?;
 | 
			
		||||
            let logs = pm.logs(&runner.id)?;
 | 
			
		||||
            
 | 
			
		||||
            // Convert sal_service_manager::LogInfo to our LogInfo
 | 
			
		||||
            let converted_logs = logs.into_iter().map(|log| LogInfo {
 | 
			
		||||
                timestamp: log.timestamp,
 | 
			
		||||
                level: log.level,
 | 
			
		||||
                message: log.message,
 | 
			
		||||
            // Convert strings to LogInfo
 | 
			
		||||
            let converted_logs = logs.into_iter().map(|log_line| LogInfo {
 | 
			
		||||
                timestamp: chrono::Utc::now().to_rfc3339(),
 | 
			
		||||
                level: "INFO".to_string(),
 | 
			
		||||
                message: log_line,
 | 
			
		||||
            }).collect();
 | 
			
		||||
            
 | 
			
		||||
            Ok(converted_logs)
 | 
			
		||||
@@ -521,7 +590,6 @@ impl Supervisor {
 | 
			
		||||
            match self.get_runner_status(actor_id).await {
 | 
			
		||||
                Ok(status) => results.push((actor_id.clone(), status)),
 | 
			
		||||
                Err(_) => {
 | 
			
		||||
                    use sal_service_manager::ProcessStatus;
 | 
			
		||||
                    results.push((actor_id.clone(), ProcessStatus::Stopped));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@@ -651,7 +719,7 @@ impl Supervisor {
 | 
			
		||||
 | 
			
		||||
    /// List all job IDs from Redis
 | 
			
		||||
    pub async fn list_jobs(&self) -> RunnerResult<Vec<String>> {
 | 
			
		||||
        self.client.list_jobs().await
 | 
			
		||||
        self.client.list_jobs().await.map_err(RunnerError::from)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// List all jobs with full details from Redis
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user