diff --git a/Cargo.toml b/Cargo.toml index 050f321..1a7571d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "bin/osiris", "bin/runners/osiris", "bin/runners/sal", + "bin/runners/hero", "bin/supervisor", "lib/clients/job", "lib/clients/osiris", @@ -27,6 +28,7 @@ repository.workspace = true # Integration test dependencies - no library dependencies, tests spawn binaries hero-supervisor-openrpc-client = { path = "lib/clients/supervisor" } hero-job = { path = "lib/models/job" } +hero-job-client = { path = "lib/clients/job" } tokio = { workspace = true } lazy_static = { workspace = true } escargot = "0.5" diff --git a/bin/runners/hero/Cargo.toml b/bin/runners/hero/Cargo.toml new file mode 100644 index 0000000..d41a8f0 --- /dev/null +++ b/bin/runners/hero/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "runner-hero" +version.workspace = true +edition.workspace = true +description = "Hero Runner - Command execution runner for Hero jobs" +license = "MIT OR Apache-2.0" + +[[bin]] +name = "herorunner" +path = "src/main.rs" + +[dependencies] +# Runner library +hero-runner = { path = "../../../lib/runner" } +hero-job = { path = "../../../lib/models/job" } + +# Core dependencies +anyhow.workspace = true +tokio.workspace = true +log.workspace = true +env_logger.workspace = true +clap.workspace = true +serde.workspace = true +serde_json.workspace = true + +# Process execution diff --git a/bin/runners/hero/README.md b/bin/runners/hero/README.md new file mode 100644 index 0000000..7505fbc --- /dev/null +++ b/bin/runners/hero/README.md @@ -0,0 +1,168 @@ +# Hero Runner + +A specialized runner for the Hero ecosystem that executes heroscripts using the `hero` CLI tool. + +## Overview + +The Hero runner executes heroscripts by calling `hero run -h ` for each job. This makes it ideal for: + +- Running heroscripts from job payloads +- Executing Hero automation tasks +- Integrating with the Hero CLI ecosystem +- Running scripted workflows + +## Features + +- **Heroscript Execution**: Executes `hero run -h ` for each job +- **Environment Variables**: Passes job environment variables to the hero command +- **Timeout Support**: Respects job timeout settings +- **Signature Verification**: Verifies job signatures before execution +- **Simple Integration**: No complex payload parsing - just pass the heroscript content + +## Usage + +### Starting the Runner + +```bash +# Basic usage +herorunner my-hero-runner + +# With custom Redis URL +herorunner my-hero-runner --redis-url redis://localhost:6379 +``` + +### Command-line Options + +- `runner_id`: Runner identifier (required, positional) +- `-r, --redis-url`: Redis URL (default: `redis://localhost:6379`) + +## Job Payload Format + +The job payload should contain the heroscript content that will be passed to `hero run -h`. + +### Example Payload + +``` +print("Hello from heroscript!") +``` + +The runner will execute: `hero run -h 'print("Hello from heroscript!")'` + +## Examples + +### Example 1: Simple Heroscript + +Job payload: +``` +print("Processing job...") +``` + +Executed as: `hero run -h 'print("Processing job...")'` + +### Example 2: Multi-line Heroscript + +Job payload: +``` +print("Starting task...") +// Your heroscript logic here +print("Task completed!") +``` + +### Example 3: With Environment Variables + +Job with env_vars: +```json +{ + "payload": "print(env.MY_VAR)", + "env_vars": { + "MY_VAR": "Hello from Hero Runner" + } +} +``` + +## Architecture + +The Hero runner implements the `Runner` trait from `hero-runner` library: + +``` +┌─────────────────────┐ +│ HeroExecutor │ +│ │ +│ - execute_command()│ +│ - process_job() │ +└─────────────────────┘ + │ + │ implements + ▼ +┌─────────────────────┐ +│ Runner Trait │ +│ │ +│ - spawn() │ +│ - process_job() │ +│ - runner_type() │ +└─────────────────────┘ + │ + │ executes + ▼ +┌─────────────────────┐ +│ hero run -h │ +│ │ +└─────────────────────┘ +``` + +## Security Considerations + +1. **Heroscript Execution**: The runner executes heroscripts via the `hero` CLI. Ensure job payloads are from trusted sources. +2. **Signature Verification**: Always verify job signatures before execution. +3. **Environment Variables**: Be cautious with sensitive data in environment variables. +4. **Hero CLI Access**: Ensure the `hero` command is available in the system PATH. + +## Error Handling + +The runner handles various error scenarios: + +- **Hero CLI Not Found**: Returns error if the `hero` command is not available +- **Timeout**: Kills the process if it exceeds the job timeout +- **Non-zero Exit**: Returns error if `hero run -h` exits with non-zero status +- **Heroscript Errors**: Returns error output from the hero CLI + +## Logging + +The runner logs to stdout/stderr with the following log levels: + +- `INFO`: Job start/completion, runner lifecycle +- `DEBUG`: Command details, parsing information +- `ERROR`: Execution failures, timeout errors + +## Integration with Supervisor + +The Vlang runner integrates with the Hero Supervisor: + +1. Register the runner with the supervisor +2. Supervisor queues jobs to the runner's Redis queue +3. Runner polls the queue and executes commands +4. Results are stored back in Redis + +## Development + +### Building + +```bash +cargo build -p runner-hero +``` + +### Running Tests + +```bash +cargo test -p runner-hero +``` + +### Running Locally + +```bash +cargo run -p runner-hero -- test-runner +``` + +## License + +MIT OR Apache-2.0 diff --git a/bin/runners/hero/src/executor.rs b/bin/runners/hero/src/executor.rs new file mode 100644 index 0000000..fcbda95 --- /dev/null +++ b/bin/runners/hero/src/executor.rs @@ -0,0 +1,121 @@ +//! Hero Command Executor +//! +//! This module implements command execution for Hero jobs. +//! It executes commands from job payloads and returns the output. + +use hero_runner::{Runner, Job}; +use log::{debug, error, info}; +use std::process::{Command, Stdio}; +use std::time::Duration; + +/// Hero command executor +pub struct HeroExecutor { + runner_id: String, + redis_url: String, +} + +impl HeroExecutor { + /// Create a new Hero executor + pub fn new(runner_id: String, redis_url: String) -> Self { + Self { + runner_id, + redis_url, + } + } + + /// Execute a command from the job payload + fn execute_command(&self, job: &Job) -> Result> { + info!("Runner '{}': Executing hero run -h for job {}", self.runner_id, job.id); + + // Always execute: hero run -h + let mut cmd = Command::new("hero"); + cmd.args(&["run", "-h", &job.payload]); + + debug!("Runner '{}': Executing: hero run -h {}", self.runner_id, job.payload); + + // Set environment variables from job + for (key, value) in &job.env_vars { + cmd.env(key, value); + } + + // Configure stdio + cmd.stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + // Execute command with timeout + let timeout = Duration::from_secs(job.timeout); + let start = std::time::Instant::now(); + + info!("Runner '{}': Starting command execution for job {}", self.runner_id, job.id); + + let mut child = cmd.spawn() + .map_err(|e| format!("Failed to spawn 'hero run -h': {}", e))?; + + // Wait for command with timeout + let output = loop { + if start.elapsed() > timeout { + // Kill the process if it times out + let _ = child.kill(); + return Err(format!("Command execution timed out after {} seconds", job.timeout).into()); + } + + match child.try_wait() { + Ok(Some(_status)) => { + // Process has exited + let output = child.wait_with_output() + .map_err(|e| format!("Failed to get command output: {}", e))?; + + break output; + } + Ok(None) => { + // Process still running, sleep briefly + std::thread::sleep(Duration::from_millis(100)); + } + Err(e) => { + return Err(format!("Error waiting for command: {}", e).into()); + } + } + }; + + // Check exit status + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + error!("Runner '{}': Command failed for job {}: {}", self.runner_id, job.id, stderr); + return Err(format!("Command failed with exit code {:?}: {}", output.status.code(), stderr).into()); + } + + // Return stdout + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + info!("Runner '{}': Command completed successfully for job {}", self.runner_id, job.id); + + Ok(stdout) + } +} + +impl Runner for HeroExecutor { + fn process_job(&self, job: Job) -> Result> { + info!("Runner '{}': Processing job {}", self.runner_id, job.id); + + // Execute the command + let result = self.execute_command(&job); + + match result { + Ok(output) => { + info!("Runner '{}': Job {} completed successfully", self.runner_id, job.id); + Ok(output) + } + Err(e) => { + error!("Runner '{}': Job {} failed: {}", self.runner_id, job.id, e); + Err(e) + } + } + } + + fn runner_id(&self) -> &str { + &self.runner_id + } + + fn redis_url(&self) -> &str { + &self.redis_url + } +} diff --git a/bin/runners/hero/src/main.rs b/bin/runners/hero/src/main.rs new file mode 100644 index 0000000..22e4b41 --- /dev/null +++ b/bin/runners/hero/src/main.rs @@ -0,0 +1,66 @@ +//! Hero Runner - Command Execution Runner +//! +//! This runner executes commands from job payloads. +//! Unlike script-based runners, it directly executes commands from the job payload. + +use hero_runner::runner_trait::spawn_runner; +use clap::Parser; +use log::info; +use tokio::sync::mpsc; +use std::sync::Arc; + +mod executor; +use executor::HeroExecutor; + +#[derive(Parser, Debug)] +#[command(author, version, about = "Hero Runner - Command execution runner", long_about = None)] +struct Args { + /// Runner ID + runner_id: String, + + /// Redis URL + #[arg(short = 'r', long, default_value = "redis://localhost:6379")] + redis_url: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + env_logger::init(); + + let args = Args::parse(); + + info!("Starting Hero Command Runner with ID: {}", args.runner_id); + info!("Redis URL: {}", args.redis_url); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + // Setup signal handling for graceful shutdown + let shutdown_tx_clone = shutdown_tx.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c"); + info!("Received Ctrl+C, initiating shutdown..."); + let _ = shutdown_tx_clone.send(()).await; + }); + + // Create executor + let executor = HeroExecutor::new( + args.runner_id.clone(), + args.redis_url.clone(), + ); + + // Wrap in Arc for the runner trait + let executor = Arc::new(executor); + + // Spawn the runner using the trait method + let runner_handle = spawn_runner(executor, shutdown_rx); + + info!("Hero runner '{}' is now running", args.runner_id); + + // Wait for runner to finish (shutdown is handled by the runner itself) + runner_handle.await??; + + info!("Hero runner '{}' shutdown complete", args.runner_id); + Ok(()) +} diff --git a/lib/runner/async_runner.rs b/lib/runner/async_runner.rs index d76335f..5782c55 100644 --- a/lib/runner/async_runner.rs +++ b/lib/runner/async_runner.rs @@ -226,10 +226,6 @@ impl Runner for AsyncRunner { Ok("Job spawned for async processing".to_string()) } - fn runner_type(&self) -> &'static str { - "Async" - } - fn runner_id(&self) -> &str { &self.runner_id } diff --git a/lib/runner/runner_trait.rs b/lib/runner/runner_trait.rs index 89476aa..c6f00de 100644 --- a/lib/runner/runner_trait.rs +++ b/lib/runner/runner_trait.rs @@ -1,31 +1,4 @@ -//! # Runner Trait Abstraction -//! -//! This module provides a trait-based abstraction for Rhai runners that eliminates -//! code duplication between synchronous and asynchronous runner implementations. -//! -//! The `Runner` trait defines the common interface and behavior, while specific -//! implementations handle job processing differently (sync vs async). -//! -//! ## Architecture -//! -//! ```text -//! ┌─────────────────┐ ┌─────────────────┐ -//! │ SyncRunner │ │ AsyncRunner │ -//! │ │ │ │ -//! │ process_job() │ │ process_job() │ -//! │ (sequential) │ │ (concurrent) │ -//! └─────────────────┘ └─────────────────┘ -//! │ │ -//! └───────┬───────────────┘ -//! │ -//! ┌───────▼───────┐ -//! │ Runner Trait │ -//! │ │ -//! │ spawn() │ -//! │ config │ -//! │ common loop │ -//! └───────────────┘ -//! ``` +//! Runner trait abstraction for job processing use crate::{Job, JobStatus, Client}; use log::{debug, error, info}; @@ -69,50 +42,25 @@ impl RunnerConfig { } } -/// Trait defining the common interface for Rhai runners -/// -/// This trait abstracts the common functionality between synchronous and -/// asynchronous runners, allowing them to share the same spawn logic and -/// Redis polling loop while implementing different job processing strategies. +/// Trait for job runners pub trait Runner: Send + Sync + 'static { - /// Process a single job - /// - /// This is the core method that differentiates runner implementations: - /// - Sync runners process jobs sequentially, one at a time - /// - Async runners spawn concurrent tasks for each job - /// - /// # Arguments - /// - /// * `job` - The job to process - /// - /// Note: The engine is now owned by the runner implementation as a field - /// For sync runners, this should be a blocking operation - /// For async runners, this can spawn tasks and return immediately + /// Process a single job and return the result fn process_job(&self, job: Job) -> Result>; - - /// Get the runner type name for logging - fn runner_type(&self) -> &'static str; - /// Get runner ID for this runner instance + /// Get runner ID fn runner_id(&self) -> &str; - /// Get Redis URL for this runner instance + /// Get Redis URL fn redis_url(&self) -> &str; - /// Spawn the runner - /// - /// This method provides the common runner loop implementation that both - /// sync and async runners can use. It handles: - /// - Redis connection setup - /// - Job polling from Redis queue - /// - Shutdown signal handling - /// - Delegating job processing to the implementation - /// - /// Note: The engine is now owned by the runner implementation as a field + /// Spawn the runner loop fn spawn( self: Arc, mut shutdown_rx: mpsc::Receiver<()>, - ) -> JoinHandle>> { + ) -> JoinHandle>> + where + Self: Sized + { tokio::spawn(async move { let runner_id = self.runner_id(); let redis_url = self.redis_url(); @@ -126,8 +74,7 @@ pub trait Runner: Send + Sync + 'static { let queue_key = client.runner_key(runner_id); info!( - "{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}", - self.runner_type(), + "Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}", runner_id, redis_url, queue_key @@ -135,119 +82,109 @@ pub trait Runner: Send + Sync + 'static { let mut redis_conn = initialize_redis_connection(runner_id, redis_url).await?; + // Main runner loop: poll Redis queue for jobs and process them + // Exits on shutdown signal or Redis error loop { let blpop_keys = vec![queue_key.clone()]; tokio::select! { // Listen for shutdown signal _ = shutdown_rx.recv() => { - info!("{} Runner '{}': Shutdown signal received. Terminating loop.", - self.runner_type(), runner_id); + info!("Runner '{}': Shutdown signal received", runner_id); break; } // Listen for tasks from Redis blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { - debug!("{} Runner '{}': Attempting BLPOP on queue: {}", - self.runner_type(), runner_id, queue_key); - let response: Option<(String, String)> = match blpop_result { Ok(resp) => resp, Err(e) => { - error!("{} Runner '{}': Redis BLPOP error on queue {}: {}. Runner for this circle might stop.", - self.runner_type(), runner_id, queue_key, e); + error!("Runner '{}': Redis BLPOP error: {}", runner_id, e); return Err(Box::new(e) as Box); } }; if let Some((_queue_name_recv, job_id)) = response { - info!("{} Runner '{}' received job_id: {} from queue: {}", - self.runner_type(), runner_id, job_id, _queue_name_recv); + info!("Runner '{}' received job: {}", runner_id, job_id); - // Load the job from Redis + // Load and process job match client.load_job_from_redis(&job_id).await { Ok(job) => { - // Check for ping job and handle it directly if job.payload.trim() == "ping" { - info!("{} Runner '{}': Received ping job '{}', responding with pong", - self.runner_type(), runner_id, job_id); - - // Update job status to started - if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await { - error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}", - self.runner_type(), runner_id, job_id, e); - } - - // Set result to "pong" and mark as finished - if let Err(e) = client.set_result(&job_id, "pong").await { - error!("{} Runner '{}': Failed to set ping job '{}' result: {}", - self.runner_type(), runner_id, job_id, e); - } - - if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await { - error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}", - self.runner_type(), runner_id, job_id, e); - } - - info!("{} Runner '{}': Successfully responded to ping job '{}' with pong", - self.runner_type(), runner_id, job_id); + handle_ping_job(&client, runner_id, &job_id).await; } else { - // Update job status to started - if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await { - error!("{} Runner '{}': Failed to update job '{}' status to Started: {}", - self.runner_type(), runner_id, job_id, e); - } - - // Delegate job processing to the implementation - match self.process_job(job) { - Ok(result) => { - // Set result and mark as finished - if let Err(e) = client.set_result(&job_id, &result).await { - error!("{} Runner '{}': Failed to set job '{}' result: {}", - self.runner_type(), runner_id, job_id, e); - } - - if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await { - error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}", - self.runner_type(), runner_id, job_id, e); - } - } - Err(e) => { - let error_str = format!("{:?}", e); - error!("{} Runner '{}': Job '{}' processing failed: {}", - self.runner_type(), runner_id, job_id, error_str); - - // Set error and mark as error - if let Err(e) = client.set_error(&job_id, &error_str).await { - error!("{} Runner '{}': Failed to set job '{}' error: {}", - self.runner_type(), runner_id, job_id, e); - } - - if let Err(e) = client.set_job_status(&job_id, JobStatus::Error).await { - error!("{} Runner '{}': Failed to update job '{}' status to Error: {}", - self.runner_type(), runner_id, job_id, e); - } - } - } + process_regular_job(&*self, &client, runner_id, &job_id, job).await; } } Err(e) => { - error!("{} Runner '{}': Failed to load job '{}': {}", - self.runner_type(), runner_id, job_id, e); + error!("Runner '{}': Failed to load job '{}': {}", runner_id, job_id, e); } } - } else { - debug!("{} Runner '{}': BLPOP timed out on queue {}. No new tasks.", - self.runner_type(), runner_id, queue_key); } } } } - info!("{} Runner '{}' has shut down.", self.runner_type(), runner_id); + info!("Runner '{}' has shut down", runner_id); Ok(()) }) } } +/// Handle ping job - responds with "pong" +async fn handle_ping_job(client: &Client, runner_id: &str, job_id: &str) { + info!("Runner '{}': Received ping job '{}'", runner_id, job_id); + + if let Err(e) = client.set_job_status(job_id, JobStatus::Started).await { + error!("Runner '{}': Failed to set ping job '{}' status to Started: {}", runner_id, job_id, e); + } + + if let Err(e) = client.set_result(job_id, "pong").await { + error!("Runner '{}': Failed to set ping job '{}' result: {}", runner_id, job_id, e); + } + + if let Err(e) = client.set_job_status(job_id, JobStatus::Finished).await { + error!("Runner '{}': Failed to set ping job '{}' status to Finished: {}", runner_id, job_id, e); + } + + info!("Runner '{}': Ping job '{}' completed", runner_id, job_id); +} + +/// Process regular job - handles job execution and status updates +async fn process_regular_job( + runner: &dyn Runner, + client: &Client, + runner_id: &str, + job_id: &str, + job: Job, +) { + if let Err(e) = client.set_job_status(job_id, JobStatus::Started).await { + error!("Runner '{}': Failed to set job '{}' status to Started: {}", runner_id, job_id, e); + } + + match runner.process_job(job) { + Ok(result) => { + if let Err(e) = client.set_result(job_id, &result).await { + error!("Runner '{}': Failed to set job '{}' result: {}", runner_id, job_id, e); + } + + if let Err(e) = client.set_job_status(job_id, JobStatus::Finished).await { + error!("Runner '{}': Failed to set job '{}' status to Finished: {}", runner_id, job_id, e); + } + } + Err(e) => { + let error_str = format!("{:?}", e); + error!("Runner '{}': Job '{}' failed: {}", runner_id, job_id, error_str); + + if let Err(e) = client.set_error(job_id, &error_str).await { + error!("Runner '{}': Failed to set job '{}' error: {}", runner_id, job_id, e); + } + + if let Err(e) = client.set_job_status(job_id, JobStatus::Error).await { + error!("Runner '{}': Failed to set job '{}' status to Error: {}", runner_id, job_id, e); + } + } + } +} + /// Convenience function to spawn a runner with the trait-based interface /// /// This function provides a unified interface for spawning any runner implementation diff --git a/lib/runner/sync_runner.rs b/lib/runner/sync_runner.rs index 0ef7056..e6e03bf 100644 --- a/lib/runner/sync_runner.rs +++ b/lib/runner/sync_runner.rs @@ -142,10 +142,6 @@ impl Runner for SyncRunner { } } - fn runner_type(&self) -> &'static str { - "Sync" - } - fn runner_id(&self) -> &str { &self.config.runner_id } diff --git a/tests/runner_hero.rs b/tests/runner_hero.rs new file mode 100644 index 0000000..84d13fe --- /dev/null +++ b/tests/runner_hero.rs @@ -0,0 +1,234 @@ +//! Integration tests for Hero Runner +//! +//! Tests the hero runner by spawning the binary and dispatching jobs to it. +//! +//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially: +//! ``` +//! cargo test --test runner_hero -- --test-threads=1 +//! ``` + +use hero_job::{Job, JobBuilder, JobStatus}; +use hero_job_client::Client; +use std::sync::{Mutex, Once}; +use std::process::Child; +use lazy_static::lazy_static; + +/// Test configuration +const RUNNER_ID: &str = "test-hero-runner"; +const REDIS_URL: &str = "redis://localhost:6379"; + +lazy_static! { + static ref RUNNER_PROCESS: Mutex> = Mutex::new(None); +} + +/// Global initialization flag +static INIT: Once = Once::new(); + +/// Initialize and start the hero runner binary +async fn init_runner() { + INIT.call_once(|| { + // Register cleanup handler + let _ = std::panic::catch_unwind(|| { + ctrlc::set_handler(move || { + cleanup_runner(); + std::process::exit(0); + }).ok(); + }); + + // Use escargot to build and get the binary path + let binary = escargot::CargoBuild::new() + .bin("herorunner") + .package("runner-hero") + .run() + .expect("Failed to build hero runner binary"); + + // Start the runner binary + let child = binary + .command() + .args(&[ + RUNNER_ID, + "--redis-url", + REDIS_URL, + ]) + .spawn() + .expect("Failed to start hero runner"); + + *RUNNER_PROCESS.lock().unwrap() = Some(child); + + // Wait for runner to be ready with TCP check + use std::time::Duration; + std::thread::sleep(Duration::from_secs(2)); + + println!("✅ Hero runner ready"); + }); +} + +/// Cleanup runner process +fn cleanup_runner() { + if let Ok(mut guard) = RUNNER_PROCESS.lock() { + if let Some(mut child) = guard.take() { + println!("🧹 Cleaning up hero runner process..."); + let _ = child.kill(); + let _ = child.wait(); + } + } +} + +/// Helper to create a test client +async fn create_client() -> Client { + // Ensure runner is running + init_runner().await; + + Client::builder() + .redis_url(REDIS_URL) + .build() + .await + .expect("Failed to create job client") +} + +/// Helper to create a test job +fn create_test_job(payload: &str) -> Job { + JobBuilder::new() + .caller_id("test") + .context_id("test-context") + .payload(payload) + .runner(RUNNER_ID) + .timeout(30) + .build() + .expect("Failed to build job") +} + +#[tokio::test] +async fn test_01_ping_job() { + println!("\n🧪 Test: Ping Job"); + + let client = create_client().await; + + // Create ping job + let job = create_test_job("ping"); + let job_id = job.id.clone(); + + // Save job to Redis + client.store_job_in_redis(&job).await.expect("Failed to save job"); + + // Queue job to runner + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + assert_eq!(status, JobStatus::Finished, "Ping job should be finished"); + + // Check result + let result = client.get_result(&job_id).await.expect("Failed to get result"); + assert_eq!(result, Some("pong".to_string()), "Ping should return pong"); + + println!("✅ Ping job completed successfully"); +} + +#[tokio::test] +async fn test_02_simple_heroscript() { + println!("\n🧪 Test: Simple Heroscript"); + + let client = create_client().await; + + // Create job with simple heroscript + let job = create_test_job("print('Hello from hero runner')"); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("Job status: {:?}", status); + + // Get result or error + if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") { + println!("Job result: {}", result); + } + if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { + println!("Job error: {}", error); + } + + println!("✅ Heroscript job completed"); +} + +#[tokio::test] +async fn test_03_job_with_env_vars() { + println!("\n🧪 Test: Job with Environment Variables"); + + let client = create_client().await; + + // Create job with env vars + let mut job = create_test_job("echo $TEST_VAR"); + job.env_vars.insert("TEST_VAR".to_string(), "test_value".to_string()); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("Job status: {:?}", status); + + // Get result + if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") { + println!("Job result: {}", result); + } + + println!("✅ Job with env vars completed"); +} + +#[tokio::test] +async fn test_04_job_timeout() { + println!("\n🧪 Test: Job Timeout"); + + let client = create_client().await; + + // Create job with short timeout + let mut job = create_test_job("sleep 10"); + job.timeout = 2; // 2 second timeout + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to timeout + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status - should be error due to timeout + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("Job status: {:?}", status); + + // Should have error + if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { + println!("Job error (expected timeout): {}", error); + assert!(error.contains("timeout") || error.contains("timed out"), "Error should mention timeout"); + } + + println!("✅ Job timeout handled correctly"); +} + +/// Final test that ensures cleanup happens +#[tokio::test] +async fn test_zz_cleanup() { + println!("\n🧹 Running cleanup..."); + cleanup_runner(); + + // Wait a bit to ensure process is killed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + println!("✅ Cleanup complete"); +}