- Add coordinator client library to workspace
- Add installation documentation and heroscript
- Add new test runners for Osiris and Sal
- Update hero runner test to handle invalid heroscript errors
- Update README with installation instructions
Hero Coordinator Client
Rust client library for interacting with the Hero Coordinator JSON-RPC API.
Features
- Actor Management: Create and load actors
- Context Management: Create contexts with admin/reader/executor permissions
- Runner Management: Register runners in contexts
- Job Management: Create jobs with dependencies
- Flow Management: Create and manage job flows (DAGs)
- Flow Polling: Poll flows until completion with timeout support
- Helper Methods:
  - *_create_or_load methods for idempotent operations
Installation
Add to your Cargo.toml:
[dependencies]
hero-coordinator-client = { path = "path/to/lib/clients/coordinator" }
Usage
Basic Example
use hero_coordinator_client::{CoordinatorClient, models::*};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client
let client = CoordinatorClient::new("http://127.0.0.1:9652")?;
// Create actor
let actor = client.actor_create(ActorCreate {
id: 11001,
pubkey: "demo-pubkey".to_string(),
address: vec!["127.0.0.1".parse()?],
}).await?;
// Create context
let context = client.context_create(ContextCreate {
id: 2,
admins: vec![11001],
readers: vec![11001],
executors: vec![11001],
}).await?;
// Create runner
let runner = client.runner_create(2, RunnerCreate {
id: 12001,
pubkey: "".to_string(),
address: "127.0.0.1".parse()?,
topic: "supervisor.rpc".to_string(),
script_type: ScriptType::Python,
local: false,
secret: None,
}).await?;
// Create job
let job = client.job_create(2, JobCreate {
id: 20000,
caller_id: 11001,
context_id: 2,
script: "print('Hello from job')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![],
}).await?;
// Create flow
let flow = client.flow_create(2, FlowCreate {
id: 13001,
caller_id: 11001,
context_id: 2,
jobs: vec![20000],
env_vars: HashMap::new(),
}).await?;
// Start flow
client.flow_start(2, 13001).await?;
// Poll until completion
let final_flow = client.flow_poll_until_complete(
2,
13001,
std::time::Duration::from_secs(2),
std::time::Duration::from_secs(600),
).await?;
println!("Flow status: {:?}", final_flow.status);
Ok(())
}
Idempotent Operations
Use *_create_or_load methods to handle existing resources:
// Will create if doesn't exist, or load if it does
let actor = client.actor_create_or_load(ActorCreate {
id: 11001,
pubkey: "demo-pubkey".to_string(),
address: vec!["127.0.0.1".parse()?],
}).await?;
Flow with Dependencies
Create a chain of dependent jobs:
// Job 0 (root)
let job0 = client.job_create(2, JobCreate {
id: 20000,
caller_id: 11001,
context_id: 2,
script: "print('Job 0')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![],
}).await?;
// Job 1 (depends on Job 0)
let job1 = client.job_create(2, JobCreate {
id: 20001,
caller_id: 11001,
context_id: 2,
script: "print('Job 1')".to_string(),
script_type: ScriptType::Python,
timeout: 60,
retries: 0,
env_vars: HashMap::new(),
prerequisites: vec![],
depends: vec![20000], // Depends on job0
}).await?;
// Create flow with both jobs
let flow = client.flow_create(2, FlowCreate {
id: 13001,
caller_id: 11001,
context_id: 2,
jobs: vec![20000, 20001],
env_vars: HashMap::new(),
}).await?;
Examples
See the examples/ directory for complete examples:
# Run the flow demo
COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
--dst-ip 127.0.0.1 \
--context-id 2 \
--actor-id 11001 \
--runner-id 12001 \
--flow-id 13001 \
--jobs 3
API Methods
Actor
- actor_create(actor: ActorCreate) -> Actor
- actor_load(id: u32) -> Actor
- actor_create_or_load(actor: ActorCreate) -> Actor
Context
- context_create(context: ContextCreate) -> Context
- context_load(id: u32) -> Context
- context_create_or_load(context: ContextCreate) -> Context
Runner
- runner_create(context_id: u32, runner: RunnerCreate) -> Runner
- runner_load(context_id: u32, id: u32) -> Runner
- runner_create_or_load(context_id: u32, runner: RunnerCreate) -> Runner
Job
- job_create(context_id: u32, job: JobCreate) -> Job
- job_load(context_id: u32, caller_id: u32, id: u32) -> Job
- job_create_or_load(context_id: u32, job: JobCreate) -> Job
Flow
- flow_create(context_id: u32, flow: FlowCreate) -> Flow
- flow_load(context_id: u32, id: u32) -> Flow
- flow_create_or_load(context_id: u32, flow: FlowCreate) -> Flow
- flow_dag(context_id: u32, id: u32) -> FlowDag
- flow_start(context_id: u32, id: u32) -> bool
- flow_poll_until_complete(context_id, flow_id, poll_interval, timeout) -> Flow
Message
- message_create(context_id: u32, message: MessageCreate) -> Message
- message_load(context_id: u32, id: u32) -> Message
Error Handling
The client uses a custom CoordinatorError type:
use hero_coordinator_client::error::CoordinatorError;
match client.actor_create(actor).await {
Ok(actor) => println!("Created: {:?}", actor),
Err(CoordinatorError::AlreadyExists) => {
// Resource already exists, load it instead
let actor = client.actor_load(actor.id).await?;
}
Err(e) => return Err(e.into()),
}
License
MIT OR Apache-2.0