Files
horus/lib/clients/coordinator/examples/flow_demo.rs
Timur Gordon f66edba1d3 Add coordinator client library, installation scripts, and new test runners
- Add coordinator client library to workspace
- Add installation documentation and heroscript
- Add new test runners for Osiris and Sal
- Update hero runner test to handle invalid heroscript errors
- Update README with installation instructions
2025-11-17 10:56:13 +01:00

275 lines
8.0 KiB
Rust

//! Flow demo for Hero Coordinator
//!
//! This example demonstrates the complete flow:
//! 1. Create an actor
//! 2. Create a context with permissions
//! 3. Register a runner
//! 4. Create jobs with dependencies
//! 5. Create and start a flow
//! 6. Poll until completion
//!
//! Usage:
//! COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
//! --dst-ip 127.0.0.1 \
//! --context-id 2 \
//! --actor-id 11001 \
//! --runner-id 12001 \
//! --flow-id 13001 \
//! --jobs 3
use hero_coordinator_client::{CoordinatorClient, models::*};
use std::collections::HashMap;
use std::time::Duration;
use clap::Parser;
// Command-line arguments for the flow demo.
// NOTE: the `///` doc comments below double as clap help text shown by
// `--help`, so they are user-facing strings — do not edit them casually.
// Defaults mirror the usage example in the module-level docs.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Coordinator URL
    #[arg(long, env = "COORDINATOR_URL", default_value = "http://127.0.0.1:9652")]
    coordinator_url: String,
    /// Destination IP address
    #[arg(long)]
    dst_ip: String,
    /// Destination public key (alternative to dst-ip)
    #[arg(long)]
    dst_pk: Option<String>,
    /// Context ID (Redis DB index, 0-15)
    #[arg(long, default_value_t = 2)]
    context_id: u32,
    /// Actor ID
    #[arg(long, default_value_t = 11001)]
    actor_id: u32,
    /// Runner ID
    #[arg(long, default_value_t = 12001)]
    runner_id: u32,
    /// Flow ID
    #[arg(long, default_value_t = 13001)]
    flow_id: u32,
    /// Base job ID (subsequent jobs increment from this)
    #[arg(long, default_value_t = 20000)]
    base_job_id: u32,
    /// Number of jobs to create (forms a simple chain)
    #[arg(long, default_value_t = 3)]
    jobs: u32,
    /// Per-job timeout in seconds
    #[arg(long, default_value_t = 60)]
    timeout_secs: u64,
    /// Per-job retries (0-255)
    #[arg(long, default_value_t = 0)]
    retries: u8,
    /// Script type for jobs/runner
    #[arg(long, default_value = "Python")]
    script_type: String,
    /// Supervisor topic
    #[arg(long, default_value = "supervisor.rpc")]
    topic: String,
    /// Optional supervisor secret
    #[arg(long)]
    secret: Option<String>,
    /// Flow poll interval in seconds
    // Converted with Duration::from_secs_f64 in main(); a negative or
    // non-finite value would panic there — clap does not range-check this.
    #[arg(long, default_value_t = 2.0)]
    poll_interval: f64,
    /// Max seconds to wait for flow completion
    #[arg(long, default_value_t = 600)]
    poll_timeout: u64,
}
/// Print `title` framed above and below by an 80-character `=` rule,
/// preceded by a blank line — used as a visual section separator.
fn print_header(title: &str) {
    let rule = "=".repeat(80);
    println!("\n{rule}\n{title}\n{rule}");
}
/// Map the `--script-type` CLI string to the coordinator's `ScriptType`.
///
/// Recognized values are "Python", "V", "Osis", and "Sal" (case-sensitive).
/// Any other value still falls back to `ScriptType::Python` (backward
/// compatible), but now prints a warning instead of silently substituting,
/// so a typo like "python" is no longer invisible to the user.
fn parse_script_type(s: &str) -> ScriptType {
    match s {
        "Python" => ScriptType::Python,
        "V" => ScriptType::V,
        "Osis" => ScriptType::Osis,
        "Sal" => ScriptType::Sal,
        other => {
            eprintln!(
                "WARN: unknown script type '{}'; defaulting to Python (expected one of: Python, V, Osis, Sal)",
                other
            );
            ScriptType::Python
        }
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Walks the full coordinator lifecycle: actor -> context -> runner ->
    // jobs (a linear dependency chain) -> flow -> start -> poll to completion.
    // Exit codes: 0 on FlowStatus::Finished, 1 on any other terminal status,
    // 2 on invalid arguments.
    env_logger::init();
    let args = Args::parse();
    if args.jobs < 1 {
        eprintln!("ERROR: --jobs must be >= 1");
        std::process::exit(2);
    }
    let client = CoordinatorClient::new(&args.coordinator_url)?;
    let script_type = parse_script_type(&args.script_type);
    // 1) Actor
    // NOTE(review): *_create_or_load presumably returns the existing entity
    // when the ID is already taken rather than failing — confirm against the
    // client library docs.
    print_header("actor.create (or load)");
    let actor = client.actor_create_or_load(ActorCreate {
        id: args.actor_id,
        pubkey: "demo-pubkey".to_string(),
        // Demo-only loopback address; parse() turns the literal into IpAddr.
        address: vec!["127.0.0.1".parse()?],
    }).await?;
    println!("{:#?}", actor);
    // 2) Context
    // The demo actor gets every permission role in its own context.
    print_header("context.create (or load)");
    let context = client.context_create_or_load(ContextCreate {
        id: args.context_id,
        admins: vec![args.actor_id],
        readers: vec![args.actor_id],
        executors: vec![args.actor_id],
    }).await?;
    println!("{:#?}", context);
    // 3) Runner
    print_header("runner.create (or load)");
    // --dst-pk is optional; an empty pubkey is sent when it is absent.
    let runner_pubkey = args.dst_pk.unwrap_or_default();
    // Fails fast with a parse error if --dst-ip is not a valid IP literal.
    let runner_address: std::net::IpAddr = args.dst_ip.parse()?;
    let runner = client.runner_create_or_load(args.context_id, RunnerCreate {
        id: args.runner_id,
        pubkey: runner_pubkey,
        address: runner_address,
        topic: args.topic.clone(),
        script_type,
        local: false,
        secret: args.secret.clone(),
    }).await?;
    println!("{:#?}", runner);
    // 4) Jobs - build a simple chain
    // Job i depends on job i-1, so the flow executes strictly sequentially.
    let mut job_ids = Vec::new();
    for i in 0..args.jobs {
        let job_id = args.base_job_id + i;
        let depends = if i == 0 {
            vec![]
        } else {
            vec![args.base_job_id + (i - 1)]
        };
        // Label used only for the printed section header below.
        let job_type = if depends.is_empty() { "(root)" } else { format!("(depends on {:?})", depends) };
        print_header(&format!("job.create - {} {}", job_id, job_type));
        let job = client.job_create_or_load(args.context_id, JobCreate {
            id: job_id,
            caller_id: args.actor_id,
            context_id: args.context_id,
            script: format!("print('Job {} running')", i),
            script_type,
            timeout: args.timeout_secs,
            retries: args.retries,
            env_vars: HashMap::new(),
            prerequisites: vec![],
            depends,
        }).await?;
        println!("{:#?}", job);
        job_ids.push(job_id);
    }
    // 5) Flow
    print_header("flow.create (or load)");
    let flow = client.flow_create_or_load(args.context_id, FlowCreate {
        id: args.flow_id,
        caller_id: args.actor_id,
        context_id: args.context_id,
        jobs: job_ids.clone(),
        env_vars: HashMap::new(),
    }).await?;
    println!("{:#?}", flow);
    // Optional: show DAG
    // A DAG fetch failure is non-fatal here — the flow can still be started.
    print_header("flow.dag");
    match client.flow_dag(args.context_id, args.flow_id).await {
        Ok(dag) => println!("{:#?}", dag),
        Err(e) => eprintln!("WARN: flow.dag failed: {}", e),
    }
    // 6) Start flow
    print_header("flow.start");
    let started = client.flow_start(args.context_id, args.flow_id).await?;
    println!("flow.start -> {}", started);
    // 7) Poll until completion
    print_header("Polling flow.load until completion");
    let poll_interval = Duration::from_secs_f64(args.poll_interval);
    let timeout = Duration::from_secs(args.poll_timeout);
    let start = std::time::Instant::now();
    let mut poll_count = 0;
    let mut last_status_print = std::time::Instant::now();
    loop {
        poll_count += 1;
        // A failed flow.load aborts the whole demo via `?` — intentional,
        // since losing contact with the coordinator makes polling pointless.
        let flow = client.flow_load(args.context_id, args.flow_id).await?;
        let now = std::time::Instant::now();
        // Throttle status lines to at most one per max(1s, poll_interval)
        // so a fast poll loop does not flood stdout.
        if now.duration_since(last_status_print) >= Duration::from_secs(1).max(poll_interval) {
            println!("[{}s] flow.status = {:?}", start.elapsed().as_secs(), flow.status);
            last_status_print = now;
        }
        // Every 5th poll, print the current flow DAG
        if poll_count % 5 == 0 {
            print_header("flow.dag (periodic)");
            match client.flow_dag(args.context_id, args.flow_id).await {
                Ok(dag) => println!("{:#?}", dag),
                Err(e) => eprintln!("WARN: periodic flow.dag failed: {}", e),
            }
        }
        match flow.status {
            // Terminal states: stop polling either way; the final summary
            // below decides the process exit code.
            FlowStatus::Finished | FlowStatus::Error => {
                break;
            }
            _ => {
                // Non-terminal: give up after --poll-timeout seconds,
                // otherwise sleep one interval before the next poll.
                if start.elapsed() > timeout {
                    eprintln!("ERROR: Flow did not complete within {:?} (status={:?})", timeout, flow.status);
                    break;
                }
                tokio::time::sleep(poll_interval).await;
            }
        }
    }
    // 8) Final summary: job statuses
    // Per-job load failures are reported but do not abort the summary.
    print_header("Final job statuses");
    for job_id in &job_ids {
        match client.job_load(args.context_id, args.actor_id, *job_id).await {
            Ok(job) => {
                println!("Job {}: status={:?} result={:?}", job_id, job.status, job.result);
            }
            Err(e) => {
                eprintln!("Job {}: load failed: {}", job_id, e);
            }
        }
    }
    // Final status check
    // Re-load the flow once more so the exit code reflects the latest state
    // (the loop may have broken on timeout with a stale status).
    let final_flow = client.flow_load(args.context_id, args.flow_id).await?;
    print_header("Result");
    if final_flow.status == FlowStatus::Finished {
        println!("Flow finished successfully.");
        Ok(())
    } else {
        println!("Flow ended with status={:?}", final_flow.status);
        std::process::exit(1);
    }
}