Add coordinator client library, installation scripts, and new test runners

- Add coordinator client library to workspace
- Add installation documentation and heroscript
- Add new test runners for Osiris and Sal
- Update hero runner test to handle invalid heroscript errors
- Update README with installation instructions
Timur Gordon
2025-11-17 10:56:13 +01:00
parent c95be9339e
commit f66edba1d3
12 changed files with 1650 additions and 17 deletions

View File

@@ -7,6 +7,7 @@ members = [
"bin/runners/sal",
"bin/runners/hero",
"bin/supervisor",
"lib/clients/coordinator",
"lib/clients/job",
"lib/clients/osiris",
"lib/clients/supervisor",

View File

@@ -100,3 +100,8 @@ cargo clippy --workspace -- -D warnings
## License
MIT OR Apache-2.0
## Installation
Horus is installed via heroscripts and the herolib installers, which keeps installation safe, replicable, and versioned. See the [installation heroscript](./scripts/install.md).

View File

@@ -0,0 +1,30 @@
[package]
name = "hero-coordinator-client"
version.workspace = true
edition.workspace = true
description = "Client library for Hero Coordinator"
license = "MIT OR Apache-2.0"
[dependencies]
# Core dependencies
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
log.workspace = true
tokio.workspace = true
async-trait.workspace = true
# JSON-RPC client
jsonrpsee = { workspace = true, features = ["http-client", "macros"] }
reqwest = { version = "0.12", features = ["json"] }
# Time handling
chrono.workspace = true
# Hero models
hero-job = { path = "../../models/job" }
[dev-dependencies]
tokio-test = "0.4"
clap = { version = "4.0", features = ["derive", "env"] }
env_logger = "0.11"

View File

@@ -0,0 +1,226 @@
# Hero Coordinator Client
Rust client library for interacting with the Hero Coordinator JSON-RPC API.
## Features
- **Actor Management**: Create and load actors
- **Context Management**: Create contexts with admin/reader/executor permissions
- **Runner Management**: Register runners in contexts
- **Job Management**: Create jobs with dependencies
- **Flow Management**: Create and manage job flows (DAGs)
- **Flow Polling**: Poll flows until completion with timeout support
- **Helper Methods**: `*_create_or_load` methods for idempotent operations
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
hero-coordinator-client = { path = "path/to/lib/clients/coordinator" }
```
## Usage
### Basic Example
```rust
use hero_coordinator_client::{CoordinatorClient, models::*};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client
let client = CoordinatorClient::new("http://127.0.0.1:9652")?;
// Create actor
let actor = client.actor_create(ActorCreate {
id: 11001,
pubkey: "demo-pubkey".to_string(),
address: vec!["127.0.0.1".parse()?],
}).await?;
// Create context
let context = client.context_create(ContextCreate {
id: 2,
admins: vec![11001],
readers: vec![11001],
executors: vec![11001],
}).await?;
// Create runner
let runner = client.runner_create(2, RunnerCreate {
id: 12001,
pubkey: "".to_string(),
address: "127.0.0.1".parse()?,
topic: "supervisor.rpc".to_string(),
script_type: ScriptType::Python,
local: false,
secret: None,
}).await?;
// Create job
let job = client.job_create(2, JobCreate {
id: 20000,
caller_id: 11001,
context_id: 2,
script: "print('Hello from job')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![],
}).await?;
// Create flow
let flow = client.flow_create(2, FlowCreate {
id: 13001,
caller_id: 11001,
context_id: 2,
jobs: vec![20000],
env_vars: HashMap::new(),
}).await?;
// Start flow
client.flow_start(2, 13001).await?;
// Poll until completion
let final_flow = client.flow_poll_until_complete(
2,
13001,
std::time::Duration::from_secs(2),
std::time::Duration::from_secs(600),
).await?;
println!("Flow status: {:?}", final_flow.status);
Ok(())
}
```
### Idempotent Operations
Use `*_create_or_load` methods to handle existing resources:
```rust
// Creates the actor if it doesn't exist, or loads it if it does
let actor = client.actor_create_or_load(ActorCreate {
id: 11001,
pubkey: "demo-pubkey".to_string(),
address: vec!["127.0.0.1".parse()?],
}).await?;
```
### Flow with Dependencies
Create a chain of dependent jobs:
```rust
// Job 0 (root)
let job0 = client.job_create(2, JobCreate {
id: 20000,
caller_id: 11001,
context_id: 2,
script: "print('Job 0')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![],
}).await?;
// Job 1 (depends on Job 0)
let job1 = client.job_create(2, JobCreate {
id: 20001,
caller_id: 11001,
context_id: 2,
script: "print('Job 1')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![20000], // Depends on job0
}).await?;
// Create flow with both jobs
let flow = client.flow_create(2, FlowCreate {
id: 13001,
caller_id: 11001,
context_id: 2,
jobs: vec![20000, 20001],
env_vars: HashMap::new(),
}).await?;
```
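After creating the flow, you can ask the coordinator for the dependency graph it derived from the jobs' `depends` fields:
```rust
// Fetch and print the DAG the coordinator built for flow 13001 in context 2
let dag = client.flow_dag(2, 13001).await?;
println!("{:#?}", dag);
```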
## Examples
See the `examples/` directory for complete examples:
```bash
# Run the flow demo
COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
--dst-ip 127.0.0.1 \
--context-id 2 \
--actor-id 11001 \
--runner-id 12001 \
--flow-id 13001 \
--jobs 3
```
## API Methods
### Actor
- `actor_create(actor: ActorCreate) -> Actor`
- `actor_load(id: u32) -> Actor`
- `actor_create_or_load(actor: ActorCreate) -> Actor`
### Context
- `context_create(context: ContextCreate) -> Context`
- `context_load(id: u32) -> Context`
- `context_create_or_load(context: ContextCreate) -> Context`
### Runner
- `runner_create(context_id: u32, runner: RunnerCreate) -> Runner`
- `runner_load(context_id: u32, id: u32) -> Runner`
- `runner_create_or_load(context_id: u32, runner: RunnerCreate) -> Runner`
### Job
- `job_create(context_id: u32, job: JobCreate) -> Job`
- `job_load(context_id: u32, caller_id: u32, id: u32) -> Job`
- `job_create_or_load(context_id: u32, job: JobCreate) -> Job`
### Flow
- `flow_create(context_id: u32, flow: FlowCreate) -> Flow`
- `flow_load(context_id: u32, id: u32) -> Flow`
- `flow_create_or_load(context_id: u32, flow: FlowCreate) -> Flow`
- `flow_dag(context_id: u32, id: u32) -> FlowDag`
- `flow_start(context_id: u32, id: u32) -> bool`
- `flow_poll_until_complete(context_id, flow_id, poll_interval, timeout) -> Flow`
### Message
- `message_create(context_id: u32, message: MessageCreate) -> Message`
- `message_load(context_id: u32, id: u32) -> Message`
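The message methods are not covered by the examples above. A minimal sketch, with illustrative IDs and a placeholder payload (the payload format the coordinator expects is not documented here):
```rust
// All IDs and the payload below are illustrative only.
let message = client.message_create(2, MessageCreate {
    id: 30001,
    context_id: 2,
    runner_id: 12001,
    job_id: 20000,
    message_type: MessageType::JobRun,
    format: MessageFormatType::JsonRpc,
    payload: "{}".to_string(), // placeholder payload
}).await?;
println!("Message status: {:?}", message.status);
```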
## Error Handling
The client uses a custom `CoordinatorError` type:
```rust
use hero_coordinator_client::error::CoordinatorError;
let actor_id = actor.id; // capture the id before `actor` is moved into the call
match client.actor_create(actor).await {
    Ok(actor) => println!("Created: {:?}", actor),
    Err(CoordinatorError::AlreadyExists) => {
        // Resource already exists, load it instead
        let actor = client.actor_load(actor_id).await?;
        println!("Loaded existing: {:?}", actor);
    }
    Err(e) => return Err(e.into()),
}
```
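The `*_create_or_load` helpers wrap exactly this pattern; they also treat `Storage` errors as 'already exists', since the coordinator can report duplicate creation that way.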
## License
MIT OR Apache-2.0

View File

@@ -0,0 +1,274 @@
//! Flow demo for Hero Coordinator
//!
//! This example demonstrates the complete flow:
//! 1. Create an actor
//! 2. Create a context with permissions
//! 3. Register a runner
//! 4. Create jobs with dependencies
//! 5. Create and start a flow
//! 6. Poll until completion
//!
//! Usage:
//! COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
//! --dst-ip 127.0.0.1 \
//! --context-id 2 \
//! --actor-id 11001 \
//! --runner-id 12001 \
//! --flow-id 13001 \
//! --jobs 3
use hero_coordinator_client::{CoordinatorClient, models::*};
use std::collections::HashMap;
use std::time::Duration;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Coordinator URL
#[arg(long, env = "COORDINATOR_URL", default_value = "http://127.0.0.1:9652")]
coordinator_url: String,
/// Destination IP address
#[arg(long)]
dst_ip: String,
/// Destination public key (alternative to dst-ip)
#[arg(long)]
dst_pk: Option<String>,
/// Context ID (Redis DB index, 0-15)
#[arg(long, default_value_t = 2)]
context_id: u32,
/// Actor ID
#[arg(long, default_value_t = 11001)]
actor_id: u32,
/// Runner ID
#[arg(long, default_value_t = 12001)]
runner_id: u32,
/// Flow ID
#[arg(long, default_value_t = 13001)]
flow_id: u32,
/// Base job ID (subsequent jobs increment from this)
#[arg(long, default_value_t = 20000)]
base_job_id: u32,
/// Number of jobs to create (forms a simple chain)
#[arg(long, default_value_t = 3)]
jobs: u32,
/// Per-job timeout in seconds
#[arg(long, default_value_t = 60)]
timeout_secs: u64,
/// Per-job retries (0-255)
#[arg(long, default_value_t = 0)]
retries: u8,
/// Script type for jobs/runner
#[arg(long, default_value = "Python")]
script_type: String,
/// Supervisor topic
#[arg(long, default_value = "supervisor.rpc")]
topic: String,
/// Optional supervisor secret
#[arg(long)]
secret: Option<String>,
/// Flow poll interval in seconds
#[arg(long, default_value_t = 2.0)]
poll_interval: f64,
/// Max seconds to wait for flow completion
#[arg(long, default_value_t = 600)]
poll_timeout: u64,
}
fn print_header(title: &str) {
println!("\n{}", "=".repeat(80));
println!("{}", title);
println!("{}", "=".repeat(80));
}
fn parse_script_type(s: &str) -> ScriptType {
match s {
"Python" => ScriptType::Python,
"V" => ScriptType::V,
"Osis" => ScriptType::Osis,
"Sal" => ScriptType::Sal,
_ => ScriptType::Python, // unknown values fall back to Python
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let args = Args::parse();
if args.jobs < 1 {
eprintln!("ERROR: --jobs must be >= 1");
std::process::exit(2);
}
let client = CoordinatorClient::new(&args.coordinator_url)?;
let script_type = parse_script_type(&args.script_type);
// 1) Actor
print_header("actor.create (or load)");
let actor = client.actor_create_or_load(ActorCreate {
id: args.actor_id,
pubkey: "demo-pubkey".to_string(),
address: vec!["127.0.0.1".parse()?],
}).await?;
println!("{:#?}", actor);
// 2) Context
print_header("context.create (or load)");
let context = client.context_create_or_load(ContextCreate {
id: args.context_id,
admins: vec![args.actor_id],
readers: vec![args.actor_id],
executors: vec![args.actor_id],
}).await?;
println!("{:#?}", context);
// 3) Runner
print_header("runner.create (or load)");
let runner_pubkey = args.dst_pk.unwrap_or_default();
let runner_address: std::net::IpAddr = args.dst_ip.parse()?;
let runner = client.runner_create_or_load(args.context_id, RunnerCreate {
id: args.runner_id,
pubkey: runner_pubkey,
address: runner_address,
topic: args.topic.clone(),
script_type,
local: false,
secret: args.secret.clone(),
}).await?;
println!("{:#?}", runner);
// 4) Jobs - build a simple chain
let mut job_ids = Vec::new();
for i in 0..args.jobs {
let job_id = args.base_job_id + i;
let depends = if i == 0 {
vec![]
} else {
vec![args.base_job_id + (i - 1)]
};
let job_type = if depends.is_empty() { "(root)".to_string() } else { format!("(depends on {:?})", depends) };
print_header(&format!("job.create - {} {}", job_id, job_type));
let job = client.job_create_or_load(args.context_id, JobCreate {
id: job_id,
caller_id: args.actor_id,
context_id: args.context_id,
script: format!("print('Job {} running')", i),
script_type,
timeout: args.timeout_secs,
retries: args.retries,
env_vars: HashMap::new(),
prerequisites: vec![],
depends,
}).await?;
println!("{:#?}", job);
job_ids.push(job_id);
}
// 5) Flow
print_header("flow.create (or load)");
let flow = client.flow_create_or_load(args.context_id, FlowCreate {
id: args.flow_id,
caller_id: args.actor_id,
context_id: args.context_id,
jobs: job_ids.clone(),
env_vars: HashMap::new(),
}).await?;
println!("{:#?}", flow);
// Optional: show DAG
print_header("flow.dag");
match client.flow_dag(args.context_id, args.flow_id).await {
Ok(dag) => println!("{:#?}", dag),
Err(e) => eprintln!("WARN: flow.dag failed: {}", e),
}
// 6) Start flow
print_header("flow.start");
let started = client.flow_start(args.context_id, args.flow_id).await?;
println!("flow.start -> {}", started);
// 7) Poll until completion
print_header("Polling flow.load until completion");
let poll_interval = Duration::from_secs_f64(args.poll_interval);
let timeout = Duration::from_secs(args.poll_timeout);
let start = std::time::Instant::now();
let mut poll_count = 0;
let mut last_status_print = std::time::Instant::now();
loop {
poll_count += 1;
let flow = client.flow_load(args.context_id, args.flow_id).await?;
let now = std::time::Instant::now();
if now.duration_since(last_status_print) >= Duration::from_secs(1).max(poll_interval) {
println!("[{}s] flow.status = {:?}", start.elapsed().as_secs(), flow.status);
last_status_print = now;
}
// Every 5th poll, print the current flow DAG
if poll_count % 5 == 0 {
print_header("flow.dag (periodic)");
match client.flow_dag(args.context_id, args.flow_id).await {
Ok(dag) => println!("{:#?}", dag),
Err(e) => eprintln!("WARN: periodic flow.dag failed: {}", e),
}
}
match flow.status {
FlowStatus::Finished | FlowStatus::Error => {
break;
}
_ => {
if start.elapsed() > timeout {
eprintln!("ERROR: Flow did not complete within {:?} (status={:?})", timeout, flow.status);
break;
}
tokio::time::sleep(poll_interval).await;
}
}
}
// 8) Final summary: job statuses
print_header("Final job statuses");
for job_id in &job_ids {
match client.job_load(args.context_id, args.actor_id, *job_id).await {
Ok(job) => {
println!("Job {}: status={:?} result={:?}", job_id, job.status, job.result);
}
Err(e) => {
eprintln!("Job {}: load failed: {}", job_id, e);
}
}
}
// Final status check
let final_flow = client.flow_load(args.context_id, args.flow_id).await?;
print_header("Result");
if final_flow.status == FlowStatus::Finished {
println!("Flow finished successfully.");
Ok(())
} else {
println!("Flow ended with status={:?}", final_flow.status);
std::process::exit(1);
}
}

View File

@@ -0,0 +1,42 @@
//! Error types for the Coordinator client
use thiserror::Error;
/// Result type for coordinator operations
pub type Result<T> = std::result::Result<T, CoordinatorError>;
/// Errors that can occur when interacting with the Coordinator
#[derive(Debug, Error)]
pub enum CoordinatorError {
/// Connection error
#[error("Connection error: {0}")]
Connection(String),
/// RPC error
#[error("RPC error: {0}")]
Rpc(String),
/// Resource already exists
#[error("Resource already exists")]
AlreadyExists,
/// Resource not found
#[error("Not found: {0}")]
NotFound(String),
/// Storage error
#[error("Storage error: {0}")]
Storage(String),
/// DAG error
#[error("DAG error: {0}")]
Dag(String),
/// Timeout error
#[error("Timeout: {0}")]
Timeout(String),
/// Serialization error
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}

View File

@@ -0,0 +1,327 @@
//! Hero Coordinator Client Library
//!
//! This library provides a Rust client for interacting with the Hero Coordinator JSON-RPC API.
//! It supports creating and managing actors, contexts, runners, jobs, and flows.
//!
//! # Example
//!
//! ```no_run
//! use hero_coordinator_client::{CoordinatorClient, models::*};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let client = CoordinatorClient::new("http://127.0.0.1:9652")?;
//!
//! // Create an actor
//! let actor = client.actor_create(ActorCreate {
//! id: 11001,
//! pubkey: "demo-pubkey".to_string(),
//! address: vec!["127.0.0.1".parse()?],
//! }).await?;
//!
//! println!("Created actor: {:?}", actor);
//! Ok(())
//! }
//! ```
pub mod models;
pub mod error;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use serde_json::Value;
pub use error::{CoordinatorError, Result};
pub use models::*;
/// Client for interacting with the Hero Coordinator
#[derive(Clone)]
pub struct CoordinatorClient {
client: HttpClient,
url: String,
}
impl CoordinatorClient {
/// Create a new coordinator client
///
/// # Arguments
/// * `url` - The URL of the coordinator (e.g., "http://127.0.0.1:9652")
pub fn new(url: impl Into<String>) -> Result<Self> {
let url = url.into();
let client = HttpClientBuilder::default()
.build(&url)
.map_err(|e| CoordinatorError::Connection(e.to_string()))?;
Ok(Self { client, url })
}
/// Get the coordinator URL
pub fn url(&self) -> &str {
&self.url
}
// ==================== Actor Methods ====================
/// Create a new actor
pub async fn actor_create(&self, actor: ActorCreate) -> Result<Actor> {
let params = serde_json::json!({ "actor": actor });
self.call("actor.create", params).await
}
/// Load an existing actor
pub async fn actor_load(&self, id: u32) -> Result<Actor> {
let params = serde_json::json!({ "id": id });
self.call("actor.load", params).await
}
/// Try to create an actor, or load it if it already exists
pub async fn actor_create_or_load(&self, actor: ActorCreate) -> Result<Actor> {
match self.actor_create(actor.clone()).await {
Ok(a) => Ok(a),
Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => {
self.actor_load(actor.id).await
}
Err(e) => Err(e),
}
}
// ==================== Context Methods ====================
/// Create a new context
pub async fn context_create(&self, context: ContextCreate) -> Result<Context> {
let params = serde_json::json!({ "context": context });
self.call("context.create", params).await
}
/// Load an existing context
pub async fn context_load(&self, id: u32) -> Result<Context> {
let params = serde_json::json!({ "id": id });
self.call("context.load", params).await
}
/// Try to create a context, or load it if it already exists
pub async fn context_create_or_load(&self, context: ContextCreate) -> Result<Context> {
match self.context_create(context.clone()).await {
Ok(c) => Ok(c),
Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => {
self.context_load(context.id).await
}
Err(e) => Err(e),
}
}
// ==================== Runner Methods ====================
/// Create a new runner in a context
pub async fn runner_create(&self, context_id: u32, runner: RunnerCreate) -> Result<Runner> {
let params = serde_json::json!({
"context_id": context_id,
"runner": runner
});
self.call("runner.create", params).await
}
/// Load an existing runner from a context
pub async fn runner_load(&self, context_id: u32, id: u32) -> Result<Runner> {
let params = serde_json::json!({
"context_id": context_id,
"id": id
});
self.call("runner.load", params).await
}
/// Try to create a runner, or load it if it already exists
pub async fn runner_create_or_load(&self, context_id: u32, runner: RunnerCreate) -> Result<Runner> {
match self.runner_create(context_id, runner.clone()).await {
Ok(r) => Ok(r),
Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => {
self.runner_load(context_id, runner.id).await
}
Err(e) => Err(e),
}
}
// ==================== Job Methods ====================
/// Create a new job in a context
pub async fn job_create(&self, context_id: u32, job: JobCreate) -> Result<Job> {
let params = serde_json::json!({
"context_id": context_id,
"job": job
});
self.call("job.create", params).await
}
/// Load an existing job from a context
pub async fn job_load(&self, context_id: u32, caller_id: u32, id: u32) -> Result<Job> {
let params = serde_json::json!({
"context_id": context_id,
"caller_id": caller_id,
"id": id
});
self.call("job.load", params).await
}
/// Try to create a job, or load it if it already exists
pub async fn job_create_or_load(&self, context_id: u32, job: JobCreate) -> Result<Job> {
let caller_id = job.caller_id;
let job_id = job.id;
match self.job_create(context_id, job).await {
Ok(j) => Ok(j),
Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => {
self.job_load(context_id, caller_id, job_id).await
}
Err(e) => Err(e),
}
}
// ==================== Flow Methods ====================
/// Create a new flow in a context
pub async fn flow_create(&self, context_id: u32, flow: FlowCreate) -> Result<Flow> {
let params = serde_json::json!({
"context_id": context_id,
"flow": flow
});
self.call("flow.create", params).await
}
/// Load an existing flow from a context
pub async fn flow_load(&self, context_id: u32, id: u32) -> Result<Flow> {
let params = serde_json::json!({
"context_id": context_id,
"id": id
});
self.call("flow.load", params).await
}
/// Try to create a flow, or load it if it already exists
pub async fn flow_create_or_load(&self, context_id: u32, flow: FlowCreate) -> Result<Flow> {
let flow_id = flow.id;
match self.flow_create(context_id, flow).await {
Ok(f) => Ok(f),
Err(CoordinatorError::AlreadyExists | CoordinatorError::Storage(_)) => {
self.flow_load(context_id, flow_id).await
}
Err(e) => Err(e),
}
}
/// Get the DAG representation of a flow
pub async fn flow_dag(&self, context_id: u32, id: u32) -> Result<FlowDag> {
let params = serde_json::json!({
"context_id": context_id,
"id": id
});
self.call("flow.dag", params).await
}
/// Start a flow
///
/// Returns true if the scheduler was started, false if it was already running
pub async fn flow_start(&self, context_id: u32, id: u32) -> Result<bool> {
let params = serde_json::json!({
"context_id": context_id,
"id": id
});
self.call("flow.start", params).await
}
// ==================== Message Methods ====================
/// Create a new message in a context
pub async fn message_create(&self, context_id: u32, message: MessageCreate) -> Result<Message> {
let params = serde_json::json!({
"context_id": context_id,
"message": message
});
self.call("message.create", params).await
}
/// Load an existing message from a context
pub async fn message_load(&self, context_id: u32, id: u32) -> Result<Message> {
let params = serde_json::json!({
"context_id": context_id,
"id": id
});
self.call("message.load", params).await
}
// ==================== Helper Methods ====================
/// Poll a flow until it reaches a terminal state (Finished or Error)
///
/// # Arguments
/// * `context_id` - The context ID
/// * `flow_id` - The flow ID
/// * `poll_interval` - Duration between polls
/// * `timeout` - Maximum duration to wait
///
/// Returns the final flow state or an error if timeout is reached
pub async fn flow_poll_until_complete(
&self,
context_id: u32,
flow_id: u32,
poll_interval: std::time::Duration,
timeout: std::time::Duration,
) -> Result<Flow> {
let start = std::time::Instant::now();
loop {
let flow = self.flow_load(context_id, flow_id).await?;
match flow.status {
FlowStatus::Finished | FlowStatus::Error => {
return Ok(flow);
}
_ => {
if start.elapsed() > timeout {
return Err(CoordinatorError::Timeout(format!(
"Flow {} did not complete within {:?}",
flow_id, timeout
)));
}
tokio::time::sleep(poll_interval).await;
}
}
}
}
// ==================== Internal Methods ====================
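/// Perform a JSON-RPC request against the coordinator.
///
/// `params` is sent as a single positional object, so the request body has the
/// shape `{"jsonrpc": "2.0", "method": "...", "params": [{...}], "id": ...}`.
/// Well-known error strings returned by the coordinator are mapped onto typed
/// `CoordinatorError` variants; anything unrecognized becomes `Rpc`.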
async fn call<T: serde::de::DeserializeOwned>(&self, method: &str, params: Value) -> Result<T> {
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::ArrayParams;
let mut array_params = ArrayParams::new();
array_params.insert(params).map_err(|e| CoordinatorError::Rpc(e.to_string()))?;
self.client
.request(method, array_params)
.await
.map_err(|e| {
let err_str = e.to_string();
if err_str.contains("Already exists") {
CoordinatorError::AlreadyExists
} else if err_str.contains("Not Found") || err_str.contains("Key not found") {
CoordinatorError::NotFound(err_str)
} else if err_str.contains("Storage Error") {
CoordinatorError::Storage(err_str)
} else if err_str.contains("DAG") {
CoordinatorError::Dag(err_str)
} else {
CoordinatorError::Rpc(err_str)
}
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_creation() {
let client = CoordinatorClient::new("http://127.0.0.1:9652");
assert!(client.is_ok());
}
}

View File

@@ -0,0 +1,248 @@
//! Data models for the Coordinator client
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
// Re-export Job types from hero_job
pub use hero_job::{Job, JobStatus};
/// Timestamp type (Unix timestamp in seconds)
pub type Timestamp = u64;
// ==================== Actor ====================
/// Actor representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Actor {
pub id: u32,
pub pubkey: String,
pub address: Vec<IpAddr>,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}
/// Parameters for creating an actor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorCreate {
pub id: u32,
pub pubkey: String,
pub address: Vec<IpAddr>,
}
// ==================== Context ====================
/// Context representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Context {
/// Redis DB to use
pub id: u32,
/// Actor ids which have admin rights on this context
pub admins: Vec<u32>,
/// Actor ids which can read the context info
pub readers: Vec<u32>,
/// Actor ids which can execute jobs in this context
pub executors: Vec<u32>,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}
/// Parameters for creating a context
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextCreate {
pub id: u32,
pub admins: Vec<u32>,
pub readers: Vec<u32>,
pub executors: Vec<u32>,
}
// ==================== Runner ====================
/// Runner representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Runner {
pub id: u32,
/// Mycelium public key
pub pubkey: String,
/// Mycelium address
pub address: IpAddr,
/// Message topic (e.g., "supervisor.rpc")
pub topic: String,
/// Script type this runner supports
pub script_type: ScriptType,
/// If true, the runner also listens on a local redis queue
pub local: bool,
/// Optional secret for authenticated supervisor calls
#[serde(skip_serializing_if = "Option::is_none")]
pub secret: Option<String>,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}
/// Parameters for creating a runner
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunnerCreate {
pub id: u32,
pub pubkey: String,
pub address: IpAddr,
pub topic: String,
pub script_type: ScriptType,
pub local: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub secret: Option<String>,
}
/// Script type supported by a runner
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ScriptType {
Python,
V,
Osis,
Sal,
}
// ==================== Job ====================
/// Parameters for creating a job
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobCreate {
pub id: u32,
pub caller_id: u32,
pub context_id: u32,
pub script: String,
pub script_type: ScriptType,
pub timeout: u64,
#[serde(default)]
pub retries: u8,
#[serde(default)]
pub env_vars: HashMap<String, String>,
#[serde(default)]
pub prerequisites: Vec<u32>,
#[serde(default)]
pub depends: Vec<u32>,
}
// ==================== Flow ====================
/// Flow representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Flow {
pub id: u32,
pub caller_id: u32,
pub context_id: u32,
pub jobs: Vec<u32>,
pub env_vars: HashMap<String, String>,
pub result: HashMap<String, String>,
pub created_at: Timestamp,
pub updated_at: Timestamp,
pub status: FlowStatus,
}
/// Parameters for creating a flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowCreate {
pub id: u32,
pub caller_id: u32,
pub context_id: u32,
pub jobs: Vec<u32>,
#[serde(default)]
pub env_vars: HashMap<String, String>,
}
/// Flow status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FlowStatus {
Created,
Dispatched,
Started,
Error,
Finished,
}
/// DAG representation of a flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDag {
pub flow_id: u32,
pub nodes: Vec<DagNode>,
pub edges: Vec<DagEdge>,
}
/// Node in a DAG
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagNode {
pub job_id: u32,
pub status: JobStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Edge in a DAG (dependency relationship)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagEdge {
pub from: u32,
pub to: u32,
}
// ==================== Message ====================
/// Message representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub id: u32,
pub context_id: u32,
pub runner_id: u32,
pub job_id: u32,
pub message_type: MessageType,
pub format: MessageFormatType,
pub payload: String,
pub status: MessageStatus,
pub transport_status: TransportStatus,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}
/// Parameters for creating a message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageCreate {
pub id: u32,
pub context_id: u32,
pub runner_id: u32,
pub job_id: u32,
pub message_type: MessageType,
pub format: MessageFormatType,
pub payload: String,
}
/// Message type
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageType {
JobRun,
JobResult,
JobError,
}
/// Message format
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageFormatType {
JsonRpc,
}
/// Message status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MessageStatus {
Created,
Sent,
Delivered,
Failed,
}
/// Transport status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransportStatus {
Pending,
Sent,
Delivered,
Failed,
}
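// Illustrative check (not part of the original file): serde's default external
// tagging serializes these unit variants as bare strings, which is what the
// client sends over JSON-RPC (e.g. "Python", "Finished").
#[cfg(test)]
mod serde_repr_tests {
    use super::*;
    #[test]
    fn unit_enums_serialize_as_strings() {
        assert_eq!(serde_json::to_string(&ScriptType::Python).unwrap(), "\"Python\"");
        assert_eq!(serde_json::to_string(&FlowStatus::Finished).unwrap(), "\"Finished\"");
        let st: ScriptType = serde_json::from_str("\"Sal\"").unwrap();
        assert_eq!(st, ScriptType::Sal);
    }
}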

scripts/install.md Normal file (0 lines)
View File

View File

@@ -191,40 +191,37 @@ async fn test_03_job_with_env_vars() {
 }
 #[tokio::test]
-async fn test_04_job_timeout() {
-    println!("\n🧪 Test: Job Timeout");
+async fn test_04_invalid_heroscript() {
+    println!("\n🧪 Test: Invalid Heroscript Error Handling");
     let client = create_client().await;
-    // Create job with short timeout - use a heroscript that loops forever
-    let mut job = create_test_job(r#"
-for i in 1..1000 {
-    print("Loop iteration: ${i}")
-    sleep(100)
-}
-"#);
-    job.timeout = 2; // 2 second timeout
+    // Create job with invalid heroscript syntax
+    let job = create_test_job("!!invalid.command.that.does.not.exist");
     let job_id = job.id.clone();
     // Save and queue job
     client.store_job_in_redis(&job).await.expect("Failed to save job");
     client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
-    // Wait for job to timeout
+    // Wait for job to complete
     tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
-    // Check job status - should be error due to timeout
+    // Check job status - should be error due to invalid command
     let status = client.get_status(&job_id).await.expect("Failed to get job status");
     println!("📊 Job status: {:?}", status);
     // Should have error
     if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") {
-        println!("❌ Job error (expected timeout):\n{}", error);
-        assert!(error.contains("timeout") || error.contains("timed out"), "Error should mention timeout");
-        println!("✅ Job timeout handled correctly");
+        println!("❌ Job error (expected):\n{}", error);
+        println!("✅ Invalid heroscript error handled correctly");
     } else {
-        println!("⚠️ Expected timeout error but got none");
-        panic!("Job should have timed out");
+        println!("⚠️ Expected error for invalid heroscript but got none");
+        // Check if there's a result instead
+        if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") {
+            println!("Got result instead: {}", result);
+        }
+        panic!("Job with invalid heroscript should have failed");
     }
 }

tests/runner_osiris.rs Normal file (215 lines)
View File

@@ -0,0 +1,215 @@
//! Integration tests for Osiris Runner (OSIS)
//!
//! Tests the osiris runner by spawning the binary and dispatching Rhai jobs to it.
//!
//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially:
//! ```
//! cargo test --test runner_osiris -- --test-threads=1
//! ```
use hero_job::{Job, JobBuilder};
use hero_job_client::Client;
use std::sync::{Mutex, Once};
use std::process::Child;
use lazy_static::lazy_static;
/// Test configuration
const RUNNER_ID: &str = "test-osiris-runner";
const REDIS_URL: &str = "redis://localhost:6379";
lazy_static! {
static ref RUNNER_PROCESS: Mutex<Option<Child>> = Mutex::new(None);
}
/// Global initialization flag
static INIT: Once = Once::new();
/// Initialize and start the osiris runner binary
async fn init_runner() {
INIT.call_once(|| {
// Register cleanup handler
let _ = std::panic::catch_unwind(|| {
ctrlc::set_handler(move || {
cleanup_runner();
std::process::exit(0);
}).expect("Error setting Ctrl-C handler");
});
println!("🚀 Starting Osiris runner...");
// Build the runner binary
let build_result = escargot::CargoBuild::new()
.bin("runner_osiris")
.package("runner-osiris")
.current_release()
.run()
.expect("Failed to build runner_osiris");
// Spawn the runner process
let child = build_result
.command()
.arg(RUNNER_ID)
.arg("--redis-url")
.arg(REDIS_URL)
.spawn()
.expect("Failed to spawn osiris runner");
*RUNNER_PROCESS.lock().unwrap() = Some(child);
// Give the runner time to start
std::thread::sleep(std::time::Duration::from_secs(2));
println!("✅ Osiris runner ready");
});
}
/// Cleanup runner process
fn cleanup_runner() {
println!("🧹 Cleaning up osiris runner process...");
if let Some(mut child) = RUNNER_PROCESS.lock().unwrap().take() {
let _ = child.kill();
let _ = child.wait();
}
}
/// Create a test job client
async fn create_client() -> Client {
init_runner().await;
Client::builder()
.redis_url(REDIS_URL)
.build()
.await
.expect("Failed to create job client")
}
/// Helper to create a test job
fn create_test_job(payload: &str) -> Job {
JobBuilder::new()
.caller_id("test-caller")
.context_id("test-context")
.runner(RUNNER_ID)
.payload(payload)
.timeout(30)
.build()
.expect("Failed to build test job")
}
#[tokio::test]
async fn test_01_simple_rhai_script() {
println!("\n🧪 Test: Simple Rhai Script");
let client = create_client().await;
// Create job with simple Rhai script
let job = create_test_job(r#"
let x = 10;
let y = 20;
print("Sum: " + (x + y));
x + y
"#);
// Run the job and wait for the result
match client.job_run_wait(&job, RUNNER_ID, 5).await {
Ok(result) => {
println!("✅ Job succeeded with result:\n{}", result);
assert!(result.contains("30") || result.contains("Sum"), "Result should contain calculation");
}
Err(e) => {
println!("❌ Job failed with error: {:?}", e);
panic!("Job execution failed");
}
}
println!("✅ Rhai script job completed");
}
#[tokio::test]
async fn test_02_rhai_with_functions() {
println!("\n🧪 Test: Rhai Script with Functions");
let client = create_client().await;
// Create job with Rhai function
let job = create_test_job(r#"
fn calculate(a, b) {
a * b + 10
}
let result = calculate(5, 3);
print("Result: " + result);
result
"#);
let job_id = job.id.clone();
// Save and queue job
client.store_job_in_redis(&job).await.expect("Failed to save job");
client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
// Wait for job to complete
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check job status
let status = client.get_status(&job_id).await.expect("Failed to get job status");
println!("📊 Job status: {:?}", status);
// Get result or error
match (client.get_result(&job_id).await, client.get_error(&job_id).await) {
(Ok(Some(result)), _) => {
println!("✅ Job succeeded with result:\n{}", result);
assert!(result.contains("25") || result.contains("Result"), "Result should contain 25");
}
(_, Ok(Some(error))) => {
println!("❌ Job failed with error:\n{}", error);
panic!("Job should have succeeded");
}
_ => {
println!("⚠️ No result or error available");
panic!("Expected result");
}
}
println!("✅ Rhai function job completed");
}
#[tokio::test]
async fn test_03_invalid_rhai_syntax() {
println!("\n🧪 Test: Invalid Rhai Syntax Error Handling");
let client = create_client().await;
// Create job with invalid Rhai syntax
let job = create_test_job("let x = ; // Invalid syntax");
let job_id = job.id.clone();
// Save and queue job
client.store_job_in_redis(&job).await.expect("Failed to save job");
client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
// Wait for job to complete
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check job status - should be error
let status = client.get_status(&job_id).await.expect("Failed to get job status");
println!("📊 Job status: {:?}", status);
// Should have error
if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") {
println!("❌ Job error (expected):\n{}", error);
println!("✅ Invalid Rhai syntax error handled correctly");
} else {
println!("⚠️ Expected error for invalid Rhai syntax but got none");
panic!("Job with invalid syntax should have failed");
}
}
/// Final test that ensures cleanup happens
#[tokio::test]
async fn test_zz_cleanup() {
println!("\n🧹 Running cleanup...");
cleanup_runner();
// Wait a bit to ensure process is killed
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("✅ Cleanup complete");
}

tests/runner_sal.rs Normal file (268 lines)
View File

@@ -0,0 +1,268 @@
//! Integration tests for SAL Runner
//!
//! Tests the SAL runner by spawning the binary and dispatching Rhai jobs to it.
//!
//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially:
//! ```
//! cargo test --test runner_sal -- --test-threads=1
//! ```
use hero_job::{Job, JobBuilder};
use hero_job_client::Client;
use std::sync::{Mutex, Once};
use std::process::Child;
use lazy_static::lazy_static;
/// Test configuration
const RUNNER_ID: &str = "test-sal-runner";
const REDIS_URL: &str = "redis://localhost:6379";
lazy_static! {
static ref RUNNER_PROCESS: Mutex<Option<Child>> = Mutex::new(None);
}
/// Global initialization flag
static INIT: Once = Once::new();
/// Initialize and start the SAL runner binary
async fn init_runner() {
INIT.call_once(|| {
// Register cleanup handler
let _ = std::panic::catch_unwind(|| {
ctrlc::set_handler(move || {
cleanup_runner();
std::process::exit(0);
}).expect("Error setting Ctrl-C handler");
});
println!("🚀 Starting SAL runner...");
// Build the runner binary
let build_result = escargot::CargoBuild::new()
.bin("runner_sal")
.package("runner-sal")
.current_release()
.run()
.expect("Failed to build runner_sal");
// Spawn the runner process
let child = build_result
.command()
.arg(RUNNER_ID)
.arg("--redis-url")
.arg(REDIS_URL)
.arg("--db-path")
.arg("/tmp/test_sal.db")
.spawn()
.expect("Failed to spawn SAL runner");
*RUNNER_PROCESS.lock().unwrap() = Some(child);
// Give the runner time to start
std::thread::sleep(std::time::Duration::from_secs(2));
println!("✅ SAL runner ready");
});
}
/// Cleanup runner process
fn cleanup_runner() {
println!("🧹 Cleaning up SAL runner process...");
if let Some(mut child) = RUNNER_PROCESS.lock().unwrap().take() {
let _ = child.kill();
let _ = child.wait();
}
// Clean up test database
let _ = std::fs::remove_file("/tmp/test_sal.db");
}
/// Create a test job client
async fn create_client() -> Client {
init_runner().await;
Client::builder()
.redis_url(REDIS_URL)
.build()
.await
.expect("Failed to create job client")
}
/// Helper to create a test job
fn create_test_job(payload: &str) -> Job {
JobBuilder::new()
.caller_id("test-caller")
.context_id("test-context")
.runner(RUNNER_ID)
.payload(payload)
.timeout(30)
.build()
.expect("Failed to build test job")
}
#[tokio::test]
async fn test_01_simple_rhai_script() {
println!("\n🧪 Test: Simple Rhai Script");
let client = create_client().await;
// Create job with simple Rhai script
let job = create_test_job(r#"
let message = "Hello from SAL";
print(message);
message
"#);
// Run the job and wait for the result
match client.job_run_wait(&job, RUNNER_ID, 5).await {
Ok(result) => {
println!("✅ Job succeeded with result:\n{}", result);
assert!(result.contains("Hello from SAL") || result.contains("SAL"), "Result should contain message");
}
Err(e) => {
println!("❌ Job failed with error: {:?}", e);
panic!("Job execution failed");
}
}
println!("✅ Rhai script job completed");
}
#[tokio::test]
async fn test_02_rhai_array_operations() {
println!("\n🧪 Test: Rhai Array Operations");
let client = create_client().await;
// Create job with array operations
let job = create_test_job(r#"
let arr = [1, 2, 3, 4, 5];
let sum = 0;
for item in arr {
sum += item;
}
print("Sum: " + sum);
sum
"#);
let job_id = job.id.clone();
// Save and queue job
client.store_job_in_redis(&job).await.expect("Failed to save job");
client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
// Wait for job to complete
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check job status
let status = client.get_status(&job_id).await.expect("Failed to get job status");
println!("📊 Job status: {:?}", status);
// Get result or error
match (client.get_result(&job_id).await, client.get_error(&job_id).await) {
(Ok(Some(result)), _) => {
println!("✅ Job succeeded with result:\n{}", result);
assert!(result.contains("15") || result.contains("Sum"), "Result should contain sum of 15");
}
(_, Ok(Some(error))) => {
println!("❌ Job failed with error:\n{}", error);
panic!("Job should have succeeded");
}
_ => {
println!("⚠️ No result or error available");
panic!("Expected result");
}
}
println!("✅ Array operations job completed");
}
#[tokio::test]
async fn test_03_rhai_object_operations() {
println!("\n🧪 Test: Rhai Object Operations");
let client = create_client().await;
// Create job with object/map operations
let job = create_test_job(r#"
let obj = #{
name: "Test",
value: 42,
active: true
};
print("Name: " + obj.name);
print("Value: " + obj.value);
obj.value
"#);
let job_id = job.id.clone();
// Save and queue job
client.store_job_in_redis(&job).await.expect("Failed to save job");
client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
// Wait for job to complete
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check job status
let status = client.get_status(&job_id).await.expect("Failed to get job status");
println!("📊 Job status: {:?}", status);
// Get result or error
match (client.get_result(&job_id).await, client.get_error(&job_id).await) {
(Ok(Some(result)), _) => {
println!("✅ Job succeeded with result:\n{}", result);
assert!(result.contains("42"), "Result should contain value 42");
}
(_, Ok(Some(error))) => {
println!("❌ Job failed with error:\n{}", error);
panic!("Job should have succeeded");
}
_ => {
println!("⚠️ No result or error available");
panic!("Expected result");
}
}
println!("✅ Object operations job completed");
}
#[tokio::test]
async fn test_04_invalid_rhai_syntax() {
println!("\n🧪 Test: Invalid Rhai Syntax Error Handling");
let client = create_client().await;
// Create job with invalid Rhai syntax
let job = create_test_job("fn broken( { // Invalid syntax");
let job_id = job.id.clone();
// Save and queue job
client.store_job_in_redis(&job).await.expect("Failed to save job");
client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job");
// Wait for job to complete
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check job status - should be error
let status = client.get_status(&job_id).await.expect("Failed to get job status");
println!("📊 Job status: {:?}", status);
// Should have error
if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") {
println!("❌ Job error (expected):\n{}", error);
println!("✅ Invalid Rhai syntax error handled correctly");
} else {
println!("⚠️ Expected error for invalid Rhai syntax but got none");
panic!("Job with invalid syntax should have failed");
}
}
/// Final test that ensures cleanup happens
#[tokio::test]
async fn test_zz_cleanup() {
println!("\n🧹 Running cleanup...");
cleanup_runner();
// Wait a bit to ensure process is killed
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("✅ Cleanup complete");
}