Timur Gordon
2025-08-01 00:01:08 +02:00
parent 32c2cbe0cc
commit 8ed40ce99c
57 changed files with 2047 additions and 4113 deletions

core/supervisor/src/error.rs Normal file

@@ -0,0 +1,102 @@
/// Comprehensive error type for all possible failures in the Hero Supervisor.
///
/// This enum covers all error scenarios that can occur during supervisor operations,
/// from Redis connectivity issues to task execution timeouts.
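/// # Example
///
/// A minimal sketch of how the `From` conversions below let `?` promote
/// library errors into `SupervisorError` (assuming the crate is used as
/// `hero_supervisor`):
///
/// ```rust,no_run
/// use hero_supervisor::SupervisorError;
///
/// fn parse_task_payload(raw: &str) -> Result<serde_json::Value, SupervisorError> {
///     // serde_json::Error converts automatically via the From impl below
///     let value: serde_json::Value = serde_json::from_str(raw)?;
///     Ok(value)
/// }
/// ```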
#[derive(Debug)]
pub enum SupervisorError {
/// Redis connection or operation error
RedisError(redis::RedisError),
/// JSON serialization/deserialization error
SerializationError(serde_json::Error),
/// Task execution timeout - contains the task_id that timed out
Timeout(String),
/// Task not found after submission - contains the task_id (rare occurrence)
TaskNotFound(String),
/// Context ID is missing
ContextIdMissing,
/// Invalid input provided
InvalidInput(String),
/// Job operation error
JobError(hero_job::JobError),
/// Worker lifecycle management errors
WorkerStartFailed(String, String),
WorkerStopFailed(String, String),
WorkerRestartFailed(String, String),
WorkerStatusFailed(String, String),
WorkerNotFound(String),
PingJobFailed(String, String),
/// Zinit client operation error
ZinitError(String),
SupervisorNotConfigured,
}
impl From<redis::RedisError> for SupervisorError {
fn from(err: redis::RedisError) -> Self {
SupervisorError::RedisError(err)
}
}
impl From<serde_json::Error> for SupervisorError {
fn from(err: serde_json::Error) -> Self {
SupervisorError::SerializationError(err)
}
}
impl From<hero_job::JobError> for SupervisorError {
fn from(err: hero_job::JobError) -> Self {
SupervisorError::JobError(err)
}
}
impl std::fmt::Display for SupervisorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SupervisorError::RedisError(e) => write!(f, "Redis error: {}", e),
SupervisorError::SerializationError(e) => write!(f, "Serialization error: {}", e),
SupervisorError::Timeout(task_id) => {
write!(f, "Timeout waiting for task {} to complete", task_id)
}
SupervisorError::TaskNotFound(task_id) => {
write!(f, "Task {} not found after submission", task_id)
}
SupervisorError::ContextIdMissing => {
write!(f, "Context ID is missing")
}
SupervisorError::InvalidInput(msg) => {
write!(f, "Invalid input: {}", msg)
}
SupervisorError::JobError(e) => {
write!(f, "Job error: {}", e)
}
SupervisorError::WorkerStartFailed(worker, reason) => {
write!(f, "Failed to start worker '{}': {}", worker, reason)
}
SupervisorError::WorkerStopFailed(worker, reason) => {
write!(f, "Failed to stop worker '{}': {}", worker, reason)
}
SupervisorError::WorkerRestartFailed(worker, reason) => {
write!(f, "Failed to restart worker '{}': {}", worker, reason)
}
SupervisorError::WorkerStatusFailed(worker, reason) => {
write!(f, "Failed to get status for worker '{}': {}", worker, reason)
}
SupervisorError::WorkerNotFound(worker) => {
write!(f, "Worker '{}' not found", worker)
}
SupervisorError::PingJobFailed(worker, reason) => {
write!(f, "Ping job failed for worker '{}': {}", worker, reason)
}
SupervisorError::ZinitError(msg) => {
write!(f, "Zinit error: {}", msg)
}
SupervisorError::SupervisorNotConfigured => {
write!(f, "Supervisor not configured for health monitoring")
}
}
}
}
impl std::error::Error for SupervisorError {}

core/supervisor/src/job.rs Normal file

@@ -0,0 +1,261 @@
use chrono::Utc;
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;
use crate::{Supervisor, SupervisorError};
use hero_job::{Job, ScriptType};
/// Builder for constructing and submitting script execution requests.
///
/// This builder provides a fluent interface for configuring script execution
/// parameters and offers two submission modes: fire-and-forget (`submit()`)
/// and request-reply (`await_response()`).
///
/// # Example
///
/// ```rust,no_run
/// use std::time::Duration;
/// use hero_supervisor::ScriptType;
///
/// # async fn example(client: &hero_supervisor::Supervisor) -> Result<String, hero_supervisor::SupervisorError> {
/// let result = client
///     .new_job()
///     .script_type(ScriptType::OSIS)
///     .caller_id("example_caller")
///     .context_id("example_context")
///     .script(r#"print("Hello, World!");"#)
///     .timeout(Duration::from_secs(30))
///     .await_response()
///     .await?;
/// # Ok(result)
/// # }
/// ```
pub struct JobBuilder<'a> {
client: &'a Supervisor,
request_id: String,
context_id: String,
caller_id: String,
script: String,
script_type: ScriptType,
timeout: Duration,
retries: u32,
concurrent: bool,
log_path: Option<String>,
env_vars: HashMap<String, String>,
prerequisites: Vec<String>,
dependents: Vec<String>
}
impl<'a> JobBuilder<'a> {
pub fn new(client: &'a Supervisor) -> Self {
Self {
client,
request_id: "".to_string(),
context_id: "".to_string(),
caller_id: "".to_string(),
script: "".to_string(),
script_type: ScriptType::OSIS, // Default to OSIS
timeout: Duration::from_secs(5),
retries: 0,
concurrent: false,
log_path: None,
env_vars: HashMap::new(),
prerequisites: Vec::new(),
dependents: Vec::new(),
}
}
pub fn request_id(mut self, request_id: &str) -> Self {
self.request_id = request_id.to_string();
self
}
pub fn script_type(mut self, script_type: ScriptType) -> Self {
self.script_type = script_type;
self
}
pub fn context_id(mut self, context_id: &str) -> Self {
self.context_id = context_id.to_string();
self
}
/// Set the caller ID (required by `build()`)
pub fn caller_id(mut self, caller_id: &str) -> Self {
self.caller_id = caller_id.to_string();
self
}
pub fn script(mut self, script: &str) -> Self {
self.script = script.to_string();
self
}
/// Load the script from a file path.
///
/// Note: this panics if the file cannot be read, since builder setters
/// cannot return errors.
pub fn script_path(mut self, script_path: &str) -> Self {
self.script = std::fs::read_to_string(script_path)
.expect("failed to read script file");
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn log_path(mut self, log_path: &str) -> Self {
self.log_path = Some(log_path.to_string());
self
}
/// Set a single environment variable
pub fn env_var(mut self, key: &str, value: &str) -> Self {
self.env_vars.insert(key.to_string(), value.to_string());
self
}
/// Set multiple environment variables from a HashMap
pub fn env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
self.env_vars.extend(env_vars);
self
}
/// Clear all environment variables
pub fn clear_env_vars(mut self) -> Self {
self.env_vars.clear();
self
}
/// Add a prerequisite job ID that must complete before this job can run
pub fn prerequisite(mut self, job_id: &str) -> Self {
self.prerequisites.push(job_id.to_string());
self
}
/// Set multiple prerequisite job IDs
pub fn prerequisites(mut self, job_ids: Vec<String>) -> Self {
self.prerequisites.extend(job_ids);
self
}
/// Add a dependent job ID that depends on this job completing
pub fn dependent(mut self, job_id: &str) -> Self {
self.dependents.push(job_id.to_string());
self
}
/// Set multiple dependent job IDs
pub fn dependents(mut self, job_ids: Vec<String>) -> Self {
self.dependents.extend(job_ids);
self
}
/// Clear all prerequisites
pub fn clear_prerequisites(mut self) -> Self {
self.prerequisites.clear();
self
}
/// Clear all dependents
pub fn clear_dependents(mut self) -> Self {
self.dependents.clear();
self
}
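/// Build the job without submitting it.
///
/// A sketch of wiring jobs together with prerequisites (IDs and scripts
/// are illustrative):
///
/// ```rust,no_run
/// use std::time::Duration;
/// use hero_supervisor::ScriptType;
///
/// # fn example(client: &hero_supervisor::Supervisor) -> Result<(), hero_supervisor::SupervisorError> {
/// let job = client
///     .new_job()
///     .script_type(ScriptType::OSIS)
///     .caller_id("example_caller")
///     .context_id("example_context")
///     .script(r#"print("step two");"#)
///     .prerequisite("job-step-one")
///     .timeout(Duration::from_secs(30))
///     .build()?;
/// # let _ = job;
/// # Ok(())
/// # }
/// ```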
pub fn build(self) -> Result<Job, SupervisorError> {
let request_id = if self.request_id.is_empty() {
// Generate a UUID for the request_id
Uuid::new_v4().to_string()
} else {
self.request_id.clone()
};
if self.context_id.is_empty() {
return Err(SupervisorError::ContextIdMissing);
}
if self.caller_id.is_empty() {
return Err(SupervisorError::InvalidInput("caller_id is required".to_string()));
}
let now = Utc::now();
Ok(Job {
id: request_id,
caller_id: self.caller_id,
context_id: self.context_id,
script: self.script,
script_type: self.script_type,
timeout: self.timeout,
retries: self.retries as u8,
concurrent: self.concurrent,
log_path: self.log_path.clone(),
env_vars: self.env_vars.clone(),
prerequisites: self.prerequisites.clone(),
dependents: self.dependents.clone(),
created_at: now,
updated_at: now,
})
}
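/// Submit the job without waiting for a result (fire-and-forget).
///
/// This only stores the job in Redis; call `Supervisor::start_job` to
/// dispatch it to a worker queue. A sketch (IDs are illustrative):
///
/// ```rust,no_run
/// use hero_supervisor::ScriptType;
///
/// # async fn example(client: &hero_supervisor::Supervisor) -> Result<(), hero_supervisor::SupervisorError> {
/// client
///     .new_job()
///     .script_type(ScriptType::OSIS)
///     .caller_id("example_caller")
///     .context_id("example_context")
///     .script(r#"print("queued");"#)
///     .submit()
///     .await?;
/// # Ok(())
/// # }
/// ```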
pub async fn submit(self) -> Result<(), SupervisorError> {
// The client handle is a shared reference (and therefore Copy), so it can
// be taken out before `build()` consumes the builder; reusing `build()`
// avoids duplicating the construction and validation logic
let client = self.client;
let job = self.build()?;
client.create_job(&job).await?;
Ok(())
}
pub async fn await_response(self) -> Result<String, SupervisorError> {
let client = self.client;
let job = self.build()?;
client.run_job_and_await_result(&job).await
}
}

core/supervisor/src/lib.rs Normal file

@@ -0,0 +1,596 @@
use log::{debug, error, info, warn};
use redis::AsyncCommands;
use std::collections::HashMap;
use std::time::Duration;
use hero_job::NAMESPACE_PREFIX;
use zinit_client::ZinitClient;
mod job;
mod error;
mod lifecycle;
pub use crate::error::SupervisorError;
pub use crate::job::JobBuilder;
pub use crate::lifecycle::WorkerConfig;
// Re-export types from hero_job for public API
pub use hero_job::{Job, JobStatus, ScriptType};
pub struct Supervisor {
redis_client: redis::Client,
zinit_client: ZinitClient,
builder_data: Option<SupervisorBuilderData>,
}
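/// Builder for configuring and constructing a `Supervisor`.
///
/// A minimal usage sketch (paths and URLs are illustrative):
///
/// ```rust,no_run
/// use hero_supervisor::SupervisorBuilder;
///
/// # async fn example() -> Result<(), hero_supervisor::SupervisorError> {
/// let supervisor = SupervisorBuilder::new()
///     .redis_url("redis://127.0.0.1/")
///     .zinit_socket_path("/var/run/zinit.sock")
///     .osis_worker("/usr/local/bin/osis_worker")
///     .worker_env_var("RUST_LOG", "info")
///     .build()?;
/// supervisor.start_workers().await?;
/// # Ok(())
/// # }
/// ```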
pub struct SupervisorBuilder {
redis_url: Option<String>,
zinit_socket_path: Option<String>,
osis_worker: Option<String>,
sal_worker: Option<String>,
v_worker: Option<String>,
python_worker: Option<String>,
worker_env_vars: HashMap<String, String>,
}
/// Helper struct to pass builder data to worker launch method
struct SupervisorBuilderData {
osis_worker: Option<String>,
sal_worker: Option<String>,
v_worker: Option<String>,
python_worker: Option<String>,
worker_env_vars: HashMap<String, String>,
}
impl SupervisorBuilder {
pub fn new() -> Self {
Self {
redis_url: None,
zinit_socket_path: Some("/var/run/zinit.sock".to_string()),
osis_worker: None,
sal_worker: None,
v_worker: None,
python_worker: None,
worker_env_vars: HashMap::new(),
}
}
pub fn redis_url(mut self, url: &str) -> Self {
self.redis_url = Some(url.to_string());
self
}
pub fn zinit_socket_path(mut self, path: &str) -> Self {
self.zinit_socket_path = Some(path.to_string());
self
}
pub fn osis_worker(mut self, binary_path: &str) -> Self {
self.osis_worker = Some(binary_path.to_string());
self
}
pub fn sal_worker(mut self, binary_path: &str) -> Self {
self.sal_worker = Some(binary_path.to_string());
self
}
pub fn v_worker(mut self, binary_path: &str) -> Self {
self.v_worker = Some(binary_path.to_string());
self
}
pub fn python_worker(mut self, binary_path: &str) -> Self {
self.python_worker = Some(binary_path.to_string());
self
}
pub fn worker_env_var(mut self, key: &str, value: &str) -> Self {
self.worker_env_vars.insert(key.to_string(), value.to_string());
self
}
pub fn worker_env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
self.worker_env_vars.extend(env_vars);
self
}
/// Builds the final `Supervisor` instance synchronously.
///
/// This method validates the configuration and creates the Redis client.
/// Worker launching is deferred to the `start_workers()` method.
///
/// # Returns
///
/// * `Ok(Supervisor)` - Successfully configured client
/// * `Err(SupervisorError)` - Configuration or connection error
pub fn build(self) -> Result<Supervisor, SupervisorError> {
let url = self.redis_url
.unwrap_or_else(|| "redis://127.0.0.1/".to_string());
let client = redis::Client::open(url)?;
let zinit_socket = self.zinit_socket_path
.unwrap_or_else(|| "/var/run/zinit.sock".to_string());
let zinit_client = ZinitClient::new(&zinit_socket);
// Store builder data for later use in start_workers()
let builder_data = SupervisorBuilderData {
osis_worker: self.osis_worker,
sal_worker: self.sal_worker,
v_worker: self.v_worker,
python_worker: self.python_worker,
worker_env_vars: self.worker_env_vars,
};
let supervisor = Supervisor {
redis_client: client,
zinit_client,
builder_data: Some(builder_data),
};
Ok(supervisor)
}
}
impl Supervisor {
/// Start all configured workers asynchronously.
/// This method should be called after build() to launch the workers.
pub async fn start_workers(&self) -> Result<(), SupervisorError> {
// Clean up any existing worker services first
self.cleanup_existing_workers().await?;
// Launch configured workers if builder data is available
if let Some(builder_data) = &self.builder_data {
self.launch_configured_workers(builder_data).await?;
}
Ok(())
}
/// Clean up all worker services from zinit on program exit
pub async fn cleanup_and_shutdown(&self) -> Result<(), SupervisorError> {
info!("Cleaning up worker services before shutdown...");
let worker_names = vec![
"osis_worker_1",
"sal_worker_1",
"v_worker_1",
"python_worker_1"
];
for worker_name in worker_names {
if let Err(e) = self.stop_and_delete_worker(worker_name).await {
warn!("Failed to cleanup worker {}: {}", worker_name, e);
}
}
info!("Worker cleanup completed");
Ok(())
}
/// Clean up any existing worker services on startup
async fn cleanup_existing_workers(&self) -> Result<(), SupervisorError> {
info!("Cleaning up any existing worker services...");
let worker_names = vec![
"osis_worker_1",
"sal_worker_1",
"v_worker_1",
"python_worker_1"
];
for worker_name in worker_names {
// Try to stop and delete, but don't fail if they don't exist
let _ = self.stop_and_delete_worker(worker_name).await;
}
info!("Existing worker cleanup completed");
Ok(())
}
/// Stop and delete a worker service from zinit
async fn stop_and_delete_worker(&self, worker_name: &str) -> Result<(), SupervisorError> {
// First try to stop the worker
if let Err(e) = self.zinit_client.stop(worker_name).await {
debug!("Worker {} was not running or failed to stop: {}", worker_name, e);
}
// Then try to delete the service
if let Err(e) = self.zinit_client.delete(worker_name).await {
debug!("Worker {} service did not exist or failed to delete: {}", worker_name, e);
} else {
info!("Successfully deleted worker service: {}", worker_name);
}
Ok(())
}
/// Get the hardcoded worker queue key for the script type
fn get_worker_queue_key(&self, script_type: &ScriptType) -> String {
format!("{}worker_queue:{}", NAMESPACE_PREFIX, script_type.worker_queue_suffix())
}
pub fn new_job(&self) -> JobBuilder {
JobBuilder::new(self)
}
// Internal helper to store job details in Redis (does not queue the job)
async fn create_job_using_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
job: &Job,
) -> Result<(), SupervisorError> {
debug!(
"Submitting play request: {} for script type: {:?} with namespace prefix: {}",
job.id, job.script_type, NAMESPACE_PREFIX
);
// Use the shared Job struct's Redis storage method
job.store_in_redis(conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to store job in Redis: {}", e)))?;
Ok(())
}
// Internal helper to push a job ID onto the worker queue for its script type
async fn start_job_using_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
job_id: String,
script_type: &ScriptType
) -> Result<(), SupervisorError> {
let worker_queue_key = self.get_worker_queue_key(script_type);
// LPUSH returns the new list length; propagate any Redis error instead of
// silently discarding it
let _: i64 = conn.lpush(&worker_queue_key, job_id).await?;
Ok(())
}
// Internal helper to await response from worker
async fn await_response_from_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
job_key: &String,
reply_queue_key: &String,
timeout: Duration,
) -> Result<String, SupervisorError> {
// BLPOP on the reply queue; the redis crate takes the timeout in seconds
let blpop_timeout_secs = timeout.as_secs().max(1); // ensure at least 1 second
match conn
.blpop::<&String, Option<(String, String)>>(reply_queue_key, blpop_timeout_secs as f64)
.await
{
Ok(Some((_queue, result_message_str))) => {
Ok(result_message_str)
}
Ok(None) => {
// BLPOP timed out
warn!(
"Timeout waiting for result on reply queue {} for job {}",
reply_queue_key, job_key
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(SupervisorError::Timeout(job_key.clone()))
}
Err(e) => {
// Redis error
error!(
"Redis error on BLPOP for reply queue {}: {}",
reply_queue_key, e
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(SupervisorError::RedisError(e))
}
}
}
// Store a job in Redis without dispatching it to a worker queue
pub async fn create_job(
&self,
job: &Job,
) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
self.create_job_using_connection(&mut conn, job).await?;
Ok(())
}
// Method to start a previously created job
pub async fn start_job(
&self,
job_id: &str,
) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Load the job to get its script type
let job = Job::load_from_redis(&mut conn, job_id).await?;
self.start_job_using_connection(&mut conn, job_id.to_string(), &job.script_type).await?;
Ok(())
}
// Create a job, dispatch it, and block until its result arrives on a
// dedicated reply queue
pub async fn run_job_and_await_result(
&self,
job: &Job
) -> Result<String, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, job.id);
self.create_job_using_connection(&mut conn, job).await?;
self.start_job_using_connection(&mut conn, job.id.clone(), &job.script_type).await?;
info!(
"Task {} submitted. Waiting for result on queue {} with timeout {:?}...",
job.id, // This is the UUID
reply_queue_key,
job.timeout
);
self.await_response_from_connection(
&mut conn,
&job.id,
&reply_queue_key,
job.timeout,
)
.await
}
// Method to get job status
pub async fn get_job_status(
&self,
job_id: &str,
) -> Result<JobStatus, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Delegate to the shared Job struct's status method so key layout and
// status parsing stay consistent across the crate
Job::get_status(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to get job status: {}", e)))
}
// Method to get job output
pub async fn get_job_output(
&self,
job_id: &str,
) -> Result<Option<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&job_key).await?;
match result_map {
Some(map) => {
Ok(map.get("output").cloned())
}
None => {
warn!("Job {} not found in Redis", job_id);
Ok(None)
}
}
}
/// List all jobs in Redis
pub async fn list_jobs(&self) -> Result<Vec<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Use the shared Job struct's list method
Job::list_all_job_ids(&mut conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to list jobs: {}", e)))
}
/// Stop a job by pushing its ID to the stop queue
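///
/// A sketch (the job ID is illustrative):
///
/// ```rust,no_run
/// # use hero_supervisor::{Supervisor, SupervisorError};
/// # async fn example(supervisor: &Supervisor) -> Result<(), SupervisorError> {
/// supervisor.stop_job("job-123").await?;
/// # Ok(())
/// # }
/// ```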
pub async fn stop_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Get job details to determine script type and appropriate worker
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
if job_data.is_empty() {
return Err(SupervisorError::InvalidInput(format!("Job {} not found", job_id)));
}
// Parse script type from job data
let script_type_str = job_data.get("script_type")
.ok_or_else(|| SupervisorError::InvalidInput("Job missing script_type field".to_string()))?;
let script_type: ScriptType = serde_json::from_str(&format!("\"{}\"", script_type_str))
.map_err(|e| SupervisorError::InvalidInput(format!("Invalid script type: {}", e)))?;
// Use hardcoded stop queue key for this script type
let stop_queue_key = format!("{}stop_queue:{}", NAMESPACE_PREFIX, script_type.worker_queue_suffix());
// Push job ID to the stop queue
conn.lpush::<_, _, ()>(&stop_queue_key, job_id).await?;
info!("Job {} added to stop queue {} for script type {:?}", job_id, stop_queue_key, script_type);
Ok(())
}
/// Get logs for a job by reading from its log file
pub async fn get_job_logs(&self, job_id: &str) -> Result<Option<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
// Get the job data to find the log path
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&job_key).await?;
match result_map {
Some(map) => {
if let Some(log_path) = map.get("log_path") {
// Try to read the log file
match std::fs::read_to_string(log_path) {
Ok(contents) => Ok(Some(contents)),
Err(e) => {
warn!("Failed to read log file {}: {}", log_path, e);
Ok(None)
}
}
} else {
// No log path configured for this job
Ok(None)
}
}
None => {
warn!("Job {} not found in Redis", job_id);
Ok(None)
}
}
}
/// Delete a specific job by ID
pub async fn delete_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Use the shared Job struct's delete method
Job::delete_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to delete job: {}", e)))?;
info!("Job {} deleted successfully", job_id);
Ok(())
}
/// Clear all jobs from Redis
pub async fn clear_all_jobs(&self) -> Result<usize, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Get all job IDs first
let job_ids = Job::list_all_job_ids(&mut conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to list jobs: {}", e)))?;
let count = job_ids.len();
// Delete each job using the shared method
for job_id in job_ids {
Job::delete_from_redis(&mut conn, &job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to delete job {}: {}", job_id, e)))?;
}
Ok(count)
}
/// Check if all prerequisites for a job are completed
pub async fn check_prerequisites_completed(&self, job_id: &str) -> Result<bool, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Load the job using the shared Job struct
let job = Job::load_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to load job: {}", e)))?;
// Check each prerequisite job status
for prereq_id in &job.prerequisites {
let status = Job::get_status(&mut conn, prereq_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to get prerequisite status: {}", e)))?;
if status != JobStatus::Finished {
return Ok(false); // Prerequisite not completed
}
}
Ok(true) // All prerequisites completed (or no prerequisites)
}
/// Update job status and check dependent jobs for readiness
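///
/// A sketch of the intended flow when a job finishes (job IDs are
/// illustrative):
///
/// ```rust,no_run
/// use hero_supervisor::{Supervisor, JobStatus, SupervisorError};
///
/// # async fn example(supervisor: &Supervisor) -> Result<(), SupervisorError> {
/// // Mark "job-a" finished; dependents whose prerequisites are all
/// // complete are moved to Dispatched and returned here
/// let ready = supervisor
///     .update_job_status_and_check_dependents("job-a", JobStatus::Finished)
///     .await?;
/// supervisor.dispatch_ready_jobs(ready).await?;
/// # Ok(())
/// # }
/// ```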
pub async fn update_job_status_and_check_dependents(&self, job_id: &str, new_status: JobStatus) -> Result<Vec<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Update job status using shared Job method
Job::update_status(&mut conn, job_id, new_status.clone()).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to update job status: {}", e)))?;
let mut ready_jobs = Vec::new();
// If job finished, check dependent jobs
if new_status == JobStatus::Finished {
// Load the job to get its dependents
let job = Job::load_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to load job: {}", e)))?;
// Check each dependent job
for dependent_id in &job.dependents {
let dependent_status = Job::get_status(&mut conn, dependent_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to get dependent status: {}", e)))?;
// Only check jobs that are waiting for prerequisites
if dependent_status == JobStatus::WaitingForPrerequisites {
// Check if all prerequisites are now completed
if self.check_prerequisites_completed(dependent_id).await? {
// Update status to dispatched and add to ready jobs
Job::update_status(&mut conn, dependent_id, JobStatus::Dispatched).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to update dependent status: {}", e)))?;
ready_jobs.push(dependent_id.clone());
}
}
}
}
Ok(ready_jobs)
}
/// Dispatch jobs that are ready (have all prerequisites completed)
pub async fn dispatch_ready_jobs(&self, ready_job_ids: Vec<String>) -> Result<(), SupervisorError> {
for job_id in ready_job_ids {
// Get job data to determine script type and select worker
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
if let Some(script_type_str) = job_data.get("script_type") {
// Parse script type (stored as Debug format, e.g., "OSIS")
let script_type = match script_type_str.as_str() {
"OSIS" => ScriptType::OSIS,
"SAL" => ScriptType::SAL,
"V" => ScriptType::V,
"Python" => ScriptType::Python,
_ => return Err(SupervisorError::InvalidInput(format!("Unknown script type: {}", script_type_str))),
};
// Dispatch job using hardcoded queue
self.start_job_using_connection(&mut conn, job_id, &script_type).await?;
}
}
Ok(())
}
}

core/supervisor/src/lifecycle.rs Normal file

@@ -0,0 +1,368 @@
//! Worker lifecycle management functionality for the Hero Supervisor
//!
//! This module provides worker process lifecycle management using Zinit as the process manager.
//! All functionality is implemented as methods on the Supervisor struct for a clean API.
use log::{debug, error, info, warn};
use serde_json::json;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use zinit_client::{ServiceStatus, ServiceState};
use hero_job::ScriptType;
use crate::{Supervisor, SupervisorError};
/// Information about a worker including its configuration and current status
#[derive(Debug, Clone)]
pub struct WorkerInfo {
pub config: WorkerConfig,
pub status: Option<ServiceStatus>,
pub is_running: bool,
}
/// Configuration for a worker binary
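///
/// A construction sketch (the binary path and arguments are illustrative):
///
/// ```rust,no_run
/// use std::path::PathBuf;
/// use hero_supervisor::{ScriptType, WorkerConfig};
///
/// let config = WorkerConfig::new(
///     "osis_worker_1".to_string(),
///     PathBuf::from("/usr/local/bin/osis_worker"),
///     ScriptType::OSIS,
/// )
/// .with_args(vec!["--redis-url".to_string(), "redis://127.0.0.1/".to_string()])
/// .with_health_check("/usr/local/bin/osis_worker --ping".to_string());
/// # let _ = config;
/// ```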
#[derive(Debug, Clone)]
pub struct WorkerConfig {
/// Name of the worker service
pub name: String,
/// Path to the worker binary
pub binary_path: PathBuf,
/// Script type this worker handles
pub script_type: ScriptType,
/// Command line arguments for the worker
pub args: Vec<String>,
/// Environment variables for the worker
pub env: HashMap<String, String>,
/// Whether this worker should restart on exit
pub restart_on_exit: bool,
/// Health check command (optional)
pub health_check: Option<String>,
/// Dependencies that must be running first
pub dependencies: Vec<String>,
}
impl WorkerConfig {
pub fn new(name: String, binary_path: PathBuf, script_type: ScriptType) -> Self {
Self {
name,
binary_path,
script_type,
args: Vec::new(),
env: HashMap::new(),
restart_on_exit: true,
health_check: None,
dependencies: Vec::new(),
}
}
pub fn with_args(mut self, args: Vec<String>) -> Self {
self.args = args;
self
}
pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
self.env = env;
self
}
pub fn with_health_check(mut self, health_check: String) -> Self {
self.health_check = Some(health_check);
self
}
pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self {
self.dependencies = dependencies;
self
}
pub fn no_restart(mut self) -> Self {
self.restart_on_exit = false;
self
}
}
/// Worker lifecycle management methods for Supervisor
impl Supervisor {
/// Get all workers with their configuration and status - unified method
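///
/// A sketch of inspecting worker state (configs as in the `WorkerConfig`
/// example above):
///
/// ```rust,no_run
/// # use hero_supervisor::{Supervisor, WorkerConfig};
/// # async fn example(supervisor: &Supervisor, configs: &[WorkerConfig]) {
/// for worker in supervisor.get_workers(configs).await {
///     println!("{}: running = {}", worker.config.name, worker.is_running);
/// }
/// # }
/// ```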
pub async fn get_workers(&self, worker_configs: &[WorkerConfig]) -> Vec<WorkerInfo> {
let mut workers = Vec::new();
for config in worker_configs {
let status = self.zinit_client.status(&config.name).await.ok();
let is_running = status.as_ref()
.map(|s| matches!(s.state, ServiceState::Running) && s.pid > 0)
.unwrap_or(false);
workers.push(WorkerInfo {
config: config.clone(),
status,
is_running,
});
}
workers
}
/// Start a worker using Zinit
pub async fn start_worker(
&self,
worker_config: &WorkerConfig,
) -> Result<(), SupervisorError> {
info!("Starting worker: {}", worker_config.name);
// Create service configuration for Zinit
let service_config = self.create_service_config(worker_config);
// Create the service in Zinit
self.zinit_client.create_service(&worker_config.name, service_config).await
.map_err(|e| SupervisorError::ZinitError(format!("Failed to create service: {}", e)))?;
// Start the service
self.zinit_client.start(&worker_config.name).await
.map_err(|e| SupervisorError::ZinitError(format!("Failed to start worker: {}", e)))?;
info!("Successfully started worker: {}", worker_config.name);
Ok(())
}
/// Stop a worker using Zinit
pub async fn stop_worker(
&self,
worker_name: &str,
) -> Result<(), SupervisorError> {
info!("Stopping worker: {}", worker_name);
match self.zinit_client.stop(worker_name).await {
Ok(_) => {
info!("Successfully stopped worker: {}", worker_name);
Ok(())
}
Err(e) => {
error!("Failed to stop worker {}: {}", worker_name, e);
Err(SupervisorError::WorkerStopFailed(worker_name.to_string(), e.to_string()))
}
}
}
/// Restart a worker using Zinit
pub async fn restart_worker(
&self,
worker_name: &str,
) -> Result<(), SupervisorError> {
info!("Restarting worker: {}", worker_name);
match self.zinit_client.restart(worker_name).await {
Ok(_) => {
info!("Successfully restarted worker: {}", worker_name);
Ok(())
}
Err(e) => {
error!("Failed to restart worker {}: {}", worker_name, e);
Err(SupervisorError::WorkerRestartFailed(worker_name.to_string(), e.to_string()))
}
}
}
/// Get status of a worker using Zinit
pub async fn get_worker_status(
&self,
worker_name: &str,
) -> Result<ServiceStatus, SupervisorError> {
match self.zinit_client.status(worker_name).await {
Ok(status) => Ok(status),
Err(e) => {
error!("Failed to get status for worker {}: {}", worker_name, e);
Err(SupervisorError::WorkerStatusFailed(worker_name.to_string(), e.to_string()))
}
}
}
/// Get status of all workers
pub async fn get_all_worker_status(
&self,
worker_configs: &[WorkerConfig],
) -> Result<HashMap<String, ServiceStatus>, SupervisorError> {
let mut status_map = HashMap::new();
for worker in worker_configs {
match self.zinit_client.status(&worker.name).await {
Ok(status) => {
status_map.insert(worker.name.clone(), status);
}
Err(e) => {
warn!("Failed to get status for worker {}: {}", worker.name, e);
}
}
}
Ok(status_map)
}
/// Start multiple workers from explicit configurations.
///
/// Named distinctly from `Supervisor::start_workers` in lib.rs (which
/// launches the workers configured at build time) so the two inherent
/// methods do not collide.
pub async fn start_workers_from_configs(
&self,
worker_configs: &[WorkerConfig],
) -> Result<(), SupervisorError> {
info!("Starting {} workers", worker_configs.len());
for worker in worker_configs {
self.start_worker(worker).await?;
}
Ok(())
}
/// Stop multiple workers
pub async fn stop_workers(
&self,
worker_names: &[String],
) -> Result<(), SupervisorError> {
info!("Stopping {} workers", worker_names.len());
for worker_name in worker_names {
self.stop_worker(worker_name).await?;
}
Ok(())
}
/// Get count of running workers for a script type
pub async fn get_running_worker_count(
&self,
worker_configs: &[WorkerConfig],
script_type: &ScriptType,
) -> usize {
let mut running_count = 0;
for worker in worker_configs {
if worker.script_type == *script_type {
if let Ok(status) = self.zinit_client.status(&worker.name).await {
if status.state == ServiceState::Running {
running_count += 1;
}
}
}
}
running_count
}
/// Send a ping job to a worker for health checking
pub async fn send_ping_job(
&self,
script_type: ScriptType,
) -> Result<(), SupervisorError> {
// Create a ping job; the caller and context IDs are internal health-check
// markers (the builder requires both to be non-empty)
let ping_job = self
.new_job()
.script_type(script_type.clone())
.caller_id("supervisor_health_check")
.context_id("health_check")
.script("ping") // Simple ping script
.timeout(Duration::from_secs(30))
.build()?;
// Execute the ping job with a short timeout
match self.run_job_and_await_result(&ping_job).await {
Ok(_) => {
debug!("Ping job successful for script type: {:?}", script_type);
Ok(())
}
Err(e) => {
warn!("Ping job failed for script type {:?}: {}", script_type, e);
Err(SupervisorError::PingJobFailed(format!("{:?}", script_type), e.to_string()))
}
}
}
/// Create Zinit service configuration from worker config
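///
/// The generated service definition roughly takes this shape (values are
/// illustrative):
///
/// ```text
/// {
///   "exec": "/usr/local/bin/osis_worker --redis-url redis://127.0.0.1/",
///   "oneshot": false,
///   "test": "/usr/local/bin/osis_worker --ping",
///   "after": ["redis"],
///   "env": { "RUST_LOG": "info" }
/// }
/// ```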
fn create_service_config(&self, worker: &WorkerConfig) -> serde_json::Value {
// Build the exec line, avoiding a trailing space when there are no args
let exec = if worker.args.is_empty() {
worker.binary_path.display().to_string()
} else {
format!("{} {}", worker.binary_path.display(), worker.args.join(" "))
};
let mut config = json!({
"exec": exec,
"oneshot": !worker.restart_on_exit,
});
if let Some(health_check) = &worker.health_check {
config["test"] = json!(health_check);
}
if !worker.dependencies.is_empty() {
config["after"] = json!(worker.dependencies);
}
// Add environment variables if any
if !worker.env.is_empty() {
config["env"] = json!(worker.env);
}
config
}
/// Launch workers based on SupervisorBuilder configuration
pub(crate) async fn launch_configured_workers(&self, builder: &crate::SupervisorBuilderData) -> Result<(), SupervisorError> {
// (binary, worker_id, script type) for each configurable worker; ScriptType
// and PathBuf are already imported at the top of this module
let worker_specs = [
(&builder.osis_worker, "osis_worker_1", ScriptType::OSIS),
(&builder.sal_worker, "sal_worker_1", ScriptType::SAL),
(&builder.v_worker, "v_worker_1", ScriptType::V),
(&builder.python_worker, "python_worker_1", ScriptType::Python),
];
for (binary_path, worker_id, script_type) in worker_specs {
if let Some(binary_path) = binary_path {
let mut config = WorkerConfig::new(
worker_id.to_string(),
PathBuf::from(binary_path),
script_type,
);
config.env.extend(builder.worker_env_vars.clone());
info!("Launching {:?} worker: {}", config.script_type, worker_id);
self.start_worker(&config).await?;
}
}
Ok(())
}
}