use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use serde_json::Value; use tokio::sync::{Mutex, oneshot}; use crate::clients::mycelium_client::MyceliumClient; /// Global hub that: /// - Owns a single MyceliumClient /// - Spawns a background popMessage loop filtered by topic /// - Correlates supervisor JSON-RPC replies by inner id to waiting callers via oneshot channels #[derive(Clone)] pub struct SupervisorHub { mycelium: Arc, topic: String, pending: Arc>>>, id_counter: Arc, } impl SupervisorHub { /// Create a new hub and start the background popMessage task. /// - base_url: Mycelium JSON-RPC endpoint, e.g. "http://127.0.0.1:8990" /// - topic: plain-text topic (e.g., "supervisor.rpc") pub fn new( base_url: impl Into, topic: impl Into, ) -> Result, crate::clients::MyceliumClientError> { let myc = Arc::new(MyceliumClient::new(base_url)?); Ok(Self::new_with_client(myc, topic)) } /// Variant that reuses an existing Mycelium client. pub fn new_with_client(mycelium: Arc, topic: impl Into) -> Arc { let hub = Arc::new(Self { mycelium, topic: topic.into(), pending: Arc::new(Mutex::new(HashMap::new())), id_counter: Arc::new(AtomicU64::new(1)), }); Self::spawn_pop_loop(hub.clone()); hub } fn spawn_pop_loop(hub: Arc) { tokio::spawn(async move { loop { match hub.mycelium.pop_message(Some(false), Some(2), None).await { Ok(Some(inb)) => { // Extract and decode payload let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else { // Not a payload-bearing message; ignore continue; }; let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else { tracing::warn!(target: "supervisor_hub", "Failed to decode inbound payload base64"); continue; }; let Ok(rpc): Result = serde_json::from_slice(&raw) else { tracing::warn!(target: "supervisor_hub", "Failed to parse inbound payload JSON"); continue; }; // Extract inner JSON-RPC id let inner_id_u64 = match rpc.get("id") { Some(Value::Number(n)) => n.as_u64(), Some(Value::String(s)) => s.parse::().ok(), _ => None, }; if let Some(inner_id) = inner_id_u64 { // Try to deliver to a pending waiter let sender_opt = { let mut guard = hub.pending.lock().await; guard.remove(&inner_id) }; if let Some(tx) = sender_opt { let _ = tx.send(rpc); } else { tracing::warn!( target: "supervisor_hub", inner_id, payload = %String::from_utf8_lossy(&raw), "Unmatched supervisor reply; no waiter registered" ); } } else { tracing::warn!(target: "supervisor_hub", "Inbound supervisor reply missing id; dropping"); } } Ok(None) => { // No message; continue polling continue; } Err(e) => { tracing::warn!(target: "supervisor_hub", error = %e, "popMessage error; backing off"); tokio::time::sleep(std::time::Duration::from_millis(200)).await; } } } }); } /// Allocate a new inner supervisor JSON-RPC id. pub fn next_id(&self) -> u64 { self.id_counter.fetch_add(1, Ordering::Relaxed) } /// Register a oneshot sender for the given inner id and return the receiver side. pub async fn register_waiter(&self, inner_id: u64) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); let mut guard = self.pending.lock().await; guard.insert(inner_id, tx); rx } /// Remove a pending waiter for a given id (used to cleanup on timeout). pub async fn remove_waiter(&self, inner_id: u64) -> Option> { let mut guard = self.pending.lock().await; guard.remove(&inner_id) } /// Access to underlying Mycelium client (for pushMessage). pub fn mycelium(&self) -> Arc { self.mycelium.clone() } /// Access configured topic. pub fn topic(&self) -> &str { &self.topic } } impl std::fmt::Debug for SupervisorHub { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SupervisorHub") .field("topic", &self.topic) .finish() } }