From 512c99db54a6f02a18582703822a334357acba27 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Mon, 8 Sep 2025 11:37:22 +0200 Subject: [PATCH] Improve jsonrpc client to properly route replies Signed-off-by: Lee Smet --- src/clients/mod.rs | 2 + src/clients/supervisor_client.rs | 434 +++++++++++------- src/clients/supervisor_hub.rs | 143 ++++++ src/main.rs | 10 +- src/router.rs | 747 +++++++++++-------------------- 5 files changed, 683 insertions(+), 653 deletions(-) create mode 100644 src/clients/supervisor_hub.rs diff --git a/src/clients/mod.rs b/src/clients/mod.rs index 2eddc06..e33ff23 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -1,7 +1,9 @@ pub mod mycelium_client; pub mod supervisor_client; +pub mod supervisor_hub; pub mod types; pub use mycelium_client::{MyceliumClient, MyceliumClientError}; pub use supervisor_client::{SupervisorClient, SupervisorClientError}; +pub use supervisor_hub::SupervisorHub; pub use types::Destination; diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs index a5daa07..1f9f9ee 100644 --- a/src/clients/supervisor_client.rs +++ b/src/clients/supervisor_client.rs @@ -1,20 +1,20 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use serde_json::{Value, json}; use thiserror::Error; +use tokio::time::timeout; -use crate::clients::{Destination, MyceliumClient, MyceliumClientError}; +use crate::clients::{Destination, MyceliumClient, MyceliumClientError, SupervisorHub}; #[derive(Clone)] pub struct SupervisorClient { - mycelium: Arc, // Delegated Mycelium transport - destination: Destination, // ip or pk - topic: String, // e.g. "supervisor.rpc" - secret: Option, // optional, required by several supervisor methods - id_counter: Arc, // JSON-RPC id generator (for inner supervisor requests) + hub: Arc, // Global hub with background pop loop and shared id generator + destination: Destination, // ip or pk + secret: Option, // optional, required by several supervisor methods } #[derive(Debug, Error)] @@ -46,24 +46,22 @@ impl From for SupervisorClientError { } impl SupervisorClient { - /// Preferred constructor: provide a shared Mycelium client. - pub fn new_with_client( - mycelium: Arc, + /// Preferred constructor using a shared SupervisorHub (single global listener). + pub fn new_with_hub( + hub: Arc, destination: Destination, - topic: impl Into, secret: Option, ) -> Self { Self { - mycelium, + hub, destination, - topic: topic.into(), secret, - id_counter: Arc::new(AtomicU64::new(1)), } } - /// Backward-compatible constructor that builds a Mycelium client from base_url. - /// base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty. + /// Backward-compatible constructor that builds a new Hub from base_url/topic. + /// NOTE: This spawns a background popMessage listener for the given topic. + /// Prefer `new_with_hub` so the process has a single global hub. pub fn new( base_url: impl Into, destination: Destination, @@ -78,8 +76,16 @@ impl SupervisorClient { Ok(Self::new_with_client(mycelium, destination, topic, secret)) } - fn next_id(&self) -> u64 { - self.id_counter.fetch_add(1, Ordering::Relaxed) + /// Backward-compatible constructor that reuses an existing Mycelium client. + /// NOTE: This creates a new hub and its own background listener. Prefer `new_with_hub`. 
+    pub fn new_with_client(
+        mycelium: Arc<MyceliumClient>,
+        destination: Destination,
+        topic: impl Into<String>,
+        secret: Option<String>,
+    ) -> Self {
+        let hub = SupervisorHub::new_with_client(mycelium, topic);
+        Self::new_with_hub(hub, destination, secret)
     }
 
     /// Internal helper used by tests to inspect dst JSON shape.
@@ -93,7 +99,7 @@ impl SupervisorClient {
     fn build_supervisor_payload(&self, method: &str, params: Value) -> Value {
         json!({
             "jsonrpc": "2.0",
-            "id": self.next_id(),
+            "id": self.hub.next_id(),
             "method": method,
             "params": params,
         })
@@ -128,50 +134,37 @@ impl SupervisorClient {
             .map(|s| s.to_string())
     }
 
-    /// Generic call: build supervisor JSON-RPC message, send via Mycelium pushMessage, return outbound message id (hex).
-    pub async fn call(&self, method: &str, params: Value) -> Result<String, SupervisorClientError> {
-        let inner = self.build_supervisor_payload(method, params);
-        let payload_b64 = Self::encode_payload(&inner)?;
-        let result = self
-            .mycelium
-            .push_message(
-                &self.destination,
-                &Self::encode_topic(self.topic.as_bytes()),
-                &payload_b64,
-                None,
-            )
-            .await?;
-
-        if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) {
-            return Ok(id);
-        }
-        // Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient)
-        if let Some(arr) = result.as_array()
-            && arr.len() == 1
-            && let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0])
-        {
-            return Ok(id);
-        }
-        Err(SupervisorClientError::InvalidResponse(format!(
-            "result did not contain message id: {result}"
-        )))
+    fn need_secret(&self) -> Result<&str, SupervisorClientError> {
+        self.secret
+            .as_deref()
+            .ok_or(SupervisorClientError::MissingSecret)
     }
 
-    /// Variant of call that also returns the inner supervisor JSON-RPC id used in the payload.
-    /// This id is required to correlate asynchronous popMessage replies coming from Mycelium.
-    pub async fn call_with_ids(
+    // -----------------------------
+    // Core: request-reply call via Hub with default 60s timeout
+    // -----------------------------
+
+    /// Send a supervisor JSON-RPC request and await its reply via the Hub.
+    /// Returns (outbound_message_id, reply_envelope_json).
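+    /// Flow: allocate an inner JSON-RPC id from the hub, register a oneshot waiter for that id
+    /// before pushing the request (so a fast reply cannot race the registration), push the
+    /// base64-encoded payload over Mycelium, then await the waiter for up to `timeout_secs`.
+    /// On timeout the pending waiter is removed and `TransportTimeout` is returned.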
+    pub async fn call_with_reply_timeout(
         &self,
         method: &str,
         params: Value,
-    ) -> Result<(String, u64), SupervisorClientError> {
-        let inner_id = self.next_id();
+        timeout_secs: u64,
+    ) -> Result<(String, Value), SupervisorClientError> {
+        let inner_id = self.hub.next_id();
+        // Register waiter before sending to avoid race
+        let rx = self.hub.register_waiter(inner_id).await;
+
         let inner = self.build_supervisor_payload_with_id(method, params, inner_id);
         let payload_b64 = Self::encode_payload(&inner)?;
+
         let result = self
-            .mycelium
+            .hub
+            .mycelium()
            .push_message(
                 &self.destination,
-                &Self::encode_topic(self.topic.as_bytes()),
+                &Self::encode_topic(self.hub.topic().as_bytes()),
                 &payload_b64,
                 None,
             )
             .await?;
@@ -185,29 +178,210 @@ impl SupervisorClient {
         {
             id
         } else {
+            // Clean pending entry to avoid leak
+            let _ = self.hub.remove_waiter(inner_id).await;
             return Err(SupervisorClientError::InvalidResponse(format!(
                 "result did not contain message id: {result}"
             )));
         };
-        Ok((out_id, inner_id))
+        let d = Duration::from_secs(timeout_secs);
+        match timeout(d, rx).await {
+            Ok(Ok(reply)) => Ok((out_id, reply)),
+            Ok(Err(_canceled)) => Err(SupervisorClientError::InvalidResponse(
+                "oneshot canceled before receiving reply".into(),
+            )),
+            Err(_elapsed) => {
+                // Cleanup on timeout
+                let _ = self.hub.remove_waiter(inner_id).await;
+                Err(SupervisorClientError::TransportTimeout)
+            }
+        }
     }
+    /// Send and await with the default 60s timeout.
+    pub async fn call_with_reply(
+        &self,
+        method: &str,
+        params: Value,
+    ) -> Result<(String, Value), SupervisorClientError> {
+        self.call_with_reply_timeout(method, params, 60).await
+    }
 
-    fn need_secret(&self) -> Result<&str, SupervisorClientError> {
-        self.secret
-            .as_deref()
-            .ok_or(SupervisorClientError::MissingSecret)
+    /// Back-compat: Send and await a reply but return only the outbound id (discard reply).
+    /// This keeps existing call sites working while the system migrates to reply-aware paths.
+ pub async fn call(&self, method: &str, params: Value) -> Result { + let (out_id, _reply) = self.call_with_reply(method, params).await?; + Ok(out_id) } // ----------------------------- - // Typed wrappers for Supervisor API - // Asynchronous-only: returns outbound message id + // Typed wrappers for Supervisor API (await replies) // ----------------------------- // Runners + pub async fn list_runners_wait(&self) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("list_runners", json!([])).await + } + + pub async fn register_runner_wait( + &self, + name: impl Into, + queue: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "name": name.into(), + "queue": queue.into() + }]); + self.call_with_reply("register_runner", params).await + } + + pub async fn remove_runner_wait( + &self, + actor_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("remove_runner", json!([actor_id.into()])) + .await + } + + pub async fn start_runner_wait( + &self, + actor_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("start_runner", json!([actor_id.into()])) + .await + } + + pub async fn stop_runner_wait( + &self, + actor_id: impl Into, + force: bool, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("stop_runner", json!([actor_id.into(), force])) + .await + } + + pub async fn get_runner_status_wait( + &self, + actor_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("get_runner_status", json!([actor_id.into()])) + .await + } + + pub async fn get_all_runner_status_wait( + &self, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("get_all_runner_status", json!([])) + .await + } + + pub async fn start_all_wait(&self) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("start_all", json!([])).await + } + + pub async fn stop_all_wait( + &self, + force: bool, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("stop_all", json!([force])).await + } + + pub async fn get_all_status_wait(&self) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("get_all_status", json!([])).await + } + + // Jobs (await) + pub async fn jobs_create_wait( + &self, + job: Value, + ) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "job": job + }]); + self.call_with_reply("jobs.create", params).await + } + + pub async fn jobs_list_wait(&self) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("jobs.list", json!([])).await + } + + pub async fn job_run_wait(&self, job: Value) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "job": job + }]); + self.call_with_reply("job.run", params).await + } + + pub async fn job_start_wait( + &self, + job_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "job_id": job_id.into() + }]); + self.call_with_reply("job.start", params).await + } + + pub async fn job_status_wait( + &self, + job_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("job.status", json!([job_id.into()])) + .await + } + + pub async fn 
job_result_wait( + &self, + job_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("job.result", json!([job_id.into()])) + .await + } + + pub async fn job_stop_wait( + &self, + job_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "job_id": job_id.into() + }]); + self.call_with_reply("job.stop", params).await + } + + pub async fn job_delete_wait( + &self, + job_id: impl Into, + ) -> Result<(String, Value), SupervisorClientError> { + let secret = self.need_secret()?; + let params = json!([{ + "secret": secret, + "job_id": job_id.into() + }]); + self.call_with_reply("job.delete", params).await + } + + pub async fn rpc_discover_wait(&self) -> Result<(String, Value), SupervisorClientError> { + self.call_with_reply("rpc.discover", json!([])).await + } + + // ----------------------------- + // Backward-compatible variants returning only outbound id (discarding reply) + // ----------------------------- + pub async fn list_runners(&self) -> Result { - self.call("list_runners", json!([])).await + let (id, _) = self.list_runners_wait().await?; + Ok(id) } pub async fn register_runner( @@ -215,27 +389,24 @@ impl SupervisorClient { name: impl Into, queue: impl Into, ) -> Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "name": name.into(), - "queue": queue.into() - }]); - self.call("register_runner", params).await + let (id, _) = self.register_runner_wait(name, queue).await?; + Ok(id) } pub async fn remove_runner( &self, actor_id: impl Into, ) -> Result { - self.call("remove_runner", json!([actor_id.into()])).await + let (id, _) = self.remove_runner_wait(actor_id).await?; + Ok(id) } pub async fn start_runner( &self, actor_id: impl Into, ) -> Result { - self.call("start_runner", json!([actor_id.into()])).await + let (id, _) = self.start_runner_wait(actor_id).await?; + Ok(id) } pub async fn stop_runner( @@ -243,141 +414,96 @@ impl SupervisorClient { actor_id: impl Into, force: bool, ) -> Result { - self.call("stop_runner", json!([actor_id.into(), force])) - .await + let (id, _) = self.stop_runner_wait(actor_id, force).await?; + Ok(id) } pub async fn get_runner_status( &self, actor_id: impl Into, ) -> Result { - self.call("get_runner_status", json!([actor_id.into()])) - .await + let (id, _) = self.get_runner_status_wait(actor_id).await?; + Ok(id) } pub async fn get_all_runner_status(&self) -> Result { - self.call("get_all_runner_status", json!([])).await + let (id, _) = self.get_all_runner_status_wait().await?; + Ok(id) } pub async fn start_all(&self) -> Result { - self.call("start_all", json!([])).await + let (id, _) = self.start_all_wait().await?; + Ok(id) } pub async fn stop_all(&self, force: bool) -> Result { - self.call("stop_all", json!([force])).await + let (id, _) = self.stop_all_wait(force).await?; + Ok(id) } pub async fn get_all_status(&self) -> Result { - self.call("get_all_status", json!([])).await + let (id, _) = self.get_all_status_wait().await?; + Ok(id) } - // Jobs pub async fn jobs_create(&self, job: Value) -> Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job": job - }]); - self.call("jobs.create", params).await + let (id, _) = self.jobs_create_wait(job).await?; + Ok(id) } pub async fn jobs_list(&self) -> Result { - self.call("jobs.list", json!([])).await + let (id, _) = self.jobs_list_wait().await?; + Ok(id) } pub async fn job_run(&self, job: Value) -> 
Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job": job - }]); - self.call("job.run", params).await - } - - /// Typed wrapper returning both outbound Mycelium id and inner supervisor JSON-RPC id. - pub async fn job_run_with_ids( - &self, - job: Value, - ) -> Result<(String, u64), SupervisorClientError> { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job": job - }]); - self.call_with_ids("job.run", params).await + let (id, _) = self.job_run_wait(job).await?; + Ok(id) } pub async fn job_start( &self, job_id: impl Into, ) -> Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job_id": job_id.into() - }]); - self.call("job.start", params).await + let (id, _) = self.job_start_wait(job_id).await?; + Ok(id) } pub async fn job_status( &self, job_id: impl Into, ) -> Result { - self.call("job.status", json!([job_id.into()])).await + let (id, _) = self.job_status_wait(job_id).await?; + Ok(id) } - /// Asynchronous job.status returning outbound and inner IDs for correlation - pub async fn job_status_with_ids( - &self, - job_id: impl Into, - ) -> Result<(String, u64), SupervisorClientError> { - self.call_with_ids("job.status", json!([job_id.into()])).await - } - - pub async fn job_result( &self, job_id: impl Into, ) -> Result { - self.call("job.result", json!([job_id.into()])).await + let (id, _) = self.job_result_wait(job_id).await?; + Ok(id) } - /// Asynchronous job.result returning outbound and inner IDs for correlation - pub async fn job_result_with_ids( - &self, - job_id: impl Into, - ) -> Result<(String, u64), SupervisorClientError> { - self.call_with_ids("job.result", json!([job_id.into()])).await - } - - pub async fn job_stop( &self, job_id: impl Into, ) -> Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job_id": job_id.into() - }]); - self.call("job.stop", params).await + let (id, _) = self.job_stop_wait(job_id).await?; + Ok(id) } pub async fn job_delete( &self, job_id: impl Into, ) -> Result { - let secret = self.need_secret()?; - let params = json!([{ - "secret": secret, - "job_id": job_id.into() - }]); - self.call("job.delete", params).await + let (id, _) = self.job_delete_wait(job_id).await?; + Ok(id) } - // Discovery pub async fn rpc_discover(&self) -> Result { - self.call("rpc.discover", json!([])).await + let (id, _) = self.rpc_discover_wait().await?; + Ok(id) } } @@ -390,27 +516,27 @@ mod tests { use std::net::IpAddr; fn mk_client() -> SupervisorClient { - // Uses the legacy constructor but will not issue real network calls in these tests. - SupervisorClient::new( - "http://127.0.0.1:8990", + // Build a hub but it won't issue real network calls in these serializer-only tests. 
+        let mycelium = Arc::new(MyceliumClient::new("http://127.0.0.1:8990").unwrap());
+        let hub = SupervisorHub::new_with_client(mycelium, "supervisor.rpc");
+        SupervisorClient::new_with_hub(
+            hub,
             Destination::Pk(
                 "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".to_string(),
             ),
-            "supervisor.rpc",
             Some("secret".to_string()),
         )
-        .unwrap()
     }
 
     #[test]
     fn builds_dst_ip_and_pk() {
-        let c_ip = SupervisorClient::new(
-            "http://127.0.0.1:8990",
+        let mycelium = Arc::new(MyceliumClient::new("http://127.0.0.1:8990").unwrap());
+        let hub_ip = SupervisorHub::new_with_client(mycelium.clone(), "supervisor.rpc");
+        let c_ip = SupervisorClient::new_with_hub(
+            hub_ip,
             Destination::Ip("2001:db8::1".parse().unwrap()),
-            "supervisor.rpc",
             None,
-        )
-        .unwrap();
+        );
         let v_ip = c_ip.build_dst();
         assert_eq!(v_ip.get("ip").unwrap().as_str().unwrap(), "2001:db8::1");
diff --git a/src/clients/supervisor_hub.rs b/src/clients/supervisor_hub.rs
new file mode 100644
index 0000000..daee7b4
--- /dev/null
+++ b/src/clients/supervisor_hub.rs
@@ -0,0 +1,143 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use base64::Engine;
+use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
+use serde_json::Value;
+use tokio::sync::{Mutex, oneshot};
+
+use crate::clients::mycelium_client::MyceliumClient;
+
+/// Global hub that:
+/// - Owns a single MyceliumClient
+/// - Spawns a background popMessage loop filtered by topic
+/// - Correlates supervisor JSON-RPC replies by inner id to waiting callers via oneshot channels
+#[derive(Clone)]
+pub struct SupervisorHub {
+    mycelium: Arc<MyceliumClient>,
+    topic: String,
+    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Value>>>>,
+    id_counter: Arc<AtomicU64>,
+}
+
+impl SupervisorHub {
+    /// Create a new hub and start the background popMessage task.
+    /// - base_url: Mycelium JSON-RPC endpoint, e.g. "http://127.0.0.1:8990"
+    /// - topic: plain-text topic (e.g., "supervisor.rpc")
+    pub fn new(
+        base_url: impl Into<String>,
+        topic: impl Into<String>,
+    ) -> Result<Arc<Self>, crate::clients::MyceliumClientError> {
+        let myc = Arc::new(MyceliumClient::new(base_url)?);
+        Ok(Self::new_with_client(myc, topic))
+    }
+
+    /// Variant that reuses an existing Mycelium client.
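+    /// Construction spawns the background popMessage loop via `tokio::spawn`, so this must be
+    /// called from within a running Tokio runtime.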
+    pub fn new_with_client(mycelium: Arc<MyceliumClient>, topic: impl Into<String>) -> Arc<Self> {
+        let hub = Arc::new(Self {
+            mycelium,
+            topic: topic.into(),
+            pending: Arc::new(Mutex::new(HashMap::new())),
+            id_counter: Arc::new(AtomicU64::new(1)),
+        });
+        Self::spawn_pop_loop(hub.clone());
+        hub
+    }
+
+    fn spawn_pop_loop(hub: Arc<Self>) {
+        tokio::spawn(async move {
+            loop {
+                match hub.mycelium.pop_message(Some(false), Some(2), None).await {
+                    Ok(Some(inb)) => {
+                        // Extract and decode payload
+                        let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
+                            // Not a payload-bearing message; ignore
+                            continue;
+                        };
+                        let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
+                            tracing::warn!(target: "supervisor_hub", "Failed to decode inbound payload base64");
+                            continue;
+                        };
+                        let Ok(rpc): Result<Value, _> = serde_json::from_slice(&raw) else {
+                            tracing::warn!(target: "supervisor_hub", "Failed to parse inbound payload JSON");
+                            continue;
+                        };
+
+                        // Extract inner JSON-RPC id
+                        let inner_id_u64 = match rpc.get("id") {
+                            Some(Value::Number(n)) => n.as_u64(),
+                            Some(Value::String(s)) => s.parse::<u64>().ok(),
+                            _ => None,
+                        };
+
+                        if let Some(inner_id) = inner_id_u64 {
+                            // Try to deliver to a pending waiter
+                            let sender_opt = {
+                                let mut guard = hub.pending.lock().await;
+                                guard.remove(&inner_id)
+                            };
+                            if let Some(tx) = sender_opt {
+                                let _ = tx.send(rpc);
+                            } else {
+                                tracing::warn!(
+                                    target: "supervisor_hub",
+                                    inner_id,
+                                    payload = %String::from_utf8_lossy(&raw),
+                                    "Unmatched supervisor reply; no waiter registered"
+                                );
+                            }
+                        } else {
+                            tracing::warn!(target: "supervisor_hub", "Inbound supervisor reply missing id; dropping");
+                        }
+                    }
+                    Ok(None) => {
+                        // No message; continue polling
+                        continue;
+                    }
+                    Err(e) => {
+                        tracing::warn!(target: "supervisor_hub", error = %e, "popMessage error; backing off");
+                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+                    }
+                }
+            }
+        });
+    }
+
+    /// Allocate a new inner supervisor JSON-RPC id.
+    pub fn next_id(&self) -> u64 {
+        self.id_counter.fetch_add(1, Ordering::Relaxed)
+    }
+
+    /// Register a oneshot sender for the given inner id and return the receiver side.
+    pub async fn register_waiter(&self, inner_id: u64) -> oneshot::Receiver<Value> {
+        let (tx, rx) = oneshot::channel();
+        let mut guard = self.pending.lock().await;
+        guard.insert(inner_id, tx);
+        rx
+    }
+
+    /// Remove a pending waiter for a given id (used to cleanup on timeout).
+    pub async fn remove_waiter(&self, inner_id: u64) -> Option<oneshot::Sender<Value>> {
+        let mut guard = self.pending.lock().await;
+        guard.remove(&inner_id)
+    }
+
+    /// Access to underlying Mycelium client (for pushMessage).
+    pub fn mycelium(&self) -> Arc<MyceliumClient> {
+        self.mycelium.clone()
+    }
+
+    /// Access configured topic.
+ pub fn topic(&self) -> &str { + &self.topic + } +} + +impl std::fmt::Debug for SupervisorHub { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SupervisorHub") + .field("topic", &self.topic) + .finish() + } +} diff --git a/src/main.rs b/src/main.rs index 7eab6d2..ab32472 100644 --- a/src/main.rs +++ b/src/main.rs @@ -99,21 +99,21 @@ async fn main() { // Shared application state let state = Arc::new(herocoordinator::rpc::AppState::new(service)); - // Start router workers (auto-discovered contexts) and a single global inbound listener + // Start router workers (auto-discovered contexts) using a single global SupervisorHub (no separate inbound listener) { let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port); + let hub = herocoordinator::clients::SupervisorHub::new(base_url.clone(), "supervisor.rpc".to_string()) + .expect("Failed to initialize SupervisorHub"); let cfg = herocoordinator::router::RouterConfig { context_ids: Vec::new(), // ignored by start_router_auto concurrency: 32, base_url, topic: "supervisor.rpc".to_string(), + sup_hub: hub.clone(), transport_poll_interval_secs: 2, transport_poll_timeout_secs: 300, }; - // Global inbound listener for supervisor replies via Mycelium popMessage - let _inbound_handle = - herocoordinator::router::start_inbound_listener(service_for_router.clone(), cfg.clone()); - // Per-context outbound delivery loops + // Per-context outbound delivery loops (replies handled by SupervisorHub) let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg); } diff --git a/src/router.rs b/src/router.rs index 7446d58..272e50e 100644 --- a/src/router.rs +++ b/src/router.rs @@ -8,7 +8,7 @@ use std::hash::{Hash, Hasher}; use std::collections::hash_map::DefaultHasher; use crate::{ - clients::{Destination, MyceliumClient, SupervisorClient}, + clients::{Destination, MyceliumClient, SupervisorClient, SupervisorHub}, models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus}, service::AppService, }; @@ -20,6 +20,7 @@ pub struct RouterConfig { pub concurrency: usize, pub base_url: String, // e.g. http://127.0.0.1:8990 pub topic: String, // e.g. "supervisor.rpc" + pub sup_hub: Arc, // global supervisor hub for replies // Transport status polling configuration pub transport_poll_interval_secs: u64, // e.g. 2 pub transport_poll_timeout_secs: u64, // e.g. 
300 (5 minutes) @@ -75,7 +76,7 @@ impl SupervisorClientCache { async fn get_or_create( &self, - mycelium: Arc, + hub: Arc, dest: Destination, topic: String, secret: Option, @@ -95,10 +96,9 @@ impl SupervisorClientCache { tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)"); return existing.clone(); } - let client = Arc::new(SupervisorClient::new_with_client( - mycelium, + let client = Arc::new(SupervisorClient::new_with_hub( + hub, dest, - topic.clone(), secret.clone(), )); guard.insert(key, client.clone()); @@ -122,16 +122,9 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec break Arc::new(c), - Err(e) => { - error!(context_id=ctx_id, error=%e, "MyceliumClient init error"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - }; + // Use the global SupervisorHub and its Mycelium client + let sup_hub = cfg_cloned.sup_hub.clone(); + let mycelium = sup_hub.mycelium(); let cache = Arc::new(SupervisorClientCache::new()); @@ -156,11 +149,12 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec, + sup_hub: Arc, cache: Arc, ) -> Result<(), Box> { // Parse "message:{caller_id}:{id}" @@ -233,7 +228,7 @@ async fn deliver_one( let secret_for_poller = runner.secret.clone(); let client = cache .get_or_create( - mycelium.clone(), + sup_hub.clone(), dest.clone(), cfg.topic.clone(), runner.secret.clone(), @@ -247,15 +242,15 @@ async fn deliver_one( // Send // If this is a job.run and we have a secret configured on the client, // prefer the typed wrapper that injects the secret into inner supervisor params, - // and also capture the inner supervisor JSON-RPC id for correlation. - let (out_id, inner_id_opt) = if method == "job.run" { + // and await the reply to capture job_queued immediately. 
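+    // Illustrative (not normative) reply envelopes handled below, as decoded by the hub:
+    //   {"jsonrpc":"2.0","id":7,"result":"job_queued"}
+    //   {"jsonrpc":"2.0","id":7,"result":{"job_queued": ...}}
+    //   {"jsonrpc":"2.0","id":7,"error":{"code": ..., "message": "..."}}
+    // where "id" is the inner supervisor JSON-RPC id allocated via SupervisorHub::next_id.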
+ let (out_id, reply_opt) = if method == "job.run" { if let Some(j) = msg.job.first() { let jv = job_to_json(j)?; - // Returns (outbound message id, inner supervisor JSON-RPC id) - let (out, inner) = client.job_run_with_ids(jv).await?; - (out, Some(inner)) + // Returns (outbound message id, reply envelope) + let (out, reply) = client.job_run_wait(jv).await?; + (out, Some(reply)) } else { - // Fallback: no embedded job, use the generic call + // Fallback: no embedded job, use the generic call (await reply, discard) let out = client.call(&method, params).await?; (out, None) } @@ -280,13 +275,59 @@ async fn deliver_one( .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged) .await?; - // Record correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling - if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) { - let _ = service - .supcorr_set(inner_id, context_id, caller_id, job_id, id) - .await; + // If we got a job.run reply, interpret job_queued immediately + if let (Some(reply), Some(job_id)) = (reply_opt, msg.job.first().map(|j| j.id)) { + let result_opt = reply.get("result"); + let error_opt = reply.get("error"); + + // Handle job.run success (job_queued) + let is_job_queued = result_opt + .and_then(|res| { + if res.get("job_queued").is_some() { + Some(true) + } else if let Some(s) = res.as_str() { + Some(s == "job_queued") + } else { + None + } + }) + .unwrap_or(false); + + if is_job_queued { + let _ = service + .update_job_status_unchecked(context_id, caller_id, job_id, JobStatus::Dispatched) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Supervisor reply for job {}: job_queued (processed synchronously)", + job_id + )], + ) + .await; + } else if let Some(err_obj) = error_opt { + let _ = service + .update_job_status_unchecked(context_id, caller_id, job_id, JobStatus::Error) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Supervisor error for job {}: {} (processed synchronously)", + job_id, err_obj + )], + ) + .await; + } } + // No correlation map needed; replies are handled synchronously via SupervisorHub + // Spawn transport-status poller { let service_poll = service.clone(); @@ -365,28 +406,109 @@ async fn deliver_one( if job.result.is_empty() { let sup = cache .get_or_create( - client.clone(), + sup_hub.clone(), sup_dest.clone(), sup_topic.clone(), secret_for_poller.clone(), ) .await; - match sup.job_result_with_ids(job_id.to_string()).await { - Ok((_out2, inner2)) => { - let _ = service_poll - .supcorr_set(inner2, context_id, caller_id, job_id, id) - .await; - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Requested supervisor job.result for job {} (local terminal w/ empty result)", - job_id - )], - ) - .await; + match sup.job_result_wait(job_id.to_string()).await { + Ok((_out2, reply2)) => { + // Interpret reply synchronously: success/error/bare string + let res = reply2.get("result"); + if let Some(obj) = res.and_then(|v| v.as_object()) { + if let Some(s) = obj.get("success").and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("success".to_string(), s.to_string()); + let _ = service_poll + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service_poll + .update_message_status( + context_id, + caller_id, + id, + MessageStatus::Processed, + ) + .await; + let _ = 
service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Stored supervisor job.result for job {} (success, sync)", + job_id + )], + ) + .await; + } else if let Some(s) = obj.get("error").and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("error".to_string(), s.to_string()); + let _ = service_poll + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service_poll + .update_message_status( + context_id, + caller_id, + id, + MessageStatus::Processed, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Stored supervisor job.result for job {} (error, sync)", + job_id + )], + ) + .await; + } + } else if let Some(s) = res.and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("success".to_string(), s.to_string()); + let _ = service_poll + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service_poll + .update_message_status( + context_id, + caller_id, + id, + MessageStatus::Processed, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Stored supervisor job.result for job {} (success, sync)", + job_id + )], + ) + .await; + } else { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec!["Supervisor job.result reply missing recognizable fields".to_string()], + ) + .await; + } } Err(e) => { let _ = service_poll @@ -421,31 +543,93 @@ async fn deliver_one( _ => { let sup = cache .get_or_create( - client.clone(), + sup_hub.clone(), sup_dest.clone(), sup_topic.clone(), secret_for_poller.clone(), ) .await; - match sup.job_status_with_ids(job_id.to_string()).await { - Ok((_out_id, inner_id)) => { - // Correlate this status request to the message/job - let _ = service_poll - .supcorr_set( - inner_id, context_id, caller_id, job_id, id, - ) - .await; - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Requested supervisor job.status for job {}", - job_id - )], - ) - .await; + match sup.job_status_wait(job_id.to_string()).await { + Ok((_out_id, reply_status)) => { + // Interpret status reply synchronously + let result_opt = reply_status.get("result"); + let error_opt = reply_status.get("error"); + if let Some(err_obj) = error_opt { + let _ = service_poll + .update_job_status_unchecked( + context_id, caller_id, job_id, JobStatus::Error, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, caller_id, id, + vec![format!( + "Supervisor error for job {}: {} (sync)", + job_id, err_obj + )], + ) + .await; + } else if let Some(res) = result_opt { + let status_candidate = res + .get("status") + .and_then(|v| v.as_str()) + .or_else(|| res.as_str()); + if let Some(remote_status) = status_candidate { + if let Some((mapped, terminal)) = + map_supervisor_job_status(remote_status) + { + let _ = service_poll + .update_job_status_unchecked( + context_id, caller_id, job_id, mapped.clone(), + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, caller_id, id, + vec![format!( + "Supervisor job.status for job {} -> {} (mapped to {:?}, sync)", + job_id, remote_status, mapped + )], + ) + .await; + + // If terminal, request job.result now (handled above for local terminal case) + if terminal { + // trigger job.result only if result empty to avoid spam + if let Ok(j_after) = 
service_poll.load_job(context_id, caller_id, job_id).await { + if j_after.result.is_empty() { + let sup2 = cache + .get_or_create( + sup_hub.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + let _ = sup2.job_result_wait(job_id.to_string()).await + .and_then(|(_oid, reply2)| { + // Minimal parse and store + let res2 = reply2.get("result"); + if let Some(obj) = res2.and_then(|v| v.as_object()) { + if let Some(s) = obj.get("success").and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("success".to_string(), s.to_string()); + tokio::spawn({ + let service_poll = service_poll.clone(); + async move { + let _ = service_poll.update_job_result_merge_unchecked(context_id, caller_id, job_id, patch).await; + } + }); + } + } + Ok((String::new(), Value::Null)) + }); + } + } + } + } + } + } } Err(e) => { let _ = service_poll @@ -465,26 +649,21 @@ async fn deliver_one( Err(_) => { let sup = cache .get_or_create( - client.clone(), + sup_hub.clone(), sup_dest.clone(), sup_topic.clone(), secret_for_poller.clone(), ) .await; - match sup.job_status_with_ids(job_id.to_string()).await { - Ok((_out_id, inner_id)) => { - let _ = service_poll - .supcorr_set( - inner_id, context_id, caller_id, job_id, id, - ) - .await; + match sup.job_status_wait(job_id.to_string()).await { + Ok((_out_id, _reply_status)) => { let _ = service_poll .append_message_logs( context_id, caller_id, id, vec![format!( - "Requested supervisor job.status for job {} (fallback; load_job failed)", + "Requested supervisor job.status for job {} (fallback; load_job failed, sync)", job_id )], ) @@ -624,423 +803,3 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task: }) } -/// Start a single global inbound listener that reads Mycelium popMessage with topic filter, -/// decodes supervisor JSON-RPC replies, and updates correlated jobs/messages. -/// This listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id. 
-pub fn start_inbound_listener( - service: AppService, - cfg: RouterConfig, -) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - // Initialize Mycelium client (retry loop) - let mycelium = loop { - match MyceliumClient::new(cfg.base_url.clone()) { - Ok(c) => break Arc::new(c), - Err(e) => { - error!(error=%e, "MyceliumClient init error (inbound listener)"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - }; - - let cache = Arc::new(SupervisorClientCache::new()); - - loop { - // Poll for inbound supervisor messages on the configured topic - match mycelium.pop_message(Some(false), Some(20), None).await { - Ok(Some(inb)) => { - // Expect InboundMessage with base64 "payload" - let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else { - // Not a payload-bearing message; ignore - continue; - }; - let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else { - let _ = service - .append_message_logs( - 0, // unknown context yet - 0, - 0, - vec![ - "Inbound payload base64 decode error (supervisor reply)".into(), - ], - ) - .await; - continue; - }; - tracing::info!( - raw = %String::from_utf8_lossy(&raw), - "Read raw messge from mycelium" - ); - let Ok(rpc): Result = serde_json::from_slice(&raw) else { - // Invalid JSON payload - continue; - }; - - // Extract inner supervisor JSON-RPC id (number preferred; string fallback) - let inner_id_u64 = match rpc.get("id") { - Some(Value::Number(n)) => n.as_u64(), - Some(Value::String(s)) => s.parse::().ok(), - _ => None, - }; - let Some(inner_id) = inner_id_u64 else { - // Cannot correlate without id - continue; - }; - - // Lookup correlation mapping - match service.supcorr_get(inner_id).await { - Ok(Some((context_id, caller_id, job_id, message_id))) => { - // Determine success/error from supervisor JSON-RPC envelope - // Inspect result/error to route job.run/job.status/job.result replies - let result_opt = rpc.get("result"); - let error_opt = rpc.get("error"); - - // Handle job.run success (job_queued) - let is_job_queued = result_opt - .and_then(|res| { - if res.get("job_queued").is_some() { - Some(true) - } else if let Some(s) = res.as_str() { - Some(s == "job_queued") - } else { - None - } - }) - .unwrap_or(false); - - if is_job_queued { - // Set to Dispatched (idempotent) per spec, and append log - let _ = service - .update_job_status_unchecked( - context_id, - caller_id, - job_id, - JobStatus::Dispatched, - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Supervisor reply for job {}: job_queued", - job_id - )], - ) - .await; - let _ = service.supcorr_del(inner_id).await; - continue; - } - - // Error envelope: set job Error and log - if let Some(err_obj) = error_opt { - let _ = service - .update_job_status_unchecked( - context_id, - caller_id, - job_id, - JobStatus::Error, - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Supervisor error for job {}: {}", - job_id, err_obj - )], - ) - .await; - let _ = service.supcorr_del(inner_id).await; - continue; - } - - // If we have a result, try to interpret it as job.status or job.result - if let Some(res) = result_opt { - // Try job.status: object {status: "..."} or bare string - let status_candidate = res - .get("status") - .and_then(|v| v.as_str()) - .or_else(|| res.as_str()); - - if let Some(remote_status) = status_candidate { - if let Some((mapped, terminal)) = - map_supervisor_job_status(remote_status) - { - // Update 
job status and log - let _ = service - .update_job_status_unchecked( - context_id, - caller_id, - job_id, - mapped.clone(), - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Supervisor job.status for job {} -> {} (mapped to {:?})", - job_id, remote_status, mapped - )], - ) - .await; - // Done with this correlation id - let _ = service.supcorr_del(inner_id).await; - - // If terminal, request job.result asynchronously now - if terminal { - // Load job to determine script_type for runner selection - match service - .load_job(context_id, caller_id, job_id) - .await - { - Ok(job) => { - match service.scan_runners(context_id).await { - Ok(runners) => { - if let Some(runner) = - runners.into_iter().find(|r| { - r.script_type == job.script_type - }) - { - let dest = if !runner - .pubkey - .trim() - .is_empty() - { - Destination::Pk( - runner.pubkey.clone(), - ) - } else { - Destination::Ip(runner.address) - }; - let sup = cache - .get_or_create( - mycelium.clone(), - dest, - cfg.topic.clone(), - runner.secret.clone(), - ) - .await; - match sup - .job_result_with_ids( - job_id.to_string(), - ) - .await - { - Ok((_out2, inner2)) => { - let _ = service - .supcorr_set( - inner2, context_id, - caller_id, job_id, - message_id, - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Requested supervisor job.result for job {}", - job_id - )], - ) - .await; - } - Err(e) => { - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "job.result request error for job {}: {}", - job_id, e - )], - ) - .await; - } - } - } else { - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "No runner with matching script_type found to request job.result for job {}", - job_id - )], - ) - .await; - } - } - Err(e) => { - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "scan_runners error while requesting job.result for job {}: {}", - job_id, e - )], - ) - .await; - } - } - } - Err(e) => { - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "load_job error while requesting job.result for job {}: {}", - job_id, e - )], - ) - .await; - } - } - } - continue; - } - } - - // Try job.result: object with success/error or bare string treated as success - if let Some(obj) = res.as_object() { - if let Some(s) = obj.get("success").and_then(|v| v.as_str()) { - let mut patch = std::collections::HashMap::new(); - patch.insert("success".to_string(), s.to_string()); - let _ = service - .update_job_result_merge_unchecked( - context_id, caller_id, job_id, patch, - ) - .await; - let _ = service - .update_message_status( - context_id, - caller_id, - message_id, - MessageStatus::Processed, - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Stored supervisor job.result for job {} (success)", - job_id - )], - ) - .await; - let _ = service.supcorr_del(inner_id).await; - continue; - } - if let Some(s) = obj.get("error").and_then(|v| v.as_str()) { - let mut patch = std::collections::HashMap::new(); - patch.insert("error".to_string(), s.to_string()); - let _ = service - .update_job_result_merge_unchecked( - context_id, caller_id, job_id, patch, - ) - .await; - let _ = service - .update_message_status( - context_id, - caller_id, - message_id, - MessageStatus::Processed, - ) - 
.await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Stored supervisor job.result for job {} (error)", - job_id - )], - ) - .await; - let _ = service.supcorr_del(inner_id).await; - continue; - } - } else if let Some(s) = res.as_str() { - // Bare string => treat as success - let mut patch = std::collections::HashMap::new(); - patch.insert("success".to_string(), s.to_string()); - let _ = service - .update_job_result_merge_unchecked( - context_id, caller_id, job_id, patch, - ) - .await; - let _ = service - .update_message_status( - context_id, - caller_id, - message_id, - MessageStatus::Processed, - ) - .await; - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![format!( - "Stored supervisor job.result for job {} (success)", - job_id - )], - ) - .await; - let _ = service.supcorr_del(inner_id).await; - continue; - } - } - - // Unknown/unsupported supervisor reply; keep correlation for later - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![ - "Supervisor reply did not contain recognizable job.run/status/result fields" - .to_string(), - ], - ) - .await; - } - Ok(None) => { - // No correlation found; ignore or log once - } - Err(e) => { - error!(error=%e, "supcorr_get error"); - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - } - } - } - Ok(None) => { - // No message; continue polling - continue; - } - Err(e) => { - error!(error=%e, "popMessage error"); - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - } - } - } - }) -}
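
Usage sketch (not part of the patch): a minimal example of the new request-reply flow built on
SupervisorHub, assuming a running Tokio runtime, a Mycelium endpoint at http://127.0.0.1:8990,
and a placeholder destination public key; errors are boxed for brevity.

use std::sync::Arc;
use herocoordinator::clients::{Destination, SupervisorClient, SupervisorHub};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One global hub per process: it owns the Mycelium client and the popMessage loop.
    let hub: Arc<SupervisorHub> = SupervisorHub::new("http://127.0.0.1:8990", "supervisor.rpc")?;

    // Per-destination client; replies are correlated by the inner JSON-RPC id via the hub.
    let client = SupervisorClient::new_with_hub(
        hub,
        Destination::Pk("<runner-public-key>".to_string()),
        Some("secret".to_string()),
    );

    // Request-reply: returns the outbound Mycelium message id and the decoded reply envelope.
    let (out_id, reply) = client.list_runners_wait().await?;
    println!("outbound {out_id}: {reply}");
    Ok(())
}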