use std::{ collections::{HashMap, HashSet}, sync::Arc, }; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use serde_json::{Value, json}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use tokio::sync::{Mutex, Semaphore}; use crate::{ models::{Job, JobStatus, Message, MessageStatus, TransportStatus}, service::AppService, }; use hero_supervisor_openrpc_client::{ SupervisorClient, transports::{Destination, MyceliumClient, MyceliumTransport, SupervisorHub}, }; use tracing::{error, info}; #[derive(Clone)] pub struct RouterConfig { pub context_ids: Vec, pub concurrency: usize, pub base_url: String, // e.g. http://127.0.0.1:8990 pub topic: String, // e.g. "supervisor.rpc" pub sup_hub: Arc, // global supervisor hub for replies // Transport status polling configuration pub transport_poll_interval_secs: u64, // e.g. 2 pub transport_poll_timeout_secs: u64, // e.g. 300 (5 minutes) } /* SupervisorClient reuse cache (Router-local): Rationale: - SupervisorClient maintains an internal JSON-RPC id_counter per instance. - Rebuilding a client for each message resets this counter, causing inner JSON-RPC ids to restart at 1. - We reuse one SupervisorClient per (destination, topic, secret) to preserve monotonically increasing ids. Scope: - Cache is per Router loop (and a separate one for the inbound listener). - If cross-loop/process reuse becomes necessary later, promote to a process-global cache. Keying: - Key: destination + topic + secret-presence (secret content hashed; not stored in plaintext). Concurrency: - tokio::Mutex protects a HashMap>. - Values are Arc so call sites clone cheaply and share the same id_counter. */ #[derive(Clone)] struct SupervisorClientCache { map: Arc>>>>, } impl SupervisorClientCache { fn new() -> Self { Self { map: Arc::new(Mutex::new(HashMap::new())), } } fn make_key(dest: &Destination, topic: &str, secret: &Option) -> String { let dst = match dest { Destination::Ip(ip) => format!("ip:{ip}"), Destination::Pk(pk) => format!("pk:{pk}"), }; // Hash the secret to avoid storing plaintext in keys while still differentiating values let sec_hash = match secret { Some(s) if !s.is_empty() => { let mut hasher = DefaultHasher::new(); s.hash(&mut hasher); format!("s:{}", hasher.finish()) } _ => "s:none".to_string(), }; format!("{dst}|t:{topic}|{sec_hash}") } async fn get_or_create( &self, hub: Arc, dest: Destination, topic: String, secret: Option, ) -> Arc> { let key = Self::make_key(&dest, &topic, &secret); { let guard = self.map.lock().await; if let Some(existing) = guard.get(&key) { tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup"); return existing.clone(); } } let mut guard = self.map.lock().await; if let Some(existing) = guard.get(&key) { tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)"); return existing.clone(); } let transport = MyceliumTransport::new(hub, dest); let client = Arc::new(SupervisorClient::new(transport, secret.clone().unwrap_or_default())); guard.insert(key, client.clone()); tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert"); client } } /// Start background router loops, one per context. /// Each loop: /// - BRPOP msg_out with 1s timeout /// - Loads the Message by key, selects a Runner by script_type /// - Sends supervisor JSON-RPC via Mycelium /// - On success: Message.status = Acknowledged /// - On error: Message.status = Error and append a log pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec> { let mut handles = Vec::new(); for ctx_id in cfg.context_ids.clone() { let service_cloned = service.clone(); let cfg_cloned = cfg.clone(); let handle = tokio::spawn(async move { let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency)); // Use the global SupervisorHub let sup_hub = cfg_cloned.sup_hub.clone(); let cache = Arc::new(SupervisorClientCache::new()); loop { // Pop next message key (blocking with timeout) match service_cloned.brpop_msg_out(ctx_id, 1).await { Ok(Some(key)) => { let permit = { // acquire a concurrency permit (non-fair is fine) let sem = sem.clone(); // if semaphore is exhausted, await until a slot becomes available match sem.acquire_owned().await { Ok(p) => p, Err(_) => { // Semaphore closed; exit loop break; } } }; let service_task = service_cloned.clone(); let cfg_task = cfg_cloned.clone(); tokio::spawn({ let cache = cache.clone(); let sup_hub = sup_hub.clone(); async move { // Ensure permit is dropped at end of task let _permit = permit; if let Err(e) = deliver_one( &service_task, &cfg_task, ctx_id, &key, sup_hub, cache.clone(), ) .await { error!(context_id=ctx_id, key=%key, error=%e, "Delivery error"); } } }); } Ok(None) => { // timeout: just tick continue; } Err(e) => { error!(context_id=ctx_id, error=%e, "BRPOP error"); // small backoff to avoid busy-loop on persistent errors tokio::time::sleep(std::time::Duration::from_millis(200)).await; } } } }); handles.push(handle); } handles } async fn deliver_one( service: &AppService, cfg: &RouterConfig, context_id: u32, msg_key: &str, sup_hub: Arc, cache: Arc, ) -> Result<(), Box> { // Parse "message:{caller_id}:{id}" let (caller_id, id) = parse_message_key(msg_key) .ok_or_else(|| format!("invalid message key format: {}", msg_key))?; // Load message let msg: Message = service.load_message(context_id, caller_id, id).await?; // Determine routing from FlowNode.supervisor_url if available let supervisor_url = if !msg.nodes.is_empty() { // Use FlowNode routing (new architecture) msg.nodes[0].supervisor_url.clone() } else { // Fallback: get first available runner (legacy) let runners = service.scan_runners(context_id).await?; let Some(runner) = runners.into_iter().next() else { let log = format!( "No runners available in context {} for message {}", context_id, msg_key ); let _ = service .append_message_logs(context_id, caller_id, id, vec![log.clone()]) .await; let _ = service .update_message_status(context_id, caller_id, id, MessageStatus::Error) .await; return Err(log.into()); }; // Build URL from runner if !runner.pubkey.trim().is_empty() { format!("mycelium://{}", runner.pubkey) } else { format!("http://{}", runner.address) } }; // Parse supervisor_url to determine destination // Format: "mycelium://" or "http://
" or just "
" let dest = if supervisor_url.starts_with("mycelium://") { let pubkey = supervisor_url.strip_prefix("mycelium://").unwrap_or(""); Destination::Pk(pubkey.to_string()) } else { // Extract address (strip http:// or https:// if present) let address_str = supervisor_url .strip_prefix("http://") .or_else(|| supervisor_url.strip_prefix("https://")) .unwrap_or(&supervisor_url); // Parse IP address (strip port if present) let ip_str = address_str.split(':').next().unwrap_or(address_str); let ip_addr = ip_str.parse().unwrap_or_else(|_| { // Default to localhost if parsing fails std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)) }); Destination::Ip(ip_addr) }; // Build SupervisorClient let client = cache .get_or_create( sup_hub.clone(), dest.clone(), cfg.topic.clone(), None, // TODO: Get secret from runner or config ) .await; // Build supervisor method and params from Message let method = msg.message.clone(); let params = build_params(&msg)?; // Send via the new client API // The transport handles message correlation internally let job_result = if method == "job.run" { if let Some(j) = msg.job.first() { // Use typed job_run method let job = serde_json::from_value(job_to_json(j)?)?; let result = client.job_run(job, None).await; // Update node status based on result if !msg.nodes.is_empty() { let node_id = msg.nodes[0].id; let flow_id = msg.flow_id; match &result { Ok(_) => { // Job completed successfully let _ = service .update_node_status_unchecked( context_id, flow_id, node_id, crate::dag::NodeStatus::Completed, ) .await; } Err(_) => { // Job failed let _ = service .update_node_status_unchecked( context_id, flow_id, node_id, crate::dag::NodeStatus::Failed, ) .await; } } } result?; serde_json::Value::Null } else { // Generic call - not supported in new API, would need custom implementation // For now, return error return Err("job.run requires a job parameter".into()); } } else { // For other methods, we'd need to add them to the client or use a generic mechanism // For now, this is a placeholder return Err(format!("Method {} not yet supported with new client", method).into()); }; // Mark as delivered since the new client waits for replies let _ = service .update_message_transport( context_id, caller_id, id, None, // No transport ID in new API Some(TransportStatus::Delivered), ) .await; // Mark as acknowledged on success service .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged) .await?; // Log job completion if method == "job.run" { if let Some(job_id) = msg.job.first().map(|j| j.id.parse::().unwrap_or(0)) { let _ = service .append_message_logs( context_id, caller_id, id, vec![format!( "Job {} completed successfully", job_id )], ) .await; } } // No correlation map needed; replies are handled synchronously via SupervisorHub // No transport polling needed; the new client waits for replies synchronously Ok(()) } // Removed determine_executor - routing now based on FlowNode.supervisor_url fn build_params(msg: &Message) -> Result> { // Minimal mapping: // - "job.run" with exactly one embedded job: [{ "job": }] // - otherwise: [] if msg.message == "job.run" && let Some(j) = msg.job.first() { let jv = job_to_json(j)?; return Ok(json!([ { "job": jv } ])); } Ok(json!([])) } fn job_to_json(job: &Job) -> Result> { Ok(serde_json::to_value(job)?) } fn parse_message_key(s: &str) -> Option<(u32, u32)> { // Expect "message:{caller_id}:{id}" let mut it = s.split(':'); match (it.next(), it.next(), it.next(), it.next()) { (Some("message"), Some(caller), Some(id), None) => { let caller_id = caller.parse::().ok()?; let msg_id = id.parse::().ok()?; Some((caller_id, msg_id)) } _ => None, } } /// Map supervisor job.status -> (local JobStatus, terminal) fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> { match s { "created" | "queued" => Some((JobStatus::Dispatched, false)), "running" => Some((JobStatus::Started, false)), "completed" => Some((JobStatus::Finished, true)), "failed" | "timeout" => Some((JobStatus::Error, true)), _ => None, } } /// Auto-discover contexts periodically and ensure a router loop exists for each. /// Returns a JoinHandle of the discovery task (router loops are detached). pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut active: HashSet = HashSet::new(); loop { match service.list_context_ids().await { Ok(ids) => { for ctx_id in ids { if !active.contains(&ctx_id) { // Spawn a loop for this new context let cfg_ctx = RouterConfig { context_ids: vec![ctx_id], ..cfg.clone() }; let _ = start_router(service.clone(), cfg_ctx); active.insert(ctx_id); info!(context_id = ctx_id, "Started loop for context"); } } } Err(e) => { error!(error=%e, "list_context_ids error"); } } tokio::time::sleep(std::time::Duration::from_secs(5)).await; } }) }