diff --git a/Cargo.toml b/Cargo.toml index 1a7571d..e0ee1f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "bin/runners/sal", "bin/runners/hero", "bin/supervisor", + "lib/clients/coordinator", "lib/clients/job", "lib/clients/osiris", "lib/clients/supervisor", diff --git a/README.md b/README.md index 6abe1c7..dde0755 100644 --- a/README.md +++ b/README.md @@ -100,3 +100,8 @@ cargo clippy --workspace -- -D warnings ## License MIT OR Apache-2.0 + + +## Installation + +Horus is installed via heroscripts and herolib installers. This ensures safe, replicable, and versioned installation of Horus. See [installation heroscript](./scripts/install.md) \ No newline at end of file diff --git a/lib/clients/coordinator/Cargo.toml b/lib/clients/coordinator/Cargo.toml new file mode 100644 index 0000000..bf77418 --- /dev/null +++ b/lib/clients/coordinator/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "hero-coordinator-client" +version.workspace = true +edition.workspace = true +description = "Client library for Hero Coordinator" +license = "MIT OR Apache-2.0" + +[dependencies] +# Core dependencies +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +log.workspace = true +tokio.workspace = true +async-trait.workspace = true + +# JSON-RPC client +jsonrpsee = { workspace = true, features = ["http-client", "macros"] } +reqwest = { version = "0.12", features = ["json"] } + +# Time handling +chrono.workspace = true + +# Hero models +hero-job = { path = "../../models/job" } + +[dev-dependencies] +tokio-test = "0.4" +clap = { version = "4.0", features = ["derive", "env"] } +env_logger = "0.11" diff --git a/lib/clients/coordinator/README.md b/lib/clients/coordinator/README.md new file mode 100644 index 0000000..bd38932 --- /dev/null +++ b/lib/clients/coordinator/README.md @@ -0,0 +1,226 @@ +# Hero Coordinator Client + +Rust client library for interacting with the Hero Coordinator JSON-RPC API. 
+ +## Features + +- **Actor Management**: Create and load actors +- **Context Management**: Create contexts with admin/reader/executor permissions +- **Runner Management**: Register runners in contexts +- **Job Management**: Create jobs with dependencies +- **Flow Management**: Create and manage job flows (DAGs) +- **Flow Polling**: Poll flows until completion with timeout support +- **Helper Methods**: `*_create_or_load` methods for idempotent operations + +## Installation + +Add to your `Cargo.toml`: + +```toml +[dependencies] +hero-coordinator-client = { path = "path/to/lib/clients/coordinator" } +``` + +## Usage + +### Basic Example + +```rust +use hero_coordinator_client::{CoordinatorClient, models::*}; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create client + let client = CoordinatorClient::new("http://127.0.0.1:9652")?; + + // Create actor + let actor = client.actor_create(ActorCreate { + id: 11001, + pubkey: "demo-pubkey".to_string(), + address: vec!["127.0.0.1".parse()?], + }).await?; + + // Create context + let context = client.context_create(ContextCreate { + id: 2, + admins: vec![11001], + readers: vec![11001], + executors: vec![11001], + }).await?; + + // Create runner + let runner = client.runner_create(2, RunnerCreate { + id: 12001, + pubkey: "".to_string(), + address: "127.0.0.1".parse()?, + topic: "supervisor.rpc".to_string(), + script_type: ScriptType::Python, + local: false, + secret: None, + }).await?; + + // Create job + let job = client.job_create(2, JobCreate { + id: 20000, + caller_id: 11001, + context_id: 2, + script: "print('Hello from job')".to_string(), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: vec![], + }).await?; + + // Create flow + let flow = client.flow_create(2, FlowCreate { + id: 13001, + caller_id: 11001, + context_id: 2, + jobs: vec![20000], + env_vars: HashMap::new(), + }).await?; + + // Start 
flow + client.flow_start(2, 13001).await?; + + // Poll until completion + let final_flow = client.flow_poll_until_complete( + 2, + 13001, + std::time::Duration::from_secs(2), + std::time::Duration::from_secs(600), + ).await?; + + println!("Flow status: {:?}", final_flow.status); + Ok(()) +} +``` + +### Idempotent Operations + +Use `*_create_or_load` methods to handle existing resources: + +```rust +// Will create if doesn't exist, or load if it does +let actor = client.actor_create_or_load(ActorCreate { + id: 11001, + pubkey: "demo-pubkey".to_string(), + address: vec!["127.0.0.1".parse()?], +}).await?; +``` + +### Flow with Dependencies + +Create a chain of dependent jobs: + +```rust +// Job 0 (root) +let job0 = client.job_create(2, JobCreate { + id: 20000, + caller_id: 11001, + context_id: 2, + script: "print('Job 0')".to_string(), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: vec![], +}).await?; + +// Job 1 (depends on Job 0) +let job1 = client.job_create(2, JobCreate { + id: 20001, + caller_id: 11001, + context_id: 2, + script: "print('Job 1')".to_string(), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: vec![20000], // Depends on job0 +}).await?; + +// Create flow with both jobs +let flow = client.flow_create(2, FlowCreate { + id: 13001, + caller_id: 11001, + context_id: 2, + jobs: vec![20000, 20001], + env_vars: HashMap::new(), +}).await?; +``` + +## Examples + +See the `examples/` directory for complete examples: + +```bash +# Run the flow demo +COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \ + --dst-ip 127.0.0.1 \ + --context-id 2 \ + --actor-id 11001 \ + --runner-id 12001 \ + --flow-id 13001 \ + --jobs 3 +``` + +## API Methods + +### Actor +- `actor_create(actor: ActorCreate) -> Actor` +- `actor_load(id: u32) -> Actor` +- `actor_create_or_load(actor: ActorCreate) -> Actor` 
+ +### Context +- `context_create(context: ContextCreate) -> Context` +- `context_load(id: u32) -> Context` +- `context_create_or_load(context: ContextCreate) -> Context` + +### Runner +- `runner_create(context_id: u32, runner: RunnerCreate) -> Runner` +- `runner_load(context_id: u32, id: u32) -> Runner` +- `runner_create_or_load(context_id: u32, runner: RunnerCreate) -> Runner` + +### Job +- `job_create(context_id: u32, job: JobCreate) -> Job` +- `job_load(context_id: u32, caller_id: u32, id: u32) -> Job` +- `job_create_or_load(context_id: u32, job: JobCreate) -> Job` + +### Flow +- `flow_create(context_id: u32, flow: FlowCreate) -> Flow` +- `flow_load(context_id: u32, id: u32) -> Flow` +- `flow_create_or_load(context_id: u32, flow: FlowCreate) -> Flow` +- `flow_dag(context_id: u32, id: u32) -> FlowDag` +- `flow_start(context_id: u32, id: u32) -> bool` +- `flow_poll_until_complete(context_id, flow_id, poll_interval, timeout) -> Flow` + +### Message +- `message_create(context_id: u32, message: MessageCreate) -> Message` +- `message_load(context_id: u32, id: u32) -> Message` + +## Error Handling + +The client uses a custom `CoordinatorError` type: + +```rust +use hero_coordinator_client::error::CoordinatorError; + +match client.actor_create(actor).await { + Ok(actor) => println!("Created: {:?}", actor), + Err(CoordinatorError::AlreadyExists) => { + // Resource already exists, load it instead + let actor = client.actor_load(actor.id).await?; + } + Err(e) => return Err(e.into()), +} +``` + +## License + +MIT OR Apache-2.0 diff --git a/lib/clients/coordinator/examples/flow_demo.rs b/lib/clients/coordinator/examples/flow_demo.rs new file mode 100644 index 0000000..cae437a --- /dev/null +++ b/lib/clients/coordinator/examples/flow_demo.rs @@ -0,0 +1,274 @@ +//! Flow demo for Hero Coordinator +//! +//! This example demonstrates the complete flow: +//! 1. Create an actor +//! 2. Create a context with permissions +//! 3. Register a runner +//! 4. 
Create jobs with dependencies
+//! 5. Create and start a flow
+//! 6. Poll until completion
+//!
+//! Usage:
+//!   COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
+//!     --dst-ip 127.0.0.1 \
+//!     --context-id 2 \
+//!     --actor-id 11001 \
+//!     --runner-id 12001 \
+//!     --flow-id 13001 \
+//!     --jobs 3

+use hero_coordinator_client::{CoordinatorClient, models::*};
+use std::collections::HashMap;
+use std::time::Duration;
+use clap::Parser;
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    /// Coordinator URL
+    #[arg(long, env = "COORDINATOR_URL", default_value = "http://127.0.0.1:9652")]
+    coordinator_url: String,
+
+    /// Destination IP address
+    #[arg(long)]
+    dst_ip: String,
+
+    /// Destination public key (alternative to dst-ip)
+    #[arg(long)]
+    dst_pk: Option<String>,
+
+    /// Context ID (Redis DB index, 0-15)
+    #[arg(long, default_value_t = 2)]
+    context_id: u32,
+
+    /// Actor ID
+    #[arg(long, default_value_t = 11001)]
+    actor_id: u32,
+
+    /// Runner ID
+    #[arg(long, default_value_t = 12001)]
+    runner_id: u32,
+
+    /// Flow ID
+    #[arg(long, default_value_t = 13001)]
+    flow_id: u32,
+
+    /// Base job ID (subsequent jobs increment from this)
+    #[arg(long, default_value_t = 20000)]
+    base_job_id: u32,
+
+    /// Number of jobs to create (forms a simple chain)
+    #[arg(long, default_value_t = 3)]
+    jobs: u32,
+
+    /// Per-job timeout in seconds
+    #[arg(long, default_value_t = 60)]
+    timeout_secs: u64,
+
+    /// Per-job retries (0-255)
+    #[arg(long, default_value_t = 0)]
+    retries: u8,
+
+    /// Script type for jobs/runner
+    #[arg(long, default_value = "Python")]
+    script_type: String,
+
+    /// Supervisor topic
+    #[arg(long, default_value = "supervisor.rpc")]
+    topic: String,
+
+    /// Optional supervisor secret
+    #[arg(long)]
+    secret: Option<String>,
+
+    /// Flow poll interval in seconds
+    #[arg(long, default_value_t = 2.0)]
+    poll_interval: f64,
+
+    /// Max seconds to wait for flow completion
+    #[arg(long, default_value_t = 600)]
+    poll_timeout: u64,
+}
+
+fn print_header(title: &str) {
+    println!("\n{}", "=".repeat(80));
+    println!("{}", title);
+    println!("{}", "=".repeat(80));
+}
+
+fn parse_script_type(s: &str) -> ScriptType {
+    match s {
+        "Python" => ScriptType::Python,
+        "V" => ScriptType::V,
+        "Osis" => ScriptType::Osis,
+        "Sal" => ScriptType::Sal,
+        _ => ScriptType::Python,
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    env_logger::init();
+    let args = Args::parse();
+
+    if args.jobs < 1 {
+        eprintln!("ERROR: --jobs must be >= 1");
+        std::process::exit(2);
+    }
+
+    let client = CoordinatorClient::new(&args.coordinator_url)?;
+    let script_type = parse_script_type(&args.script_type);
+
+    // 1) Actor
+    print_header("actor.create (or load)");
+    let actor = client.actor_create_or_load(ActorCreate {
+        id: args.actor_id,
+        pubkey: "demo-pubkey".to_string(),
+        address: vec!["127.0.0.1".parse()?],
+    }).await?;
+    println!("{:#?}", actor);
+
+    // 2) Context
+    print_header("context.create (or load)");
+    let context = client.context_create_or_load(ContextCreate {
+        id: args.context_id,
+        admins: vec![args.actor_id],
+        readers: vec![args.actor_id],
+        executors: vec![args.actor_id],
+    }).await?;
+    println!("{:#?}", context);
+
+    // 3) Runner
+    print_header("runner.create (or load)");
+    let runner_pubkey = args.dst_pk.unwrap_or_default();
+    let runner_address: std::net::IpAddr = args.dst_ip.parse()?;
+
+    let runner = client.runner_create_or_load(args.context_id, RunnerCreate {
+        id: args.runner_id,
+        pubkey: runner_pubkey,
+        address: runner_address,
+        topic: args.topic.clone(),
+        script_type,
+        local: false,
+        secret: args.secret.clone(),
+    }).await?;
+    println!("{:#?}", runner);
+
+    // 4) Jobs - build a simple chain
+    let mut job_ids = Vec::new();
+    for i in 0..args.jobs {
+        let job_id = args.base_job_id + i;
+        let depends = if i == 0 {
+            vec![]
+        } else {
+            vec![args.base_job_id + (i - 1)]
+        };
+
+        // NOTE(review): both arms must produce String — the original mixed &str and String, which does not compile.
+        let job_type = if depends.is_empty() { "(root)".to_string() } else { format!("(depends 
on {:?})", depends) }; + print_header(&format!("job.create - {} {}", job_id, job_type)); + + let job = client.job_create_or_load(args.context_id, JobCreate { + id: job_id, + caller_id: args.actor_id, + context_id: args.context_id, + script: format!("print('Job {} running')", i), + script_type, + timeout: args.timeout_secs, + retries: args.retries, + env_vars: HashMap::new(), + prerequisites: vec![], + depends, + }).await?; + println!("{:#?}", job); + job_ids.push(job_id); + } + + // 5) Flow + print_header("flow.create (or load)"); + let flow = client.flow_create_or_load(args.context_id, FlowCreate { + id: args.flow_id, + caller_id: args.actor_id, + context_id: args.context_id, + jobs: job_ids.clone(), + env_vars: HashMap::new(), + }).await?; + println!("{:#?}", flow); + + // Optional: show DAG + print_header("flow.dag"); + match client.flow_dag(args.context_id, args.flow_id).await { + Ok(dag) => println!("{:#?}", dag), + Err(e) => eprintln!("WARN: flow.dag failed: {}", e), + } + + // 6) Start flow + print_header("flow.start"); + let started = client.flow_start(args.context_id, args.flow_id).await?; + println!("flow.start -> {}", started); + + // 7) Poll until completion + print_header("Polling flow.load until completion"); + let poll_interval = Duration::from_secs_f64(args.poll_interval); + let timeout = Duration::from_secs(args.poll_timeout); + + let start = std::time::Instant::now(); + let mut poll_count = 0; + let mut last_status_print = std::time::Instant::now(); + + loop { + poll_count += 1; + let flow = client.flow_load(args.context_id, args.flow_id).await?; + + let now = std::time::Instant::now(); + if now.duration_since(last_status_print) >= Duration::from_secs(1).max(poll_interval) { + println!("[{}s] flow.status = {:?}", start.elapsed().as_secs(), flow.status); + last_status_print = now; + } + + // Every 5th poll, print the current flow DAG + if poll_count % 5 == 0 { + print_header("flow.dag (periodic)"); + match client.flow_dag(args.context_id, 
args.flow_id).await { + Ok(dag) => println!("{:#?}", dag), + Err(e) => eprintln!("WARN: periodic flow.dag failed: {}", e), + } + } + + match flow.status { + FlowStatus::Finished | FlowStatus::Error => { + break; + } + _ => { + if start.elapsed() > timeout { + eprintln!("ERROR: Flow did not complete within {:?} (status={:?})", timeout, flow.status); + break; + } + tokio::time::sleep(poll_interval).await; + } + } + } + + // 8) Final summary: job statuses + print_header("Final job statuses"); + for job_id in &job_ids { + match client.job_load(args.context_id, args.actor_id, *job_id).await { + Ok(job) => { + println!("Job {}: status={:?} result={:?}", job_id, job.status, job.result); + } + Err(e) => { + eprintln!("Job {}: load failed: {}", job_id, e); + } + } + } + + // Final status check + let final_flow = client.flow_load(args.context_id, args.flow_id).await?; + print_header("Result"); + if final_flow.status == FlowStatus::Finished { + println!("Flow finished successfully."); + Ok(()) + } else { + println!("Flow ended with status={:?}", final_flow.status); + std::process::exit(1); + } +} diff --git a/lib/clients/coordinator/src/error.rs b/lib/clients/coordinator/src/error.rs new file mode 100644 index 0000000..4d60bdb --- /dev/null +++ b/lib/clients/coordinator/src/error.rs @@ -0,0 +1,42 @@ +//! 
Error types for the Coordinator client
+
+use thiserror::Error;
+
+/// Result type for coordinator operations
+pub type Result<T> = std::result::Result<T, CoordinatorError>;
+
+/// Errors that can occur when interacting with the Coordinator
+#[derive(Debug, Error)]
+pub enum CoordinatorError {
+    /// Connection error
+    #[error("Connection error: {0}")]
+    Connection(String),
+
+    /// RPC error
+    #[error("RPC error: {0}")]
+    Rpc(String),
+
+    /// Resource already exists
+    #[error("Resource already exists")]
+    AlreadyExists,
+
+    /// Resource not found
+    #[error("Not found: {0}")]
+    NotFound(String),
+
+    /// Storage error
+    #[error("Storage error: {0}")]
+    Storage(String),
+
+    /// DAG error
+    #[error("DAG error: {0}")]
+    Dag(String),
+
+    /// Timeout error
+    #[error("Timeout: {0}")]
+    Timeout(String),
+
+    /// Serialization error
+    #[error("Serialization error: {0}")]
+    Serialization(#[from] serde_json::Error),
+}
diff --git a/lib/clients/coordinator/src/lib.rs b/lib/clients/coordinator/src/lib.rs
new file mode 100644
index 0000000..ac0a0ff
--- /dev/null
+++ b/lib/clients/coordinator/src/lib.rs
@@ -0,0 +1,327 @@
+//! Hero Coordinator Client Library
+//!
+//! This library provides a Rust client for interacting with the Hero Coordinator JSON-RPC API.
+//! It supports creating and managing actors, contexts, runners, jobs, and flows.
+//!
+//! # Example
+//!
+//! ```no_run
+//! use hero_coordinator_client::{CoordinatorClient, models::*};
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//!     let client = CoordinatorClient::new("http://127.0.0.1:9652")?;
+//!
+//!     // Create an actor
+//!     let actor = client.actor_create(ActorCreate {
+//!         id: 11001,
+//!         pubkey: "demo-pubkey".to_string(),
+//!         address: vec!["127.0.0.1".parse()?],
+//!     }).await?;
+//!
+//!     println!("Created actor: {:?}", actor);
+//!     Ok(())
+//! }
+//! 
``` + +pub mod models; +pub mod error; + +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use serde_json::Value; + +pub use error::{CoordinatorError, Result}; +pub use models::*; + +/// Client for interacting with the Hero Coordinator +#[derive(Clone)] +pub struct CoordinatorClient { + client: HttpClient, + url: String, +} + +impl CoordinatorClient { + /// Create a new coordinator client + /// + /// # Arguments + /// * `url` - The URL of the coordinator (e.g., "http://127.0.0.1:9652") + pub fn new(url: impl Into) -> Result { + let url = url.into(); + let client = HttpClientBuilder::default() + .build(&url) + .map_err(|e| CoordinatorError::Connection(e.to_string()))?; + + Ok(Self { client, url }) + } + + /// Get the coordinator URL + pub fn url(&self) -> &str { + &self.url + } + + // ==================== Actor Methods ==================== + + /// Create a new actor + pub async fn actor_create(&self, actor: ActorCreate) -> Result { + let params = serde_json::json!({ "actor": actor }); + self.call("actor.create", params).await + } + + /// Load an existing actor + pub async fn actor_load(&self, id: u32) -> Result { + let params = serde_json::json!({ "id": id }); + self.call("actor.load", params).await + } + + /// Try to create an actor, or load it if it already exists + pub async fn actor_create_or_load(&self, actor: ActorCreate) -> Result { + match self.actor_create(actor.clone()).await { + Ok(a) => Ok(a), + Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { + self.actor_load(actor.id).await + } + Err(e) => Err(e), + } + } + + // ==================== Context Methods ==================== + + /// Create a new context + pub async fn context_create(&self, context: ContextCreate) -> Result { + let params = serde_json::json!({ "context": context }); + self.call("context.create", params).await + } + + /// Load an existing context + pub async fn context_load(&self, id: u32) -> Result { + let params = serde_json::json!({ "id": id }); + 
self.call("context.load", params).await + } + + /// Try to create a context, or load it if it already exists + pub async fn context_create_or_load(&self, context: ContextCreate) -> Result { + match self.context_create(context.clone()).await { + Ok(c) => Ok(c), + Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { + self.context_load(context.id).await + } + Err(e) => Err(e), + } + } + + // ==================== Runner Methods ==================== + + /// Create a new runner in a context + pub async fn runner_create(&self, context_id: u32, runner: RunnerCreate) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "runner": runner + }); + self.call("runner.create", params).await + } + + /// Load an existing runner from a context + pub async fn runner_load(&self, context_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "id": id + }); + self.call("runner.load", params).await + } + + /// Try to create a runner, or load it if it already exists + pub async fn runner_create_or_load(&self, context_id: u32, runner: RunnerCreate) -> Result { + match self.runner_create(context_id, runner.clone()).await { + Ok(r) => Ok(r), + Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { + self.runner_load(context_id, runner.id).await + } + Err(e) => Err(e), + } + } + + // ==================== Job Methods ==================== + + /// Create a new job in a context + pub async fn job_create(&self, context_id: u32, job: JobCreate) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "job": job + }); + self.call("job.create", params).await + } + + /// Load an existing job from a context + pub async fn job_load(&self, context_id: u32, caller_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "caller_id": caller_id, + "id": id + }); + self.call("job.load", params).await + } + + /// Try to create a job, or load it if it 
already exists + pub async fn job_create_or_load(&self, context_id: u32, job: JobCreate) -> Result { + let caller_id = job.caller_id; + let job_id = job.id; + match self.job_create(context_id, job).await { + Ok(j) => Ok(j), + Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { + self.job_load(context_id, caller_id, job_id).await + } + Err(e) => Err(e), + } + } + + // ==================== Flow Methods ==================== + + /// Create a new flow in a context + pub async fn flow_create(&self, context_id: u32, flow: FlowCreate) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "flow": flow + }); + self.call("flow.create", params).await + } + + /// Load an existing flow from a context + pub async fn flow_load(&self, context_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "id": id + }); + self.call("flow.load", params).await + } + + /// Try to create a flow, or load it if it already exists + pub async fn flow_create_or_load(&self, context_id: u32, flow: FlowCreate) -> Result { + let flow_id = flow.id; + match self.flow_create(context_id, flow).await { + Ok(f) => Ok(f), + Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => { + self.flow_load(context_id, flow_id).await + } + Err(e) => Err(e), + } + } + + /// Get the DAG representation of a flow + pub async fn flow_dag(&self, context_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "id": id + }); + self.call("flow.dag", params).await + } + + /// Start a flow + /// + /// Returns true if the scheduler was started, false if it was already running + pub async fn flow_start(&self, context_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "id": id + }); + self.call("flow.start", params).await + } + + // ==================== Message Methods ==================== + + /// Create a new message in a context + pub async fn 
message_create(&self, context_id: u32, message: MessageCreate) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "message": message + }); + self.call("message.create", params).await + } + + /// Load an existing message from a context + pub async fn message_load(&self, context_id: u32, id: u32) -> Result { + let params = serde_json::json!({ + "context_id": context_id, + "id": id + }); + self.call("message.load", params).await + } + + // ==================== Helper Methods ==================== + + /// Poll a flow until it reaches a terminal state (Finished or Error) + /// + /// # Arguments + /// * `context_id` - The context ID + /// * `flow_id` - The flow ID + /// * `poll_interval` - Duration between polls + /// * `timeout` - Maximum duration to wait + /// + /// Returns the final flow state or an error if timeout is reached + pub async fn flow_poll_until_complete( + &self, + context_id: u32, + flow_id: u32, + poll_interval: std::time::Duration, + timeout: std::time::Duration, + ) -> Result { + let start = std::time::Instant::now(); + + loop { + let flow = self.flow_load(context_id, flow_id).await?; + + match flow.status { + FlowStatus::Finished | FlowStatus::Error => { + return Ok(flow); + } + _ => { + if start.elapsed() > timeout { + return Err(CoordinatorError::Timeout(format!( + "Flow {} did not complete within {:?}", + flow_id, timeout + ))); + } + tokio::time::sleep(poll_interval).await; + } + } + } + } + + // ==================== Internal Methods ==================== + + async fn call(&self, method: &str, params: Value) -> Result { + use jsonrpsee::core::client::ClientT; + use jsonrpsee::core::params::ArrayParams; + + let mut array_params = ArrayParams::new(); + array_params.insert(params).map_err(|e| CoordinatorError::Rpc(e.to_string()))?; + + self.client + .request(method, array_params) + .await + .map_err(|e| { + let err_str = e.to_string(); + if err_str.contains("Already exists") { + CoordinatorError::AlreadyExists + } else if 
err_str.contains("Not Found") || err_str.contains("Key not found") { + CoordinatorError::NotFound(err_str) + } else if err_str.contains("Storage Error") { + CoordinatorError::Storage(err_str) + } else if err_str.contains("DAG") { + CoordinatorError::Dag(err_str) + } else { + CoordinatorError::Rpc(err_str) + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_creation() { + let client = CoordinatorClient::new("http://127.0.0.1:9652"); + assert!(client.is_ok()); + } +} diff --git a/lib/clients/coordinator/src/models.rs b/lib/clients/coordinator/src/models.rs new file mode 100644 index 0000000..2b02c2d --- /dev/null +++ b/lib/clients/coordinator/src/models.rs @@ -0,0 +1,248 @@ +//! Data models for the Coordinator client + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::net::IpAddr; + +// Re-export Job types from hero_job +pub use hero_job::{Job, JobStatus}; + +/// Timestamp type (Unix timestamp in seconds) +pub type Timestamp = u64; + +// ==================== Actor ==================== + +/// Actor representation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Actor { + pub id: u32, + pub pubkey: String, + pub address: Vec, + pub created_at: Timestamp, + pub updated_at: Timestamp, +} + +/// Parameters for creating an actor +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActorCreate { + pub id: u32, + pub pubkey: String, + pub address: Vec, +} + +// ==================== Context ==================== + +/// Context representation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Context { + /// Redis DB to use + pub id: u32, + /// Actor ids which have admin rights on this context + pub admins: Vec, + /// Actor ids which can read the context info + pub readers: Vec, + /// Actor ids which can execute jobs in this context + pub executors: Vec, + pub created_at: Timestamp, + pub updated_at: Timestamp, +} + +/// Parameters for creating a context +#[derive(Debug, Clone, 
Serialize, Deserialize)] +pub struct ContextCreate { + pub id: u32, + pub admins: Vec, + pub readers: Vec, + pub executors: Vec, +} + +// ==================== Runner ==================== + +/// Runner representation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Runner { + pub id: u32, + /// Mycelium public key + pub pubkey: String, + /// Mycelium address + pub address: IpAddr, + /// Message topic (e.g., "supervisor.rpc") + pub topic: String, + /// Script type this runner supports + pub script_type: ScriptType, + /// If true, the runner also listens on a local redis queue + pub local: bool, + /// Optional secret for authenticated supervisor calls + #[serde(skip_serializing_if = "Option::is_none")] + pub secret: Option, + pub created_at: Timestamp, + pub updated_at: Timestamp, +} + +/// Parameters for creating a runner +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RunnerCreate { + pub id: u32, + pub pubkey: String, + pub address: IpAddr, + pub topic: String, + pub script_type: ScriptType, + pub local: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub secret: Option, +} + +/// Script type supported by a runner +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum ScriptType { + Python, + V, + Osis, + Sal, +} + +// ==================== Job ==================== + +/// Parameters for creating a job +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobCreate { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub script: String, + pub script_type: ScriptType, + pub timeout: u64, + #[serde(default)] + pub retries: u8, + #[serde(default)] + pub env_vars: HashMap, + #[serde(default)] + pub prerequisites: Vec, + #[serde(default)] + pub depends: Vec, +} + +// ==================== Flow ==================== + +/// Flow representation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Flow { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub jobs: Vec, + pub 
env_vars: HashMap, + pub result: HashMap, + pub created_at: Timestamp, + pub updated_at: Timestamp, + pub status: FlowStatus, +} + +/// Parameters for creating a flow +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlowCreate { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub jobs: Vec, + #[serde(default)] + pub env_vars: HashMap, +} + +/// Flow status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum FlowStatus { + Created, + Dispatched, + Started, + Error, + Finished, +} + +/// DAG representation of a flow +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlowDag { + pub flow_id: u32, + pub nodes: Vec, + pub edges: Vec, +} + +/// Node in a DAG +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DagNode { + pub job_id: u32, + pub status: JobStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Edge in a DAG (dependency relationship) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DagEdge { + pub from: u32, + pub to: u32, +} + +// ==================== Message ==================== + +/// Message representation +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Message { + pub id: u32, + pub context_id: u32, + pub runner_id: u32, + pub job_id: u32, + pub message_type: MessageType, + pub format: MessageFormatType, + pub payload: String, + pub status: MessageStatus, + pub transport_status: TransportStatus, + pub created_at: Timestamp, + pub updated_at: Timestamp, +} + +/// Parameters for creating a message +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MessageCreate { + pub id: u32, + pub context_id: u32, + pub runner_id: u32, + pub job_id: u32, + pub message_type: MessageType, + pub format: MessageFormatType, + pub payload: String, +} + +/// Message type +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum MessageType 
{ + JobRun, + JobResult, + JobError, +} + +/// Message format +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum MessageFormatType { + JsonRpc, +} + +/// Message status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum MessageStatus { + Created, + Sent, + Delivered, + Failed, +} + +/// Transport status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum TransportStatus { + Pending, + Sent, + Delivered, + Failed, +} diff --git a/scripts/install.md b/scripts/install.md new file mode 100644 index 0000000..e69de29 diff --git a/tests/runner_hero.rs b/tests/runner_hero.rs index 4f1fdfd..52f9c7c 100644 --- a/tests/runner_hero.rs +++ b/tests/runner_hero.rs @@ -191,40 +191,37 @@ async fn test_03_job_with_env_vars() { } #[tokio::test] -async fn test_04_job_timeout() { - println!("\n๐Ÿงช Test: Job Timeout"); +async fn test_04_invalid_heroscript() { + println!("\n๐Ÿงช Test: Invalid Heroscript Error Handling"); let client = create_client().await; - // Create job with short timeout - use a heroscript that loops forever - let mut job = create_test_job(r#" -for i in 1..1000 { - print("Loop iteration: ${i}") - sleep(100) -} -"#); - job.timeout = 2; // 2 second timeout + // Create job with invalid heroscript syntax + let job = create_test_job("!!invalid.command.that.does.not.exist"); let job_id = job.id.clone(); // Save and queue job client.store_job_in_redis(&job).await.expect("Failed to save job"); client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); - // Wait for job to timeout + // Wait for job to complete tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - // Check job status - should be error due to timeout + // Check job status - should be error due to invalid command let status = client.get_status(&job_id).await.expect("Failed to get job status"); println!("๐Ÿ“Š Job status: {:?}", status); // Should have error if let Some(error) = 
client.get_error(&job_id).await.expect("Failed to get error") { - println!("โŒ Job error (expected timeout):\n{}", error); - assert!(error.contains("timeout") || error.contains("timed out"), "Error should mention timeout"); - println!("โœ… Job timeout handled correctly"); + println!("โŒ Job error (expected):\n{}", error); + println!("โœ… Invalid heroscript error handled correctly"); } else { - println!("โš ๏ธ Expected timeout error but got none"); - panic!("Job should have timed out"); + println!("โš ๏ธ Expected error for invalid heroscript but got none"); + // Check if there's a result instead + if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") { + println!("Got result instead: {}", result); + } + panic!("Job with invalid heroscript should have failed"); } } diff --git a/tests/runner_osiris.rs b/tests/runner_osiris.rs new file mode 100644 index 0000000..1aa9594 --- /dev/null +++ b/tests/runner_osiris.rs @@ -0,0 +1,215 @@ +//! Integration tests for Osiris Runner (OSIS) +//! +//! Tests the osiris runner by spawning the binary and dispatching Rhai jobs to it. +//! +//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially: +//! ``` +//! cargo test --test runner_osiris -- --test-threads=1 +//! ``` + +use hero_job::{Job, JobBuilder}; +use hero_job_client::Client; +use std::sync::{Mutex, Once}; +use std::process::Child; +use lazy_static::lazy_static; + +/// Test configuration +const RUNNER_ID: &str = "test-osiris-runner"; +const REDIS_URL: &str = "redis://localhost:6379"; + +lazy_static! 
{
+    static ref RUNNER_PROCESS: Mutex<Option<Child>> = Mutex::new(None);
+}
+
+/// Global initialization flag
+static INIT: Once = Once::new();
+
+/// Initialize and start the osiris runner binary
+async fn init_runner() {
+    INIT.call_once(|| {
+        // Register cleanup handler
+        let _ = std::panic::catch_unwind(|| {
+            ctrlc::set_handler(move || {
+                cleanup_runner();
+                std::process::exit(0);
+            }).expect("Error setting Ctrl-C handler");
+        });
+
+        println!("🚀 Starting Osiris runner...");
+
+        // Build the runner binary
+        let build_result = escargot::CargoBuild::new()
+            .bin("runner_osiris")
+            .package("runner-osiris")
+            .current_release()
+            .run()
+            .expect("Failed to build runner_osiris");
+
+        // Spawn the runner process
+        let child = build_result
+            .command()
+            .arg(RUNNER_ID)
+            .arg("--redis-url")
+            .arg(REDIS_URL)
+            .spawn()
+            .expect("Failed to spawn osiris runner");
+
+        *RUNNER_PROCESS.lock().unwrap() = Some(child);
+
+        // Give the runner time to start
+        std::thread::sleep(std::time::Duration::from_secs(2));
+        println!("✅ Osiris runner ready");
+    });
+}
+
+/// Cleanup runner process
+fn cleanup_runner() {
+    println!("🧹 Cleaning up osiris runner process...");
+    if let Some(mut child) = RUNNER_PROCESS.lock().unwrap().take() {
+        let _ = child.kill();
+        let _ = child.wait();
+    }
+}
+
+/// Create a test job client
+async fn create_client() -> Client {
+    init_runner().await;
+
+    Client::builder()
+        .redis_url(REDIS_URL)
+        .build()
+        .await
+        .expect("Failed to create job client")
+}
+
+/// Helper to create a test job
+fn create_test_job(payload: &str) -> Job {
+    JobBuilder::new()
+        .caller_id("test-caller")
+        .context_id("test-context")
+        .runner(RUNNER_ID)
+        .payload(payload)
+        .timeout(30)
+        .build()
+        .expect("Failed to build test job")
+}
+
+#[tokio::test]
+async fn test_01_simple_rhai_script() {
+    println!("\n🧪 Test: Simple Rhai Script");
+
+    let client = create_client().await;
+
+    // Create job with simple Rhai script
+    let job = create_test_job(r#"
+let x = 10;
+let y
= 20; +print("Sum: " + (x + y)); +x + y +"#); + + // Save and queue job + match client.job_run_wait(&job, RUNNER_ID, 5).await { + Ok(result) => { + println!("โœ… Job succeeded with result:\n{}", result); + assert!(result.contains("30") || result.contains("Sum"), "Result should contain calculation"); + } + Err(e) => { + println!("โŒ Job failed with error: {:?}", e); + panic!("Job execution failed"); + } + } + + println!("โœ… Rhai script job completed"); +} + +#[tokio::test] +async fn test_02_rhai_with_functions() { + println!("\n๐Ÿงช Test: Rhai Script with Functions"); + + let client = create_client().await; + + // Create job with Rhai function + let job = create_test_job(r#" +fn calculate(a, b) { + a * b + 10 +} + +let result = calculate(5, 3); +print("Result: " + result); +result +"#); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("๐Ÿ“Š Job status: {:?}", status); + + // Get result or error + match (client.get_result(&job_id).await, client.get_error(&job_id).await) { + (Ok(Some(result)), _) => { + println!("โœ… Job succeeded with result:\n{}", result); + assert!(result.contains("25") || result.contains("Result"), "Result should contain 25"); + } + (_, Ok(Some(error))) => { + println!("โŒ Job failed with error:\n{}", error); + panic!("Job should have succeeded"); + } + _ => { + println!("โš ๏ธ No result or error available"); + panic!("Expected result"); + } + } + + println!("โœ… Rhai function job completed"); +} + +#[tokio::test] +async fn test_03_invalid_rhai_syntax() { + println!("\n๐Ÿงช Test: Invalid Rhai Syntax Error Handling"); + + let client = create_client().await; + + // Create job with 
invalid Rhai syntax + let job = create_test_job("let x = ; // Invalid syntax"); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status - should be error + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("๐Ÿ“Š Job status: {:?}", status); + + // Should have error + if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { + println!("โŒ Job error (expected):\n{}", error); + println!("โœ… Invalid Rhai syntax error handled correctly"); + } else { + println!("โš ๏ธ Expected error for invalid Rhai syntax but got none"); + panic!("Job with invalid syntax should have failed"); + } +} + +/// Final test that ensures cleanup happens +#[tokio::test] +async fn test_zz_cleanup() { + println!("\n๐Ÿงน Running cleanup..."); + cleanup_runner(); + + // Wait a bit to ensure process is killed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + println!("โœ… Cleanup complete"); +} diff --git a/tests/runner_sal.rs b/tests/runner_sal.rs new file mode 100644 index 0000000..06731ab --- /dev/null +++ b/tests/runner_sal.rs @@ -0,0 +1,268 @@ +//! Integration tests for SAL Runner +//! +//! Tests the SAL runner by spawning the binary and dispatching Rhai jobs to it. +//! +//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially: +//! ``` +//! cargo test --test runner_sal -- --test-threads=1 +//! ``` + +use hero_job::{Job, JobBuilder}; +use hero_job_client::Client; +use std::sync::{Mutex, Once}; +use std::process::Child; +use lazy_static::lazy_static; + +/// Test configuration +const RUNNER_ID: &str = "test-sal-runner"; +const REDIS_URL: &str = "redis://localhost:6379"; + +lazy_static! 
{
+    static ref RUNNER_PROCESS: Mutex<Option<Child>> = Mutex::new(None);
+}
+
+/// Global initialization flag
+static INIT: Once = Once::new();
+
+/// Initialize and start the SAL runner binary
+async fn init_runner() {
+    INIT.call_once(|| {
+        // Register cleanup handler
+        let _ = std::panic::catch_unwind(|| {
+            ctrlc::set_handler(move || {
+                cleanup_runner();
+                std::process::exit(0);
+            }).expect("Error setting Ctrl-C handler");
+        });
+
+        println!("🚀 Starting SAL runner...");
+
+        // Build the runner binary
+        let build_result = escargot::CargoBuild::new()
+            .bin("runner_sal")
+            .package("runner-sal")
+            .current_release()
+            .run()
+            .expect("Failed to build runner_sal");
+
+        // Spawn the runner process
+        let child = build_result
+            .command()
+            .arg(RUNNER_ID)
+            .arg("--redis-url")
+            .arg(REDIS_URL)
+            .arg("--db-path")
+            .arg("/tmp/test_sal.db")
+            .spawn()
+            .expect("Failed to spawn SAL runner");
+
+        *RUNNER_PROCESS.lock().unwrap() = Some(child);
+
+        // Give the runner time to start
+        std::thread::sleep(std::time::Duration::from_secs(2));
+        println!("✅ SAL runner ready");
+    });
+}
+
+/// Cleanup runner process
+fn cleanup_runner() {
+    println!("🧹 Cleaning up SAL runner process...");
+    if let Some(mut child) = RUNNER_PROCESS.lock().unwrap().take() {
+        let _ = child.kill();
+        let _ = child.wait();
+    }
+    // Clean up test database
+    let _ = std::fs::remove_file("/tmp/test_sal.db");
+}
+
+/// Create a test job client
+async fn create_client() -> Client {
+    init_runner().await;
+
+    Client::builder()
+        .redis_url(REDIS_URL)
+        .build()
+        .await
+        .expect("Failed to create job client")
+}
+
+/// Helper to create a test job
+fn create_test_job(payload: &str) -> Job {
+    JobBuilder::new()
+        .caller_id("test-caller")
+        .context_id("test-context")
+        .runner(RUNNER_ID)
+        .payload(payload)
+        .timeout(30)
+        .build()
+        .expect("Failed to build test job")
+}
+
+#[tokio::test]
+async fn test_01_simple_rhai_script() {
+    println!("\n🧪 Test: Simple Rhai Script");
+
+    let client =
create_client().await; + + // Create job with simple Rhai script + let job = create_test_job(r#" +let message = "Hello from SAL"; +print(message); +message +"#); + let job_id = job.id.clone(); + + // Save and queue job + match client.job_run_wait(&job, RUNNER_ID, 5).await { + Ok(result) => { + println!("โœ… Job succeeded with result:\n{}", result); + assert!(result.contains("Hello from SAL") || result.contains("SAL"), "Result should contain message"); + } + Err(e) => { + println!("โŒ Job failed with error: {:?}", e); + panic!("Job execution failed"); + } + } + + println!("โœ… Rhai script job completed"); +} + +#[tokio::test] +async fn test_02_rhai_array_operations() { + println!("\n๐Ÿงช Test: Rhai Array Operations"); + + let client = create_client().await; + + // Create job with array operations + let job = create_test_job(r#" +let arr = [1, 2, 3, 4, 5]; +let sum = 0; +for item in arr { + sum += item; +} +print("Sum: " + sum); +sum +"#); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("๐Ÿ“Š Job status: {:?}", status); + + // Get result or error + match (client.get_result(&job_id).await, client.get_error(&job_id).await) { + (Ok(Some(result)), _) => { + println!("โœ… Job succeeded with result:\n{}", result); + assert!(result.contains("15") || result.contains("Sum"), "Result should contain sum of 15"); + } + (_, Ok(Some(error))) => { + println!("โŒ Job failed with error:\n{}", error); + panic!("Job should have succeeded"); + } + _ => { + println!("โš ๏ธ No result or error available"); + panic!("Expected result"); + } + } + + println!("โœ… Array operations job completed"); +} + +#[tokio::test] +async fn 
test_03_rhai_object_operations() { + println!("\n๐Ÿงช Test: Rhai Object Operations"); + + let client = create_client().await; + + // Create job with object/map operations + let job = create_test_job(r#" +let obj = #{ + name: "Test", + value: 42, + active: true +}; +print("Name: " + obj.name); +print("Value: " + obj.value); +obj.value +"#); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status + let status = client.get_status(&job_id).await.expect("Failed to get job status"); + println!("๐Ÿ“Š Job status: {:?}", status); + + // Get result or error + match (client.get_result(&job_id).await, client.get_error(&job_id).await) { + (Ok(Some(result)), _) => { + println!("โœ… Job succeeded with result:\n{}", result); + assert!(result.contains("42"), "Result should contain value 42"); + } + (_, Ok(Some(error))) => { + println!("โŒ Job failed with error:\n{}", error); + panic!("Job should have succeeded"); + } + _ => { + println!("โš ๏ธ No result or error available"); + panic!("Expected result"); + } + } + + println!("โœ… Object operations job completed"); +} + +#[tokio::test] +async fn test_04_invalid_rhai_syntax() { + println!("\n๐Ÿงช Test: Invalid Rhai Syntax Error Handling"); + + let client = create_client().await; + + // Create job with invalid Rhai syntax + let job = create_test_job("fn broken( { // Invalid syntax"); + let job_id = job.id.clone(); + + // Save and queue job + client.store_job_in_redis(&job).await.expect("Failed to save job"); + client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); + + // Wait for job to complete + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Check job status - should be error + let status = 
client.get_status(&job_id).await.expect("Failed to get job status"); + println!("๐Ÿ“Š Job status: {:?}", status); + + // Should have error + if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { + println!("โŒ Job error (expected):\n{}", error); + println!("โœ… Invalid Rhai syntax error handled correctly"); + } else { + println!("โš ๏ธ Expected error for invalid Rhai syntax but got none"); + panic!("Job with invalid syntax should have failed"); + } +} + +/// Final test that ensures cleanup happens +#[tokio::test] +async fn test_zz_cleanup() { + println!("\n๐Ÿงน Running cleanup..."); + cleanup_runner(); + + // Wait a bit to ensure process is killed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + println!("โœ… Cleanup complete"); +}