This repository has been archived on 2025-11-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
supervisor/core/src/supervisor.rs

1033 lines
38 KiB
Rust

//! Main supervisor implementation for managing multiple actor runners.
use crate::runner::{ProcessManagerError, ProcessConfig, ProcessStatus};
/// Simple trait to replace sal_service_manager ProcessManager
trait ProcessManager: Send + Sync {
fn start(&self, config: &ProcessConfig) -> Result<(), ProcessManagerError>;
fn stop(&self, process_id: &str) -> Result<(), ProcessManagerError>;
fn status(&self, process_id: &str) -> Result<ProcessStatus, ProcessManagerError>;
fn logs(&self, process_id: &str) -> Result<Vec<String>, ProcessManagerError>;
}
/// Simple process manager implementation
struct SimpleProcessManager;
impl SimpleProcessManager {
fn new() -> Self {
Self
}
}
impl ProcessManager for SimpleProcessManager {
fn start(&self, _config: &ProcessConfig) -> Result<(), ProcessManagerError> {
// Simplified implementation - just return success for now
Ok(())
}
fn stop(&self, _process_id: &str) -> Result<(), ProcessManagerError> {
Ok(())
}
fn status(&self, _process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
Ok(ProcessStatus::Running)
}
fn logs(&self, _process_id: &str) -> Result<Vec<String>, ProcessManagerError> {
Ok(vec!["No logs available".to_string()])
}
}
/// Tmux process manager implementation
struct TmuxProcessManager {
session_name: String,
}
impl TmuxProcessManager {
fn new(session_name: String) -> Self {
Self { session_name }
}
}
impl ProcessManager for TmuxProcessManager {
fn start(&self, _config: &ProcessConfig) -> Result<(), ProcessManagerError> {
// Simplified implementation - just return success for now
Ok(())
}
fn stop(&self, _process_id: &str) -> Result<(), ProcessManagerError> {
Ok(())
}
fn status(&self, _process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
Ok(ProcessStatus::Running)
}
fn logs(&self, _process_id: &str) -> Result<Vec<String>, ProcessManagerError> {
Ok(vec!["No logs available".to_string()])
}
}
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
// use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
use crate::{job::JobStatus, runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}};
use hero_job_client::{Client, ClientBuilder};
/// Process manager type for a runner
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ProcessManagerType {
/// Simple process manager for direct process spawning
Simple,
/// Tmux process manager for session-based management
Tmux(String), // session name
}
/// Main supervisor that manages multiple runners
#[derive(Clone)]
pub struct Supervisor {
/// Map of runner name to runner configuration
runners: HashMap<String, Runner>,
/// Shared process manager for all runners
process_manager: Arc<Mutex<dyn ProcessManager>>,
/// Shared Redis client for all runners
redis_client: redis::Client,
/// Namespace for queue keys
namespace: String,
/// Admin secrets for full access (deprecated - use api_keys)
admin_secrets: Vec<String>,
/// User secrets for limited access (deprecated - use api_keys)
user_secrets: Vec<String>,
/// Register secrets for runner registration (deprecated - use api_keys)
register_secrets: Vec<String>,
/// API key store for named key management
api_keys: Arc<Mutex<crate::auth::ApiKeyStore>>,
/// Services for persistent storage
services: crate::services::Services,
client: Client,
}
pub struct SupervisorBuilder {
/// Map of runner name to runner configuration
runners: HashMap<String, Runner>,
/// Redis URL for connection
redis_url: String,
/// Process manager type
process_manager_type: ProcessManagerType,
/// Namespace for queue keys
namespace: String,
/// Admin secrets for full access
admin_secrets: Vec<String>,
/// User secrets for limited access
user_secrets: Vec<String>,
/// Register secrets for runner registration
register_secrets: Vec<String>,
client_builder: ClientBuilder,
}
impl SupervisorBuilder {
/// Create a new supervisor builder
pub fn new() -> Self {
Self {
runners: HashMap::new(),
redis_url: "redis://localhost:6379".to_string(),
process_manager_type: ProcessManagerType::Simple,
namespace: "".to_string(),
admin_secrets: Vec::new(),
user_secrets: Vec::new(),
register_secrets: Vec::new(),
client_builder: ClientBuilder::new(),
}
}
/// Set the Redis URL
pub fn redis_url<S: Into<String>>(mut self, url: S) -> Self {
let url_string = url.into();
self.redis_url = url_string.clone();
self.client_builder = self.client_builder.redis_url(url_string);
self
}
/// Set the process manager type
pub fn process_manager(mut self, pm_type: ProcessManagerType) -> Self {
self.process_manager_type = pm_type;
self
}
/// Set the namespace for queue keys
pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
let namespace_string = namespace.into();
self.namespace = namespace_string.clone();
self.client_builder = self.client_builder.namespace(namespace_string);
self
}
/// Add an admin secret
pub fn add_admin_secret<S: Into<String>>(mut self, secret: S) -> Self {
self.admin_secrets.push(secret.into());
self
}
/// Add multiple admin secrets
pub fn admin_secrets<I, S>(mut self, secrets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.admin_secrets.extend(secrets.into_iter().map(|s| s.into()));
self
}
/// Add a user secret
pub fn add_user_secret<S: Into<String>>(mut self, secret: S) -> Self {
self.user_secrets.push(secret.into());
self
}
/// Add multiple user secrets
pub fn user_secrets<I, S>(mut self, secrets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.user_secrets.extend(secrets.into_iter().map(|s| s.into()));
self
}
/// Add a register secret
pub fn add_register_secret<S: Into<String>>(mut self, secret: S) -> Self {
self.register_secrets.push(secret.into());
self
}
/// Add multiple register secrets
pub fn register_secrets<I, S>(mut self, secrets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.register_secrets.extend(secrets.into_iter().map(|s| s.into()));
self
}
/// Add a runner to the supervisor
pub fn add_runner(mut self, runner: Runner) -> Self {
self.runners.insert(runner.id.clone(), runner);
self
}
/// Build the supervisor
pub async fn build(self) -> RunnerResult<Supervisor> {
// Create process manager based on type
let process_manager: Arc<Mutex<dyn ProcessManager>> = match &self.process_manager_type {
ProcessManagerType::Simple => {
Arc::new(Mutex::new(SimpleProcessManager::new()))
}
ProcessManagerType::Tmux(session_name) => {
Arc::new(Mutex::new(TmuxProcessManager::new(session_name.clone())))
}
};
// Create Redis client
let redis_client = redis::Client::open(self.redis_url.as_str())
.map_err(|e| RunnerError::ConfigError {
reason: format!("Invalid Redis URL: {}", e),
})?;
Ok(Supervisor {
client: self.client_builder.build().await.unwrap(),
runners: self.runners,
process_manager,
redis_client,
namespace: self.namespace,
admin_secrets: self.admin_secrets,
user_secrets: self.user_secrets,
register_secrets: self.register_secrets,
api_keys: Arc::new(Mutex::new(crate::auth::ApiKeyStore::new())),
services: crate::services::Services::new(),
})
}
}
impl Supervisor {
/// Create a new supervisor builder
pub fn builder() -> SupervisorBuilder {
SupervisorBuilder::new()
}
/// Add a new runner to the supervisor
pub async fn add_runner(
&mut self,
config: RunnerConfig,
) -> RunnerResult<()> {
// Runner is now just the config
let runner = Runner::from_config(config.clone());
self.runners.insert(config.id.clone(), runner);
Ok(())
}
/// Register a new runner with API key authentication
pub async fn register_runner(&mut self, key: &str, name: &str, _queue: &str) -> RunnerResult<()> {
// Verify API key and check scope
let api_key = self.verify_api_key(key).await
.ok_or_else(|| RunnerError::InvalidSecret("Invalid API key".to_string()))?;
// Check if key has admin or registrar scope
if api_key.scope != crate::auth::ApiKeyScope::Admin &&
api_key.scope != crate::auth::ApiKeyScope::Registrar {
return Err(RunnerError::InvalidSecret("Insufficient permissions. Requires admin or registrar scope".to_string()));
}
// Create a basic runner config for the named runner
let config = RunnerConfig {
id: name.to_string(), // Use the provided name as actor_id
name: name.to_string(), // Use the provided name as actor_id
namespace: self.namespace.clone(),
command: PathBuf::from("/tmp/mock_runner"), // Default path
redis_url: "redis://localhost:6379".to_string(),
extra_args: Vec::new(),
};
// Add the runner using existing logic
self.add_runner(config).await
}
/// Create a job (fire-and-forget, non-blocking) with API key authentication
pub async fn create_job(&mut self, key: &str, job: crate::job::Job) -> RunnerResult<String> {
// Verify API key and check scope
let api_key = self.verify_api_key(key).await
.ok_or_else(|| RunnerError::InvalidSecret("Invalid API key".to_string()))?;
// Check if key has admin or user scope
if api_key.scope != crate::auth::ApiKeyScope::Admin &&
api_key.scope != crate::auth::ApiKeyScope::User {
return Err(RunnerError::InvalidSecret("Insufficient permissions. Requires admin or user scope".to_string()));
}
// Find the runner by name
let runner = job.runner.clone();
let job_id = job.id.clone(); // Store job ID before moving job
if let Some(_runner) = self.runners.get(&runner) {
// Store job in Redis with "created" status so it can be retrieved later
self.client.store_job_in_redis_with_status(&job, hero_job::JobStatus::Created).await
.map_err(|e| RunnerError::QueueError {
actor_id: runner.clone(),
reason: format!("Failed to store job in Redis: {}", e),
})?;
// Store job metadata in the database with "created" status
// Job will only be queued when explicitly started via start_job
let job_metadata = crate::services::JobMetadata {
job_id: job_id.clone(),
runner: runner.clone(),
created_at: chrono::Utc::now().to_rfc3339(),
created_by: api_key.name.clone(),
status: "created".to_string(),
job: job.clone(),
};
self.services.jobs.store(job_metadata).await
.map_err(|e| RunnerError::ConfigError { reason: format!("Failed to store job: {}", e) })?;
// Do NOT queue the job to the runner - it will be queued when start_job is called
Ok(job_id) // Return the job ID immediately
} else {
Err(RunnerError::ActorNotFound {
actor_id: job.runner.clone(),
})
}
}
/// Run a job on the appropriate runner with API key authentication
/// This is a synchronous operation that queues the job, waits for the result, and returns it
pub async fn run_job(&mut self, key: &str, job: crate::job::Job) -> RunnerResult<Option<String>> {
// Verify API key and check scope
let api_key = self.verify_api_key(key).await
.ok_or_else(|| RunnerError::InvalidSecret("Invalid API key".to_string()))?;
// Check if key has admin or user scope
if api_key.scope != crate::auth::ApiKeyScope::Admin &&
api_key.scope != crate::auth::ApiKeyScope::User {
return Err(RunnerError::InvalidSecret("Insufficient permissions. Requires admin or user scope".to_string()));
}
// Find the runner by name
let runner = job.runner.clone();
if let Some(_runner) = self.runners.get(&runner) {
// Use the synchronous queue_and_wait method with a reasonable timeout (30 seconds)
self.queue_and_wait(&runner, job, 30).await
} else {
Err(RunnerError::ActorNotFound {
actor_id: job.runner.clone(),
})
}
}
/// Remove a runner from the supervisor
pub async fn remove_runner(&mut self, actor_id: &str) -> RunnerResult<()> {
if let Some(_instance) = self.runners.remove(actor_id) {
// Runner is removed from the map, which will drop the Arc
// and eventually clean up the runner when no more references exist
}
Ok(())
}
/// Get a runner by actor ID
pub fn get_runner(&self, actor_id: &str) -> Option<&Runner> {
self.runners.get(actor_id)
}
/// Get a job by job ID from Redis
pub async fn get_job(&self, job_id: &str) -> RunnerResult<crate::job::Job> {
let _conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::RedisError {
source: e
})?;
self.client.load_job_from_redis(job_id).await
.map_err(|e| RunnerError::QueueError {
actor_id: job_id.to_string(),
reason: format!("Failed to load job: {}", e),
})
}
/// Ping a runner by dispatching a ping job to its queue
pub async fn ping_runner(&mut self, runner_id: &str) -> RunnerResult<String> {
use crate::job::JobBuilder;
// Check if runner exists
if !self.runners.contains_key(runner_id) {
return Err(RunnerError::ActorNotFound {
actor_id: runner_id.to_string(),
});
}
// Create a ping job
let ping_job = JobBuilder::new()
.caller_id("supervisor_ping")
.context_id("ping_context")
.payload("ping")
.runner(runner_id)
.executor("ping")
.timeout(10)
.build()
.map_err(|e| RunnerError::QueueError {
actor_id: runner_id.to_string(),
reason: format!("Failed to create ping job: {}", e),
})?;
// Queue the ping job
let job_id = ping_job.id.clone();
self.queue_job_to_runner(runner_id, ping_job).await?;
Ok(job_id)
}
/// Stop a job by ID
pub async fn stop_job(&mut self, job_id: &str) -> RunnerResult<()> {
// For now, we'll implement a basic stop by removing the job from Redis
// In a more sophisticated implementation, you might send a stop signal to the runner
let _conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::QueueError {
actor_id: job_id.to_string(),
reason: format!("Failed to connect to Redis: {}", e),
})?;
let _job_key = self.client.set_job_status(job_id, JobStatus::Stopping).await;
Ok(())
}
/// Delete a job by ID (no authentication - should be called from authenticated endpoints)
pub async fn delete_job(&mut self, job_id: &str) -> RunnerResult<()> {
self.client.delete_job(&job_id).await.map_err(RunnerError::from)
}
/// Delete a job by ID with authentication
pub async fn delete_job_with_auth(&mut self, secret: &str, job_id: &str) -> RunnerResult<()> {
// Verify API key and check scope
let api_key = self.verify_api_key(secret).await
.ok_or_else(|| RunnerError::InvalidSecret("Invalid API key".to_string()))?;
// Check if key has admin or user scope
if api_key.scope != crate::auth::ApiKeyScope::Admin &&
api_key.scope != crate::auth::ApiKeyScope::User {
return Err(RunnerError::InvalidSecret("Insufficient permissions. Requires admin or user scope".to_string()));
}
self.delete_job(job_id).await
}
/// List all managed runners
pub fn list_runners(&self) -> Vec<&str> {
self.runners.keys().map(|s| s.as_str()).collect()
}
/// Start a specific runner
pub async fn start_runner(&mut self, actor_id: &str) -> RunnerResult<()> {
use crate::runner::runner_to_process_config;
use log::info;
if let Some(runner) = self.runners.get(actor_id) {
info!("Starting actor {}", runner.id);
let process_config = runner_to_process_config(runner);
let mut pm = self.process_manager.lock().await;
pm.start(&process_config)?;
info!("Successfully started actor {}", runner.id);
Ok(())
} else {
Err(RunnerError::ActorNotFound {
actor_id: actor_id.to_string(),
})
}
}
/// Stop a specific runner
pub async fn stop_runner(&mut self, actor_id: &str, force: bool) -> RunnerResult<()> {
use log::info;
if let Some(runner) = self.runners.get(actor_id) {
info!("Stopping actor {}", runner.id);
let mut pm = self.process_manager.lock().await;
pm.stop(&runner.id)?;
info!("Successfully stopped actor {}", runner.id);
Ok(())
} else {
Err(RunnerError::ActorNotFound {
actor_id: actor_id.to_string(),
})
}
}
/// Get status of a specific runner
pub async fn get_runner_status(&self, actor_id: &str) -> RunnerResult<RunnerStatus> {
if let Some(runner) = self.runners.get(actor_id) {
let pm = self.process_manager.lock().await;
let status = pm.status(&runner.id)?;
Ok(status)
} else {
Err(RunnerError::ActorNotFound {
actor_id: actor_id.to_string(),
})
}
}
/// Get logs from a specific runner
pub async fn get_runner_logs(
&self,
actor_id: &str,
lines: Option<usize>,
follow: bool,
) -> RunnerResult<Vec<LogInfo>> {
if let Some(runner) = self.runners.get(actor_id) {
let pm = self.process_manager.lock().await;
let logs = pm.logs(&runner.id)?;
// Convert strings to LogInfo
let converted_logs = logs.into_iter().map(|log_line| LogInfo {
timestamp: chrono::Utc::now().to_rfc3339(),
level: "INFO".to_string(),
message: log_line,
}).collect();
Ok(converted_logs)
} else {
Err(RunnerError::ActorNotFound {
actor_id: actor_id.to_string(),
})
}
}
/// Queue a job to a specific runner by name
pub async fn queue_job_to_runner(&mut self, runner: &str, job: crate::job::Job) -> RunnerResult<()> {
use redis::AsyncCommands;
use log::{debug, info};
if let Some(runner) = self.runners.get(runner) {
debug!("Queuing job {} for actor {}", job.id, runner.id);
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::QueueError {
actor_id: runner.id.clone(),
reason: format!("Failed to connect to Redis: {}", e),
})?;
// Store job in Redis first (will be set to "dispatched" by default)
self.client.store_job_in_redis(&job).await
.map_err(|e| RunnerError::QueueError {
actor_id: runner.id.clone(),
reason: format!("Failed to store job in Redis: {}", e),
})?;
// Use the runner's get_queue method with our namespace
let queue_key = runner.get_queue();
let _: () = conn.lpush(&queue_key, &job.id).await
.map_err(|e| RunnerError::QueueError {
actor_id: runner.id.clone(),
reason: format!("Failed to queue job: {}", e),
})?;
info!("Job {} queued successfully for actor {} on queue {}", job.id, runner.id, queue_key);
Ok(())
} else {
Err(RunnerError::ActorNotFound {
actor_id: runner.to_string(),
})
}
}
/// Queue a job to a specific runner and wait for the result
/// This implements the proper Hero job protocol:
/// 1. Queue the job to the runner
/// 2. BLPOP on the reply queue for this job
/// 3. Get the job result from the job hash
/// 4. Return the complete result
pub async fn queue_and_wait(&mut self, runner: &str, job: crate::job::Job, timeout_secs: u64) -> RunnerResult<Option<String>> {
use redis::AsyncCommands;
let job_id = job.id.clone();
// First queue the job
self.queue_job_to_runner(runner, job).await?;
// Get Redis connection from the supervisor (shared Redis client)
let _runner = self.runners.get(runner)
.ok_or_else(|| RunnerError::ActorNotFound {
actor_id: runner.to_string(),
})?;
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::RedisError {
source: e
})?;
// BLPOP on the reply queue for this specific job
let reply_key = self.client.job_reply_key(&job_id);
let result: Option<Vec<String>> = conn.blpop(&reply_key, timeout_secs as f64).await
.map_err(|e| RunnerError::RedisError {
source: e
})?;
match result {
Some(_reply_data) => {
// Reply received, now get the job result from the job hash
let job_key = self.client.job_key(&job_id);
let job_result: Option<String> = conn.hget(&job_key, "result").await
.map_err(|e| RunnerError::RedisError {
source: e
})?;
Ok(job_result)
}
None => {
// Timeout occurred
Ok(None)
}
}
}
/// Get status of all runners
pub async fn get_all_runner_status(&self) -> RunnerResult<Vec<(String, RunnerStatus)>> {
let mut results = Vec::new();
for (actor_id, _instance) in &self.runners {
match self.get_runner_status(actor_id).await {
Ok(status) => results.push((actor_id.clone(), status)),
Err(_) => {
results.push((actor_id.clone(), ProcessStatus::Stopped));
}
}
}
Ok(results)
}
/// Start all runners
pub async fn start_all(&mut self) -> Vec<(String, RunnerResult<()>)> {
let mut results = Vec::new();
let actor_ids: Vec<String> = self.runners.keys().cloned().collect();
for actor_id in actor_ids {
let result = self.start_runner(&actor_id).await;
results.push((actor_id, result));
}
results
}
/// Stop all runners
pub async fn stop_all(&mut self, force: bool) -> Vec<(String, RunnerResult<()>)> {
let mut results = Vec::new();
let actor_ids: Vec<String> = self.runners.keys().cloned().collect();
for actor_id in actor_ids {
let result = self.stop_runner(&actor_id, force).await;
results.push((actor_id, result));
}
results
}
/// Get status of all runners
pub async fn get_all_status(&self) -> Vec<(String, RunnerResult<RunnerStatus>)> {
let mut results = Vec::new();
for (actor_id, _instance) in &self.runners {
let result = self.get_runner_status(actor_id).await;
results.push((actor_id.clone(), result));
}
results
}
/// Add an admin secret
pub fn add_admin_secret(&mut self, secret: String) {
if !self.admin_secrets.contains(&secret) {
self.admin_secrets.push(secret);
}
}
/// Remove an admin secret
pub fn remove_admin_secret(&mut self, secret: &str) -> bool {
if let Some(pos) = self.admin_secrets.iter().position(|x| x == secret) {
self.admin_secrets.remove(pos);
true
} else {
false
}
}
/// Check if admin secret exists
pub fn has_admin_secret(&self, secret: &str) -> bool {
self.admin_secrets.contains(&secret.to_string())
}
/// Get admin secrets count
pub fn admin_secrets_count(&self) -> usize {
self.admin_secrets.len()
}
/// Get admin secrets (returns cloned vector for security)
pub fn get_admin_secrets(&self) -> Vec<String> {
self.admin_secrets.clone()
}
/// Add a user secret
pub fn add_user_secret(&mut self, secret: String) {
if !self.user_secrets.contains(&secret) {
self.user_secrets.push(secret);
}
}
/// Remove a user secret
pub fn remove_user_secret(&mut self, secret: &str) -> bool {
if let Some(pos) = self.user_secrets.iter().position(|x| x == secret) {
self.user_secrets.remove(pos);
true
} else {
false
}
}
/// Check if user secret exists
pub fn has_user_secret(&self, secret: &str) -> bool {
self.user_secrets.contains(&secret.to_string())
}
/// Get user secrets count
pub fn user_secrets_count(&self) -> usize {
self.user_secrets.len()
}
/// Add a register secret
pub fn add_register_secret(&mut self, secret: String) {
if !self.register_secrets.contains(&secret) {
self.register_secrets.push(secret);
}
}
/// Remove a register secret
pub fn remove_register_secret(&mut self, secret: &str) -> bool {
if let Some(pos) = self.register_secrets.iter().position(|x| x == secret) {
self.register_secrets.remove(pos);
true
} else {
false
}
}
/// Check if register secret exists
pub fn has_register_secret(&self, secret: &str) -> bool {
self.register_secrets.contains(&secret.to_string())
}
/// Get register secrets count
pub fn register_secrets_count(&self) -> usize {
self.register_secrets.len()
}
/// List all job IDs from Redis
pub async fn list_jobs(&self) -> RunnerResult<Vec<String>> {
self.client.list_jobs().await.map_err(RunnerError::from)
}
/// List all jobs from the database
pub async fn list_jobs_from_db(&self) -> Vec<crate::services::JobMetadata> {
self.services.jobs.list().await
}
/// List jobs by runner from the database
pub async fn list_jobs_by_runner(&self, runner: &str) -> Vec<crate::services::JobMetadata> {
self.services.jobs.list_by_runner(runner).await
}
/// List jobs by creator (API key name) from the database
pub async fn list_jobs_by_creator(&self, creator: &str) -> Vec<crate::services::JobMetadata> {
self.services.jobs.list_by_creator(creator).await
}
/// Get a specific job from the database
pub async fn get_job_from_db(&self, job_id: &str) -> Option<crate::services::JobMetadata> {
self.services.jobs.get(job_id).await
}
/// List all jobs with full details from the database
pub async fn list_all_jobs(&self) -> RunnerResult<Vec<crate::job::Job>> {
let job_metadata_list = self.services.jobs.list().await;
let jobs = job_metadata_list.into_iter().map(|metadata| metadata.job).collect();
Ok(jobs)
}
/// Start a previously created job by queuing it to its assigned runner
pub async fn start_job(&mut self, secret: &str, job_id: &str) -> RunnerResult<()> {
// Verify API key and check scope
let api_key = self.verify_api_key(secret).await
.ok_or_else(|| RunnerError::InvalidSecret("Invalid API key".to_string()))?;
// Check if key has admin or user scope
if api_key.scope != crate::auth::ApiKeyScope::Admin &&
api_key.scope != crate::auth::ApiKeyScope::User {
return Err(RunnerError::InvalidSecret("Insufficient permissions. Requires admin or user scope".to_string()));
}
// Get the job from Redis
let job = self.get_job(job_id).await?;
let runner = job.runner.clone();
// Queue the job to its assigned runner
self.queue_job_to_runner(&runner, job).await
}
/// Get the status of a job
pub async fn get_job_status(&self, job_id: &str) -> RunnerResult<crate::openrpc::JobStatusResponse> {
// Use the client's get_status method
let status = self.client.get_status(job_id).await
.map_err(|e| match e {
hero_job_client::ClientError::Job(hero_job::JobError::NotFound(_)) => RunnerError::JobNotFound { job_id: job_id.to_string() },
_ => RunnerError::from(e)
})?;
Ok(crate::openrpc::JobStatusResponse {
job_id: job_id.to_string(),
status: status.as_str().to_string(),
})
}
/// Get the result of a job (returns immediately with current result or error)
pub async fn get_job_result(&self, job_id: &str) -> RunnerResult<Option<String>> {
// Use client's get_status to check if job exists and get its status
let status = self.client.get_status(job_id).await
.map_err(|e| match e {
hero_job_client::ClientError::Job(hero_job::JobError::NotFound(_)) => RunnerError::JobNotFound { job_id: job_id.to_string() },
_ => RunnerError::from(e)
})?;
// If job has error status, get the error message using client method
if status.as_str() == "error" {
let error_msg = self.client.get_error(job_id).await
.map_err(|e| RunnerError::from(e))?;
return Ok(Some(format!("Error: {}", error_msg.unwrap_or_else(|| "Unknown error".to_string()))));
}
// Use client's get_result to get the result
let result = self.client.get_result(job_id).await
.map_err(|e| RunnerError::from(e))?;
Ok(result)
}
/// Get user secrets (returns cloned vector for security)
pub fn get_user_secrets(&self) -> Vec<String> {
self.user_secrets.clone()
}
/// Get register secrets (returns cloned vector for security)
pub fn get_register_secrets(&self) -> Vec<String> {
self.register_secrets.clone()
}
/// Get runners count
pub fn runners_count(&self) -> usize {
self.runners.len()
}
// API Key Management Methods
/// Get logs for a specific job
///
/// Reads log files from the logs/actor/<runner_name>/job-<job_id>/ directory
pub async fn get_job_logs(&self, job_id: &str, lines: Option<usize>) -> RunnerResult<Vec<String>> {
// Determine the logs directory path
// Default to ~/hero/logs
let logs_root = if let Some(home) = std::env::var_os("HOME") {
std::path::PathBuf::from(home).join("hero").join("logs")
} else {
std::path::PathBuf::from("logs")
};
// Check if logs directory exists
if !logs_root.exists() {
return Ok(vec![format!("Logs directory not found: {}", logs_root.display())]);
}
let actor_dir = logs_root.join("actor");
if !actor_dir.exists() {
return Ok(vec![format!("Actor logs directory not found: {}", actor_dir.display())]);
}
// Search through all runner directories to find the job
if let Ok(entries) = std::fs::read_dir(&actor_dir) {
for entry in entries.flatten() {
if entry.path().is_dir() {
let job_dir = entry.path().join(format!("job-{}", job_id));
if job_dir.exists() {
// Read all log files in the directory
let mut all_logs = Vec::new();
if let Ok(log_entries) = std::fs::read_dir(&job_dir) {
// Collect all log files with their paths for sorting
let mut log_files: Vec<_> = log_entries
.flatten()
.filter(|e| {
if !e.path().is_file() {
return false;
}
// Accept files that start with "log" (covers log.YYYY-MM-DD-HH format)
e.file_name().to_string_lossy().starts_with("log")
})
.collect();
// Sort by filename (which includes timestamp for hourly rotation)
log_files.sort_by_key(|e| e.path());
// Read files in order
for entry in log_files {
if let Ok(content) = std::fs::read_to_string(entry.path()) {
all_logs.extend(content.lines().map(|s| s.to_string()));
}
}
}
// If lines limit is specified, return only the last N lines
if let Some(n) = lines {
let start = all_logs.len().saturating_sub(n);
return Ok(all_logs[start..].to_vec());
} else {
return Ok(all_logs);
}
}
}
}
}
// If no logs found, return helpful message
Ok(vec![format!("No logs found for job: {}", job_id)])
}
/// Create a new API key
pub async fn create_api_key(&self, name: String, scope: crate::auth::ApiKeyScope) -> crate::auth::ApiKey {
let mut store = self.api_keys.lock().await;
let key = crate::auth::ApiKey::new(name, scope);
store.add_key(key.clone());
key
}
/// Create an API key with a specific key value
pub async fn create_api_key_with_value(&self, key_value: String, name: String, scope: crate::auth::ApiKeyScope) -> crate::auth::ApiKey {
let mut store = self.api_keys.lock().await;
let key = crate::auth::ApiKey::with_key(key_value, name, scope);
store.add_key(key.clone());
key
}
/// Remove an API key
pub async fn remove_api_key(&self, key: &str) -> Option<crate::auth::ApiKey> {
let mut store = self.api_keys.lock().await;
store.remove_key(key)
}
/// Verify an API key and return its metadata
pub async fn verify_api_key(&self, key: &str) -> Option<crate::auth::ApiKey> {
let store = self.api_keys.lock().await;
store.verify_key(key).cloned()
}
/// List all API keys
pub async fn list_api_keys(&self) -> Vec<crate::auth::ApiKey> {
let store = self.api_keys.lock().await;
store.list_all_keys().into_iter().cloned().collect()
}
/// List API keys by scope
pub async fn list_api_keys_by_scope(&self, scope: crate::auth::ApiKeyScope) -> Vec<crate::auth::ApiKey> {
let store = self.api_keys.lock().await;
store.list_keys_by_scope(scope).into_iter().cloned().collect()
}
/// Bootstrap an initial admin key (useful for first-time setup)
pub async fn bootstrap_admin_key(&self, name: String) -> crate::auth::ApiKey {
let mut store = self.api_keys.lock().await;
store.bootstrap_admin_key(name)
}
/// Check if a key has admin scope
pub async fn is_admin_key(&self, key: &str) -> bool {
if let Some(api_key) = self.verify_api_key(key).await {
api_key.scope == crate::auth::ApiKeyScope::Admin
} else {
false
}
}
}
impl Default for Supervisor {
fn default() -> Self {
// Note: Default implementation creates an empty supervisor
// Use Supervisor::builder() for proper initialization
Self {
runners: HashMap::new(),
process_manager: Arc::new(Mutex::new(SimpleProcessManager::new())),
redis_client: redis::Client::open("redis://localhost:6379").unwrap(),
namespace: "".to_string(),
admin_secrets: Vec::new(),
user_secrets: Vec::new(),
register_secrets: Vec::new(),
api_keys: Arc::new(Mutex::new(crate::auth::ApiKeyStore::new())),
services: crate::services::Services::new(),
client: Client::default(),
}
}
}