From 4b597cc4454f7dbc5915c7a964b2eb301d2f02c4 Mon Sep 17 00:00:00 2001
From: Lee Smet
Date: Thu, 28 Aug 2025 13:50:59 +0200
Subject: [PATCH] Add calling of supervisor over mycelium

Signed-off-by: Lee Smet
---
 src/lib.rs                |   1 +
 src/main.rs               |  13 +++
 src/models/runner.rs      |   3 +
 src/models/script_type.rs |   2 +-
 src/router.rs             | 211 ++++++++++++++++++++++++++++++++++++++
 src/rpc.rs                |   4 +
 src/service.rs            |  37 +++++--
 src/storage/redis.rs      |  72 ++++++++++++-
 8 files changed, 331 insertions(+), 12 deletions(-)
 create mode 100644 src/router.rs

diff --git a/src/lib.rs b/src/lib.rs
index daa4755..79c6230 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,3 +5,4 @@ mod time;
 pub mod dag;
 pub mod rpc;
 pub mod clients;
+pub mod router;
diff --git a/src/main.rs b/src/main.rs
index 1c569d3..2a31f9b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -84,10 +84,23 @@ async fn main() {
 
     // Initialize Service
     let service = herocoordinator::service::AppService::new(redis);
+    let service_for_router = service.clone();
 
     // Shared application state
     let state = Arc::new(herocoordinator::rpc::AppState::new(service));
 
+    // Start router workers (auto-discovered contexts)
+    {
+        let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
+        let cfg = herocoordinator::router::RouterConfig {
+            context_ids: Vec::new(), // ignored by start_router_auto
+            concurrency: 32,
+            base_url,
+            topic: "supervisor.rpc".to_string(),
+        };
+        let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
+    }
+
     // Build RPC modules for both servers
     let http_module = herocoordinator::rpc::build_module(state.clone());
     let ws_module = herocoordinator::rpc::build_module(state.clone());
diff --git a/src/models/runner.rs b/src/models/runner.rs
index 68bb6a7..15515e0 100644
--- a/src/models/runner.rs
+++ b/src/models/runner.rs
@@ -2,6 +2,7 @@ use std::net::IpAddr;
 
 use serde::{Deserialize, Serialize};
 
+use crate::models::ScriptType;
 use crate::time::Timestamp;
 
 #[derive(Serialize, Deserialize, Clone)]
@@ -13,6 +14,8 @@ pub struct Runner {
     pub address: IpAddr,
     /// Needs to be set by the runner, usually `runner
diff --git a/src/router.rs b/src/router.rs
new file mode 100644
--- /dev/null
+++ b/src/router.rs
@@ -0,0 +1,211 @@
+    pub concurrency: usize,
+    pub base_url: String, // e.g. http://127.0.0.1:8990
+    pub topic: String,    // e.g. "supervisor.rpc"
+    // secret currently unused (None), add here later if needed
+}
+
+/// Start background router loops, one per context.
+/// Each loop:
+/// - BRPOP msg_out with 1s timeout
+/// - Loads the Message by key, selects a Runner by script_type
+/// - Sends supervisor JSON-RPC via Mycelium
+/// - On success: Message.status = Acknowledged
+/// - On error: Message.status = Error and append a log
+pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::JoinHandle<()>> {
+    let mut handles = Vec::new();
+    for ctx_id in cfg.context_ids.clone() {
+        let service_cloned = service.clone();
+        let cfg_cloned = cfg.clone();
+        let handle = tokio::spawn(async move {
+            let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
+            loop {
+                // Pop next message key (blocking with timeout)
+                match service_cloned.brpop_msg_out(ctx_id, 1).await {
+                    Ok(Some(key)) => {
+                        let permit = {
+                            // acquire a concurrency permit (non-fair is fine)
+                            let sem = sem.clone();
+                            // if semaphore is exhausted, await until a slot becomes available
+                            match sem.acquire_owned().await {
+                                Ok(p) => p,
+                                Err(_) => {
+                                    // Semaphore closed; exit loop
+                                    break;
+                                }
+                            }
+                        };
+                        let service_task = service_cloned.clone();
+                        let cfg_task = cfg_cloned.clone();
+                        tokio::spawn(async move {
+                            // Ensure permit is dropped at end of task
+                            let _permit = permit;
+                            if let Err(e) =
+                                deliver_one(&service_task, &cfg_task, ctx_id, &key).await
+                            {
+                                eprintln!(
+                                    "[router ctx={}] delivery error for {}: {}",
+                                    ctx_id, key, e
+                                );
+                            }
+                        });
+                    }
+                    Ok(None) => {
+                        // timeout: just tick
+                        continue;
+                    }
+                    Err(e) => {
+                        eprintln!("[router ctx={}] brpop error: {}", ctx_id, e);
+                        // small backoff to avoid busy-loop on persistent errors
+                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+                    }
+                }
+            }
+        });
+        handles.push(handle);
+    }
+    handles
+}
+
+async fn deliver_one(
+    service: &AppService,
+    cfg: &RouterConfig,
+    context_id: u32,
+    msg_key: &str,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    // Parse "message:{caller_id}:{id}"
+    let (caller_id, id) = parse_message_key(msg_key)
+        .ok_or_else(|| format!("invalid message key format: {}", msg_key))?;
+
+    // Load message
+    let msg: Message = service.load_message(context_id, caller_id, id).await?;
+
+    // Determine routing script_type
+    let desired: ScriptType = determine_script_type(&msg);
+
+    // Discover runners and select a matching one
+    let runners = service.scan_runners(context_id).await?;
+    let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
+        let log = format!(
+            "No runner with script_type {:?} available in context {} for message {}",
+            desired, context_id, msg_key
+        );
+        let _ = service
+            .append_message_logs(context_id, caller_id, id, vec![log.clone()])
+            .await;
+        let _ = service
+            .update_message_status(context_id, caller_id, id, MessageStatus::Error)
+            .await;
+        return Err(log.into());
+    };
+
+    // Build SupervisorClient
+    let dest = if !runner.pubkey.trim().is_empty() {
+        Destination::Pk(runner.pubkey.clone())
+    } else {
+        Destination::Ip(runner.address)
+    };
+    let client = SupervisorClient::new(
+        cfg.base_url.clone(),
+        dest,
+        cfg.topic.clone(),
+        None, // secret
+    )?;
+
+    // Build supervisor method and params from Message
+    let method = msg.message.clone();
+    let params = build_params(&msg)?;
+
+    // Send
+    let _out_id = client.call(&method, params).await?;
+
+    // Mark as acknowledged on success
+    service
+        .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
+        .await?;
+
+    Ok(())
+}
+
+fn determine_script_type(msg: &Message) -> ScriptType {
+    // Prefer embedded job's script_type if available, else fallback to message.message_type
+    match msg.job.first() {
+        Some(j) => j.script_type.clone(),
+        None => msg.message_type.clone(),
+    }
+}
+
+fn build_params(msg: &Message) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
+    // Minimal mapping:
+    // - "job.run" with exactly one embedded job: [{ "job": <job> }]
+    // - otherwise: []
+    if msg.message == "job.run"
+        && let Some(j) = msg.job.first()
+    {
+        let jv = job_to_json(j)?;
+        return Ok(json!([ { "job": jv } ]));
+    }
+
+    Ok(json!([]))
+}
+
+fn job_to_json(job: &Job) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
+    Ok(serde_json::to_value(job)?)
+}
+
+fn parse_message_key(s: &str) -> Option<(u32, u32)> {
+    // Expect "message:{caller_id}:{id}"
+    let mut it = s.split(':');
+    match (it.next(), it.next(), it.next(), it.next()) {
+        (Some("message"), Some(caller), Some(id), None) => {
+            let caller_id = caller.parse::<u32>().ok()?;
+            let msg_id = id.parse::<u32>().ok()?;
+            Some((caller_id, msg_id))
+        }
+        _ => None,
+    }
+}
+
+
+/// Auto-discover contexts periodically and ensure a router loop exists for each.
+/// Returns a JoinHandle of the discovery task (router loops are detached).
+pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        let mut active: HashSet<u32> = HashSet::new();
+        loop {
+            match service.list_context_ids().await {
+                Ok(ids) => {
+                    for ctx_id in ids {
+                        if !active.contains(&ctx_id) {
+                            // Spawn a loop for this new context
+                            let cfg_ctx = RouterConfig {
+                                context_ids: vec![ctx_id],
+                                ..cfg.clone()
+                            };
+                            let _ = start_router(service.clone(), cfg_ctx);
+                            active.insert(ctx_id);
+                            eprintln!("[router] started loop for context {}", ctx_id);
+                        }
+                    }
+                }
+                Err(e) => {
+                    eprintln!("[router] list_context_ids error: {}", e);
+                }
+            }
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        }
+    })
+}
diff --git a/src/rpc.rs b/src/rpc.rs
index 2a35123..314d599 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -144,6 +144,8 @@ pub struct RunnerCreate {
     pub pubkey: String,
     pub address: IpAddr,
     pub topic: String,
+    /// The script type this runner executes (used for routing)
+    pub script_type: ScriptType,
     pub local: bool,
 }
 impl RunnerCreate {
@@ -155,6 +157,7 @@ impl RunnerCreate {
             pubkey,
             address,
             topic,
+            script_type,
             local,
         } = self;
 
@@ -163,6 +166,7 @@ impl RunnerCreate {
             pubkey,
             address,
             topic,
+            script_type,
             local,
             created_at: ts,
             updated_at: ts,
diff --git a/src/service.rs b/src/service.rs
index c2bfb28..1559969 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -10,7 +10,7 @@ use serde_json::Value;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tokio::sync::Mutex;
-use tokio::time::{sleep, Duration};
+use tokio::time::{Duration, sleep};
 
 pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
 
@@ -309,6 +309,7 @@ fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
 // Service API
 // -----------------------------
 
+#[derive(Clone)]
 pub struct AppService {
     redis: Arc<RedisDriver>,
     schedulers: Arc>>,
@@ -558,15 +559,6 @@ impl AppService {
         Ok(true)
     }
 
-    /// Start a background scheduler for a flow.
-    /// - Ticks every 1 second.
-    /// - Sets Flow status to Started immediately.
-    /// - Dispatches jobs whose dependencies are Finished: creates a Message and LPUSHes its key into msg_out,
-    ///   and marks the job status to Dispatched.
-    /// - When all jobs are Finished sets Flow to Finished; if any job is Error sets Flow to Error.
-    /// Returns:
-    /// - Ok(true) if a scheduler was started
-
     /// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
     /// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
     pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
@@ -1075,3 +1067,28 @@ impl AppService {
         Ok(())
     }
 }
+
+/// Router/helper wrappers exposed on AppService so background tasks don't need direct Redis access.
+impl AppService {
+    /// Block-pop from the per-context msg_out queue with a timeout (seconds).
+    /// Returns Some(message_key) like "message:{caller_id}:{id}" or None on timeout.
+    pub async fn brpop_msg_out(
+        &self,
+        context_id: u32,
+        timeout_secs: usize,
+    ) -> Result<Option<String>, BoxError> {
+        self.redis.brpop_msg_out(context_id, timeout_secs).await
+    }
+
+    /// Scan all runner:* in the given context and return deserialized Runner entries.
+    pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<Runner>, BoxError> {
+        self.redis.scan_runners(context_id).await
+    }
+}
+
+/// Auto-discovery helpers for contexts (wrappers over RedisDriver)
+impl AppService {
+    pub async fn list_context_ids(&self) -> Result<Vec<u32>, BoxError> {
+        self.redis.list_context_ids().await
+    }
+}
diff --git a/src/storage/redis.rs b/src/storage/redis.rs
index 56d0ae9..b251d01 100644
--- a/src/storage/redis.rs
+++ b/src/storage/redis.rs
@@ -163,7 +163,11 @@ impl RedisDriver {
             .and_then(|v| v.as_u64())
             .ok_or("Context.id missing or not a number")? as u32;
         let key = Self::context_key(id);
-        self.hset_model(id, &key, ctx).await
+        // Write the context hash in its own DB
+        self.hset_model(id, &key, ctx).await?;
+        // Register this context id in the global registry (DB 0)
+        let _ = self.register_context_id(id).await;
+        Ok(())
     }
 
     /// Load a Context from its own DB (db index = id)
@@ -551,4 +555,70 @@
         let key = Self::message_key(caller_id, id);
         self.lpush_list(db, "msg_out", &key).await
     }
+
+    /// Block-pop from msg_out with timeout (seconds). Returns the message key if present.
+    /// Uses BRPOP so that the queue behaves FIFO with LPUSH producer.
+    pub async fn brpop_msg_out(&self, db: u32, timeout_secs: usize) -> Result<Option<String>> {
+        let mut cm = self.manager_for_db(db).await?;
+        // BRPOP returns (list, element) on success
+        let res: Option<(String, String)> = redis::cmd("BRPOP")
+            .arg("msg_out")
+            .arg(timeout_secs)
+            .query_async(&mut cm)
+            .await?;
+        Ok(res.map(|(_, v)| v))
+    }
+
+    /// Scan all runner:* keys in this DB and return the deserialized Runner entries.
+    pub async fn scan_runners(&self, db: u32) -> Result<Vec<Runner>> {
+        let mut cm = self.manager_for_db(db).await?;
+        let mut out: Vec<Runner> = Vec::new();
+        let mut cursor: u64 = 0;
+        loop {
+            let (next, keys): (u64, Vec<String>) = redis::cmd("SCAN")
+                .arg(cursor)
+                .arg("MATCH")
+                .arg("runner:*")
+                .arg("COUNT")
+                .arg(100)
+                .query_async(&mut cm)
+                .await?;
+            for k in keys {
+                if let Ok(r) = self.hget_model::<Runner>(db, &k).await {
+                    out.push(r);
+                }
+            }
+            if next == 0 {
+                break;
+            }
+            cursor = next;
+        }
+        Ok(out)
+    }
+
+    // -----------------------------
+    // Global registry (DB 0) for Context IDs
+    // -----------------------------
+
+    /// Register a context id in the global set "contexts" stored in DB 0.
+    pub async fn register_context_id(&self, id: u32) -> Result<()> {
+        let mut cm = self.manager_for_db(0).await?;
+        let _: i64 = redis::cmd("SADD").arg("contexts").arg(id).query_async(&mut cm).await?;
+        Ok(())
+    }
+
+    /// List all registered context ids from the global set in DB 0.
+    pub async fn list_context_ids(&self) -> Result<Vec<u32>> {
+        let mut cm = self.manager_for_db(0).await?;
+        // Using SMEMBERS and parsing into u32
+        let vals: Vec<String> = redis::cmd("SMEMBERS").arg("contexts").query_async(&mut cm).await?;
+        let mut out = Vec::with_capacity(vals.len());
+        for v in vals {
+            if let Ok(n) = v.parse::<u32>() {
+                out.push(n);
+            }
+        }
+        out.sort_unstable();
+        Ok(out)
+    }
 }
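
Not part of the patch, for review context only: a minimal sketch of how the new
AppService wrappers added above (list_context_ids, brpop_msg_out, scan_runners)
compose into a single manual delivery pass, e.g. when poking at a deployment
without the background router. The function name drain_once is hypothetical,
error handling is reduced to `?`, and it assumes an already-constructed
AppService backed by a running Redis.

    use herocoordinator::service::{AppService, BoxError};

    /// One manual pass over every registered context: pop at most one queued
    /// message key per context and report how many runners could serve it.
    async fn drain_once(service: &AppService) -> Result<(), BoxError> {
        // Context ids come from the global "contexts" set kept in Redis DB 0
        for ctx_id in service.list_context_ids().await? {
            // Keys follow the "message:{caller_id}:{id}" convention used by the router
            if let Some(key) = service.brpop_msg_out(ctx_id, 1).await? {
                // The real router would now load the Message, pick a Runner whose
                // script_type matches, and call the supervisor over Mycelium.
                let runners = service.scan_runners(ctx_id).await?;
                println!("context {}: popped {} ({} runners available)", ctx_id, key, runners.len());
            }
        }
        Ok(())
    }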
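
Likewise illustrative rather than part of the change: the params shape that
build_params produces for a "job.run" message with one embedded job. The helper
name supervisor_params is hypothetical; the nested object is exactly
serde_json::to_value of the embedded Job, the method string comes from
Message.message, and the topic comes from RouterConfig.topic ("supervisor.rpc"
by default in main.rs).

    use serde_json::{Value, json};

    // params is a one-element JSON array wrapping the serialized job
    fn supervisor_params(job_value: Value) -> Value {
        json!([ { "job": job_value } ])
    }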