- Simplified RunnerConfig to just name, command, and optional env - Removed RunnerType and ProcessManagerType enums - Removed db_path, redis_url, binary_path from config - Made runner name also serve as queue name (no separate queue param) - Added secret-based authentication to all runner management methods - Created comprehensive osiris_openrpc example - Archived old examples to _archive/ - Updated client API to match simplified supervisor interface
		
			
				
	
	
		
			279 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			279 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
//! End-to-End Demo: Supervisor + Runner + Client
 | 
						|
//!
 | 
						|
//! This example demonstrates the complete workflow:
 | 
						|
//! 1. Starts a supervisor with Mycelium integration
 | 
						|
//! 2. Starts an OSIS runner
 | 
						|
//! 3. Uses the supervisor client to run jobs
 | 
						|
//! 4. Shows both job.run (blocking) and job.start (non-blocking) modes
 | 
						|
//!
 | 
						|
//! Prerequisites:
 | 
						|
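//! - Redis running on localhost:6379
//! - `cargo` on the PATH, run from a workspace that provides the
//!   `hero-supervisor` and `runner_osis` binaries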
 | 
						|
//!
 | 
						|
//! Usage:
 | 
						|
//! ```bash
 | 
						|
//! RUST_LOG=info cargo run --example end_to_end_demo
 | 
						|
//! ```
 | 
						|
 | 
						|
use anyhow::{Result, Context};
 | 
						|
use log::{info, error};
 | 
						|
use std::process::{Command, Child, Stdio};
 | 
						|
use std::time::Duration;
 | 
						|
use tokio::time::sleep;
 | 
						|
use hero_supervisor_openrpc_client::{SupervisorClient, JobBuilder};
 | 
						|
 | 
						|
/// Configuration for the demo.
///
/// Bundles the values shared by the spawned supervisor, the spawned runner,
/// and the client used in `main`.
#[derive(Debug, Clone)]
struct DemoConfig {
    /// Redis connection URL, passed as `--redis-url` to both child processes.
    redis_url: String,
    /// Port passed to the supervisor as `--port`; the client connects to it.
    supervisor_port: u16,
    /// Runner identifier: first CLI argument of the runner process and the
    /// name used when registering the runner and targeting jobs.
    runner_id: String,
    /// Database path passed to the runner as `--db-path`.
    db_path: String,
}

impl Default for DemoConfig {
    /// Returns the local-development defaults used by the demo:
    /// Redis on `localhost:6379`, supervisor on port `3030`, and a runner
    /// named `example_runner` backed by `/tmp/example_runner.db`.
    fn default() -> Self {
        Self {
            redis_url: "redis://localhost:6379".to_string(),
            supervisor_port: 3030,
            runner_id: "example_runner".to_string(),
            db_path: "/tmp/example_runner.db".to_string(),
        }
    }
}
 | 
						|
 | 
						|
/// Supervisor process wrapper
 | 
						|
struct SupervisorProcess {
 | 
						|
    child: Child,
 | 
						|
}
 | 
						|
 | 
						|
impl SupervisorProcess {
 | 
						|
    fn start(config: &DemoConfig) -> Result<Self> {
 | 
						|
        info!("🚀 Starting supervisor on port {}...", config.supervisor_port);
 | 
						|
        
 | 
						|
        let child = Command::new("cargo")
 | 
						|
            .args(&[
 | 
						|
                "run",
 | 
						|
                "--bin",
 | 
						|
                "hero-supervisor",
 | 
						|
                "--",
 | 
						|
                "--redis-url",
 | 
						|
                &config.redis_url,
 | 
						|
                "--port",
 | 
						|
                &config.supervisor_port.to_string(),
 | 
						|
            ])
 | 
						|
            .stdout(Stdio::piped())
 | 
						|
            .stderr(Stdio::piped())
 | 
						|
            .spawn()
 | 
						|
            .context("Failed to start supervisor")?;
 | 
						|
        
 | 
						|
        Ok(Self { child })
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl Drop for SupervisorProcess {
 | 
						|
    fn drop(&mut self) {
 | 
						|
        info!("🛑 Stopping supervisor...");
 | 
						|
        let _ = self.child.kill();
 | 
						|
        let _ = self.child.wait();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/// Runner process wrapper
 | 
						|
struct RunnerProcess {
 | 
						|
    child: Child,
 | 
						|
}
 | 
						|
 | 
						|
impl RunnerProcess {
 | 
						|
    fn start(config: &DemoConfig) -> Result<Self> {
 | 
						|
        info!("🤖 Starting OSIS runner '{}'...", config.runner_id);
 | 
						|
        
 | 
						|
        let child = Command::new("cargo")
 | 
						|
            .args(&[
 | 
						|
                "run",
 | 
						|
                "--bin",
 | 
						|
                "runner_osis",
 | 
						|
                "--",
 | 
						|
                &config.runner_id,
 | 
						|
                "--db-path",
 | 
						|
                &config.db_path,
 | 
						|
                "--redis-url",
 | 
						|
                &config.redis_url,
 | 
						|
            ])
 | 
						|
            .env("RUST_LOG", "info")
 | 
						|
            .stdout(Stdio::piped())
 | 
						|
            .stderr(Stdio::piped())
 | 
						|
            .spawn()
 | 
						|
            .context("Failed to start runner")?;
 | 
						|
        
 | 
						|
        Ok(Self { child })
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
impl Drop for RunnerProcess {
 | 
						|
    fn drop(&mut self) {
 | 
						|
        info!("🛑 Stopping runner...");
 | 
						|
        let _ = self.child.kill();
 | 
						|
        let _ = self.child.wait();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/// Helper functions for the demo
 | 
						|
async fn register_runner_helper(client: &SupervisorClient, runner_id: &str, secret: &str) -> Result<()> {
 | 
						|
    info!("📝 Registering runner '{}'...", runner_id);
 | 
						|
    
 | 
						|
    let queue = format!("hero:q:work:type:osis:group:default:inst:{}", runner_id);
 | 
						|
    client.register_runner(secret, runner_id, &queue).await?;
 | 
						|
    
 | 
						|
    info!("✅ Runner registered successfully");
 | 
						|
    Ok(())
 | 
						|
}
 | 
						|
 | 
						|
async fn run_job_helper(client: &SupervisorClient, job: runner_rust::job::Job, secret: &str, timeout: u64) -> Result<String> {
 | 
						|
    info!("🚀 Running job {} (blocking)...", job.id);
 | 
						|
    
 | 
						|
    let response = client.job_run(secret, job, Some(timeout)).await?;
 | 
						|
    
 | 
						|
    let result = response.result
 | 
						|
        .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
 | 
						|
    
 | 
						|
    info!("✅ Job completed with result: {}", result);
 | 
						|
    Ok(result)
 | 
						|
}
 | 
						|
 | 
						|
/// Submits `job` through the client's non-blocking `job_start` call and
/// returns the job id reported in the response.
///
/// # Errors
///
/// Propagates any error returned by the client's `job_start` call.
async fn start_job_helper(client: &SupervisorClient, job: runner_rust::job::Job, secret: &str) -> Result<String> {
    info!("🚀 Starting job {} (non-blocking)...", job.id);

    let queued = client.job_start(secret, job).await?;
    let job_id = queued.job_id;

    info!("✅ Job queued with ID: {}", job_id);
    Ok(job_id)
}
 | 
						|
 | 
						|
#[tokio::main]
 | 
						|
async fn main() -> Result<()> {
 | 
						|
    // Initialize logging
 | 
						|
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
 | 
						|
    
 | 
						|
    println!("\n╔════════════════════════════════════════════════════════════╗");
 | 
						|
    println!("║  End-to-End Demo: Supervisor + Runner + Client            ║");
 | 
						|
    println!("╚════════════════════════════════════════════════════════════╝\n");
 | 
						|
    
 | 
						|
    let config = DemoConfig::default();
 | 
						|
    
 | 
						|
    // Step 1: Start supervisor
 | 
						|
    println!("📋 Step 1: Starting Supervisor");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    let _supervisor = SupervisorProcess::start(&config)?;
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
    println!("✅ Supervisor started on port {}\n", config.supervisor_port);
 | 
						|
    
 | 
						|
    // Step 2: Start runner
 | 
						|
    println!("📋 Step 2: Starting OSIS Runner");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    let _runner = RunnerProcess::start(&config)?;
 | 
						|
    sleep(Duration::from_secs(3)).await;
 | 
						|
    println!("✅ Runner '{}' started\n", config.runner_id);
 | 
						|
    
 | 
						|
    // Step 3: Create client and register runner
 | 
						|
    println!("📋 Step 3: Registering Runner with Supervisor");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    let client = SupervisorClient::new(&format!("http://localhost:{}", config.supervisor_port))?;
 | 
						|
    register_runner_helper(&client, &config.runner_id, "admin_secret").await?;
 | 
						|
    println!("✅ Runner registered\n");
 | 
						|
    
 | 
						|
    sleep(Duration::from_secs(2)).await;
 | 
						|
    
 | 
						|
    // Step 4: Run blocking jobs (job.run)
 | 
						|
    println!("📋 Step 4: Running Blocking Jobs (job.run)");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    
 | 
						|
    // Job 1: Simple calculation
 | 
						|
    println!("\n🔹 Job 1: Simple Calculation");
 | 
						|
    let job1 = JobBuilder::new()
 | 
						|
        .caller_id("demo_client")
 | 
						|
        .context_id("demo_context")
 | 
						|
        .payload("let result = 2 + 2; to_json(result)")
 | 
						|
        .runner(&config.runner_id)
 | 
						|
        .executor("rhai")
 | 
						|
        .timeout(30)
 | 
						|
        .build()?;
 | 
						|
    
 | 
						|
    let result1 = run_job_helper(&client, job1, "admin_secret", 30).await?;
 | 
						|
    println!("   Result: {}", result1);
 | 
						|
    
 | 
						|
    // Job 2: String manipulation
 | 
						|
    println!("\n🔹 Job 2: String Manipulation");
 | 
						|
    let job2 = JobBuilder::new()
 | 
						|
        .caller_id("demo_client")
 | 
						|
        .context_id("demo_context")
 | 
						|
        .payload(r#"let msg = "Hello from OSIS Runner!"; to_json(msg)"#)
 | 
						|
        .runner(&config.runner_id)
 | 
						|
        .executor("rhai")
 | 
						|
        .timeout(30)
 | 
						|
        .build()?;
 | 
						|
    
 | 
						|
    let result2 = run_job_helper(&client, job2, "admin_secret", 30).await?;
 | 
						|
    println!("   Result: {}", result2);
 | 
						|
    
 | 
						|
    // Job 3: Array operations
 | 
						|
    println!("\n🔹 Job 3: Array Operations");
 | 
						|
    let job3 = JobBuilder::new()
 | 
						|
        .caller_id("demo_client")
 | 
						|
        .context_id("demo_context")
 | 
						|
        .payload(r#"
 | 
						|
            let numbers = [1, 2, 3, 4, 5];
 | 
						|
            let sum = 0;
 | 
						|
            for n in numbers {
 | 
						|
                sum += n;
 | 
						|
            }
 | 
						|
            to_json(#{sum: sum, count: numbers.len()})
 | 
						|
        "#)
 | 
						|
        .runner(&config.runner_id)
 | 
						|
        .executor("rhai")
 | 
						|
        .timeout(30)
 | 
						|
        .build()?;
 | 
						|
    
 | 
						|
    let result3 = run_job_helper(&client, job3, "admin_secret", 30).await?;
 | 
						|
    println!("   Result: {}", result3);
 | 
						|
    
 | 
						|
    println!("\n✅ All blocking jobs completed successfully\n");
 | 
						|
    
 | 
						|
    // Step 5: Start non-blocking jobs (job.start)
 | 
						|
    println!("📋 Step 5: Starting Non-Blocking Jobs (job.start)");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    
 | 
						|
    println!("\n🔹 Job 4: Background Task");
 | 
						|
    let job4 = JobBuilder::new()
 | 
						|
        .caller_id("demo_client")
 | 
						|
        .context_id("demo_context")
 | 
						|
        .payload(r#"
 | 
						|
            let result = "Background task completed";
 | 
						|
            to_json(result)
 | 
						|
        "#)
 | 
						|
        .runner(&config.runner_id)
 | 
						|
        .executor("rhai")
 | 
						|
        .timeout(30)
 | 
						|
        .build()?;
 | 
						|
    
 | 
						|
    let job4_id = start_job_helper(&client, job4, "admin_secret").await?;
 | 
						|
    println!("   Job ID: {} (running in background)", job4_id);
 | 
						|
    
 | 
						|
    println!("\n✅ Non-blocking job started\n");
 | 
						|
    
 | 
						|
    // Step 6: Summary
 | 
						|
    println!("📋 Step 6: Demo Summary");
 | 
						|
    println!("─────────────────────────────────────────────────────────────");
 | 
						|
    println!("✅ Supervisor: Running on port {}", config.supervisor_port);
 | 
						|
    println!("✅ Runner: '{}' registered and processing jobs", config.runner_id);
 | 
						|
    println!("✅ Blocking jobs: 3 completed successfully");
 | 
						|
    println!("✅ Non-blocking jobs: 1 started");
 | 
						|
    println!("\n🎉 Demo completed successfully!");
 | 
						|
    
 | 
						|
    // Keep processes running for a bit to see logs
 | 
						|
    println!("\n⏳ Keeping processes running for 5 seconds...");
 | 
						|
    sleep(Duration::from_secs(5)).await;
 | 
						|
    
 | 
						|
    println!("\n🛑 Shutting down...");
 | 
						|
    
 | 
						|
    Ok(())
 | 
						|
}
 |