Add calling of supervisor over mycelium

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
Author: Lee Smet
Date: 2025-08-28 13:50:59 +02:00
parent cf06c7fa36
commit 4b597cc445
8 changed files with 331 additions and 12 deletions

View File

@@ -5,3 +5,4 @@ mod time;
pub mod dag;
pub mod rpc;
pub mod clients;
pub mod router;

View File

@@ -84,10 +84,23 @@ async fn main() {
// Initialize Service
let service = herocoordinator::service::AppService::new(redis);
let service_for_router = service.clone();
// Shared application state
let state = Arc::new(herocoordinator::rpc::AppState::new(service));
// Start router workers (auto-discovered contexts)
{
let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
let cfg = herocoordinator::router::RouterConfig {
context_ids: Vec::new(), // ignored by start_router_auto
concurrency: 32,
base_url,
topic: "supervisor.rpc".to_string(),
};
let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
}
// Build RPC modules for both servers
let http_module = herocoordinator::rpc::build_module(state.clone());
let ws_module = herocoordinator::rpc::build_module(state.clone());

View File

@@ -2,6 +2,7 @@ use std::net::IpAddr;
use serde::{Deserialize, Serialize};
use crate::models::ScriptType;
use crate::time::Timestamp;
#[derive(Serialize, Deserialize, Clone)]
@@ -13,6 +14,8 @@ pub struct Runner {
pub address: IpAddr,
/// Needs to be set by the runner, usually `runner<runnerid`
pub topic: String,
/// The script type this runner can execute; used for routing
pub script_type: ScriptType,
/// If this is true, the runner also listens on a local redis queue
pub local: bool,
pub created_at: Timestamp,

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub enum ScriptType {
Osis,
Sal,

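The added `PartialEq`/`Eq` derives are what let the router introduced below compare script types directly; a minimal sketch (illustrative only, not part of the commit, using just the variants visible in this hunk):

// Equality on ScriptType, enabled by the new derives; this mirrors the router's
// `runners.into_iter().find(|r| r.script_type == desired)` selection.
let desired = ScriptType::Osis;
let available = [ScriptType::Sal, ScriptType::Osis];
assert!(available.iter().any(|t| *t == desired));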
src/router.rs (new file, 211 lines)
View File

@@ -0,0 +1,211 @@
use std::{collections::HashSet, sync::Arc};
use serde_json::{Value, json};
use tokio::sync::Semaphore;
use crate::{
clients::{Destination, SupervisorClient},
models::{Job, Message, MessageStatus, ScriptType},
service::AppService,
};
#[derive(Clone, Debug)]
pub struct RouterConfig {
pub context_ids: Vec<u32>,
pub concurrency: usize,
pub base_url: String, // e.g. http://127.0.0.1:8990
pub topic: String, // e.g. "supervisor.rpc"
// secret currently unused (None), add here later if needed
}
/// Start background router loops, one per context.
/// Each loop:
/// - BRPOP msg_out with 1s timeout
/// - Loads the Message by key, selects a Runner by script_type
/// - Sends supervisor JSON-RPC via Mycelium
/// - On success: Message.status = Acknowledged
/// - On error: Message.status = Error and append a log
pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::JoinHandle<()>> {
let mut handles = Vec::new();
for ctx_id in cfg.context_ids.clone() {
let service_cloned = service.clone();
let cfg_cloned = cfg.clone();
let handle = tokio::spawn(async move {
let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
loop {
// Pop next message key (blocking with timeout)
match service_cloned.brpop_msg_out(ctx_id, 1).await {
Ok(Some(key)) => {
let permit = {
// acquire a concurrency permit (non-fair is fine)
let sem = sem.clone();
// if semaphore is exhausted, await until a slot becomes available
match sem.acquire_owned().await {
Ok(p) => p,
Err(_) => {
// Semaphore closed; exit loop
break;
}
}
};
let service_task = service_cloned.clone();
let cfg_task = cfg_cloned.clone();
tokio::spawn(async move {
// Ensure permit is dropped at end of task
let _permit = permit;
if let Err(e) =
deliver_one(&service_task, &cfg_task, ctx_id, &key).await
{
eprintln!(
"[router ctx={}] delivery error for {}: {}",
ctx_id, key, e
);
}
});
}
Ok(None) => {
// timeout: just tick
continue;
}
Err(e) => {
eprintln!("[router ctx={}] brpop error: {}", ctx_id, e);
// small backoff to avoid busy-loop on persistent errors
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
}
});
handles.push(handle);
}
handles
}
async fn deliver_one(
service: &AppService,
cfg: &RouterConfig,
context_id: u32,
msg_key: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Parse "message:{caller_id}:{id}"
let (caller_id, id) = parse_message_key(msg_key)
.ok_or_else(|| format!("invalid message key format: {}", msg_key))?;
// Load message
let msg: Message = service.load_message(context_id, caller_id, id).await?;
// Determine routing script_type
let desired: ScriptType = determine_script_type(&msg);
// Discover runners and select a matching one
let runners = service.scan_runners(context_id).await?;
let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
let log = format!(
"No runner with script_type {:?} available in context {} for message {}",
desired, context_id, msg_key
);
let _ = service
.append_message_logs(context_id, caller_id, id, vec![log.clone()])
.await;
let _ = service
.update_message_status(context_id, caller_id, id, MessageStatus::Error)
.await;
return Err(log.into());
};
// Build SupervisorClient
let dest = if !runner.pubkey.trim().is_empty() {
Destination::Pk(runner.pubkey.clone())
} else {
Destination::Ip(runner.address)
};
let client = SupervisorClient::new(
cfg.base_url.clone(),
dest,
cfg.topic.clone(),
None, // secret
)?;
// Build supervisor method and params from Message
let method = msg.message.clone();
let params = build_params(&msg)?;
// Send
let _out_id = client.call(&method, params).await?;
// Mark as acknowledged on success
service
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
.await?;
Ok(())
}
fn determine_script_type(msg: &Message) -> ScriptType {
// Prefer embedded job's script_type if available, else fallback to message.message_type
match msg.job.first() {
Some(j) => j.script_type.clone(),
None => msg.message_type.clone(),
}
}
fn build_params(msg: &Message) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
// Minimal mapping:
// - "job.run" with exactly one embedded job: [{ "job": <job> }]
// - otherwise: []
if msg.message == "job.run"
&& let Some(j) = msg.job.first()
{
let jv = job_to_json(j)?;
return Ok(json!([ { "job": jv } ]));
}
Ok(json!([]))
}
fn job_to_json(job: &Job) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
Ok(serde_json::to_value(job)?)
}
fn parse_message_key(s: &str) -> Option<(u32, u32)> {
// Expect "message:{caller_id}:{id}"
let mut it = s.split(':');
match (it.next(), it.next(), it.next(), it.next()) {
(Some("message"), Some(caller), Some(id), None) => {
let caller_id = caller.parse::<u32>().ok()?;
let msg_id = id.parse::<u32>().ok()?;
Some((caller_id, msg_id))
}
_ => None,
}
}
/// Auto-discover contexts periodically and ensure a router loop exists for each.
/// Returns a JoinHandle of the discovery task (router loops are detached).
pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut active: HashSet<u32> = HashSet::new();
loop {
match service.list_context_ids().await {
Ok(ids) => {
for ctx_id in ids {
if !active.contains(&ctx_id) {
// Spawn a loop for this new context
let cfg_ctx = RouterConfig {
context_ids: vec![ctx_id],
..cfg.clone()
};
let _ = start_router(service.clone(), cfg_ctx);
active.insert(ctx_id);
eprintln!("[router] started loop for context {}", ctx_id);
}
}
}
Err(e) => {
eprintln!("[router] list_context_ids error: {}", e);
}
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
})
}
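For reference, a small self-contained sketch (not part of the commit) of the two pure helpers above: the message-key format the router pops from msg_out, and the params shape it builds for a "job.run" message. The job payload fields below are hypothetical; the real value is whatever `serde_json::to_value(&Job)` yields.

use serde_json::json;

// Same logic as router::parse_message_key: expects exactly "message:{caller_id}:{id}".
fn parse_message_key(s: &str) -> Option<(u32, u32)> {
    let mut it = s.split(':');
    match (it.next(), it.next(), it.next(), it.next()) {
        (Some("message"), Some(caller), Some(id), None) => {
            Some((caller.parse().ok()?, id.parse().ok()?))
        }
        _ => None,
    }
}

fn main() {
    assert_eq!(parse_message_key("message:3:7"), Some((3, 7)));
    assert_eq!(parse_message_key("message:3:7:extra"), None);

    // For "job.run" with one embedded job, build_params wraps the job in a
    // single-element positional params array: [ { "job": <job as JSON> } ].
    let job = json!({ "id": 7, "script_type": "Osis" }); // hypothetical payload
    let params = json!([ { "job": job } ]);
    println!("{params}");
}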

View File

@@ -144,6 +144,8 @@ pub struct RunnerCreate {
pub pubkey: String,
pub address: IpAddr,
pub topic: String,
/// The script type this runner executes (used for routing)
pub script_type: ScriptType,
pub local: bool,
}
impl RunnerCreate {
@@ -155,6 +157,7 @@ impl RunnerCreate {
pubkey,
address,
topic,
script_type,
local,
} = self;
@@ -163,6 +166,7 @@ impl RunnerCreate {
pubkey,
address,
topic,
script_type,
local,
created_at: ts,
updated_at: ts,

View File

@@ -10,7 +10,7 @@ use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::Mutex;
-use tokio::time::{sleep, Duration};
use tokio::time::{Duration, sleep};
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
@@ -309,6 +309,7 @@ fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
// Service API
// -----------------------------
#[derive(Clone)]
pub struct AppService {
redis: Arc<RedisDriver>,
schedulers: Arc<Mutex<HashSet<(u32, u32)>>>,
@@ -558,15 +559,6 @@ impl AppService {
Ok(true)
}
-/// Start a background scheduler for a flow.
-/// - Ticks every 1 second.
-/// - Sets Flow status to Started immediately.
-/// - Dispatches jobs whose dependencies are Finished: creates a Message and LPUSHes its key into msg_out,
-/// and marks the job status to Dispatched.
-/// - When all jobs are Finished sets Flow to Finished; if any job is Error sets Flow to Error.
-/// Returns:
-/// - Ok(true) if a scheduler was started
/// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
/// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
@@ -1075,3 +1067,28 @@ impl AppService {
Ok(())
}
}
/// Router/helper wrappers exposed on AppService so background tasks don't need direct Redis access.
impl AppService {
/// Block-pop from the per-context msg_out queue with a timeout (seconds).
/// Returns Some(message_key) like "message:{caller_id}:{id}" or None on timeout.
pub async fn brpop_msg_out(
&self,
context_id: u32,
timeout_secs: usize,
) -> Result<Option<String>, BoxError> {
self.redis.brpop_msg_out(context_id, timeout_secs).await
}
/// Scan all runner:* in the given context and return deserialized Runner entries.
pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<Runner>, BoxError> {
self.redis.scan_runners(context_id).await
}
}
/// Auto-discovery helpers for contexts (wrappers over RedisDriver)
impl AppService {
pub async fn list_context_ids(&self) -> Result<Vec<u32>, BoxError> {
self.redis.list_context_ids().await
}
}
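A minimal sketch (illustrative, assuming the crate's `AppService` and `BoxError` are in scope and context id 1 exists) of how a background task drives the new wrappers, mirroring one iteration of the router loop without the supervisor call:

async fn drain_one(service: &AppService, context_id: u32) -> Result<(), BoxError> {
    // Block for up to 1 second on the per-context msg_out queue.
    if let Some(key) = service.brpop_msg_out(context_id, 1).await? {
        // The router would now parse `key`, load the Message, and pick a runner
        // whose script_type matches; here we only report what is available.
        let runners = service.scan_runners(context_id).await?;
        println!("popped {key}; {} runner(s) in context {context_id}", runners.len());
    }
    Ok(())
}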

View File

@@ -163,7 +163,11 @@ impl RedisDriver {
.and_then(|v| v.as_u64())
.ok_or("Context.id missing or not a number")? as u32;
let key = Self::context_key(id);
-self.hset_model(id, &key, ctx).await
// Write the context hash in its own DB
self.hset_model(id, &key, ctx).await?;
// Register this context id in the global registry (DB 0)
let _ = self.register_context_id(id).await;
Ok(())
}
/// Load a Context from its own DB (db index = id)
@@ -551,4 +555,70 @@ impl RedisDriver {
let key = Self::message_key(caller_id, id);
self.lpush_list(db, "msg_out", &key).await
}
/// Block-pop from msg_out with timeout (seconds). Returns the message key if present.
/// Uses BRPOP so that the queue behaves FIFO with LPUSH producer.
pub async fn brpop_msg_out(&self, db: u32, timeout_secs: usize) -> Result<Option<String>> {
let mut cm = self.manager_for_db(db).await?;
// BRPOP returns (list, element) on success
let res: Option<(String, String)> = redis::cmd("BRPOP")
.arg("msg_out")
.arg(timeout_secs)
.query_async(&mut cm)
.await?;
Ok(res.map(|(_, v)| v))
}
/// Scan all runner:* keys in this DB and return the deserialized Runner entries.
pub async fn scan_runners(&self, db: u32) -> Result<Vec<Runner>> {
let mut cm = self.manager_for_db(db).await?;
let mut out: Vec<Runner> = Vec::new();
let mut cursor: u64 = 0;
loop {
let (next, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("runner:*")
.arg("COUNT")
.arg(100)
.query_async(&mut cm)
.await?;
for k in keys {
if let Ok(r) = self.hget_model::<Runner>(db, &k).await {
out.push(r);
}
}
if next == 0 {
break;
}
cursor = next;
}
Ok(out)
}
// -----------------------------
// Global registry (DB 0) for Context IDs
// -----------------------------
/// Register a context id in the global set "contexts" stored in DB 0.
pub async fn register_context_id(&self, id: u32) -> Result<()> {
let mut cm = self.manager_for_db(0).await?;
let _: i64 = redis::cmd("SADD").arg("contexts").arg(id).query_async(&mut cm).await?;
Ok(())
}
/// List all registered context ids from the global set in DB 0.
pub async fn list_context_ids(&self) -> Result<Vec<u32>> {
let mut cm = self.manager_for_db(0).await?;
// Using SMEMBERS and parsing into u32
let vals: Vec<String> = redis::cmd("SMEMBERS").arg("contexts").query_async(&mut cm).await?;
let mut out = Vec::with_capacity(vals.len());
for v in vals {
if let Ok(n) = v.parse::<u32>() {
out.push(n);
}
}
out.sort_unstable();
Ok(out)
}
}
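To illustrate the new DB 0 registry, a small sketch (not part of the commit; assumes an already-connected `RedisDriver` and uses only the methods added above):

async fn registry_demo(driver: &RedisDriver) -> Result<()> {
    // SADD-backed, so registering the same id twice leaves the set unchanged.
    driver.register_context_id(1).await?;
    driver.register_context_id(7).await?;
    driver.register_context_id(7).await?;
    // SMEMBERS-backed; ids come back parsed and sorted ascending.
    let ids = driver.list_context_ids().await?;
    assert_eq!(ids, vec![1, 7]);
    Ok(())
}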