use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::Engine;
use serde_json::{json, Value};
use tokio::sync::{Mutex, Semaphore};
use tracing::{error, info};

use crate::{
    clients::{Destination, MyceliumClient, SupervisorClient},
    models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
    service::AppService,
};

#[derive(Clone, Debug)]
pub struct RouterConfig {
    pub context_ids: Vec<u32>,
    pub concurrency: usize,
    pub base_url: String, // e.g. http://127.0.0.1:8990
    pub topic: String,    // e.g. "supervisor.rpc"
    // Transport status polling configuration
    pub transport_poll_interval_secs: u64, // e.g. 2
    pub transport_poll_timeout_secs: u64,  // e.g. 300 (5 minutes)
}

/*
SupervisorClient reuse cache (Router-local):

Rationale:
- SupervisorClient maintains an internal JSON-RPC id_counter per instance.
- Rebuilding a client for each message resets this counter, causing inner JSON-RPC ids to restart at 1.
- We reuse one SupervisorClient per (destination, topic, secret) to preserve monotonically increasing ids.

Scope:
- The cache is per Router loop (with a separate one for the inbound listener).
- If cross-loop/process reuse becomes necessary later, promote it to a process-global cache.

Keying:
- Key: destination + topic + secret presence (the secret content is hashed, never stored in plaintext).

Concurrency:
- A tokio::Mutex protects a HashMap<String, Arc<SupervisorClient>>.
- Values are Arc<SupervisorClient> so call sites clone cheaply and share the same id_counter.
*/
#[derive(Clone)]
struct SupervisorClientCache {
    map: Arc<Mutex<HashMap<String, Arc<SupervisorClient>>>>,
}

impl SupervisorClientCache {
    fn new() -> Self {
        Self {
            map: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    fn make_key(dest: &Destination, topic: &str, secret: &Option<String>) -> String {
        let dst = match dest {
            Destination::Ip(ip) => format!("ip:{ip}"),
            Destination::Pk(pk) => format!("pk:{pk}"),
        };
        // Hash the secret to avoid storing plaintext in keys while still differentiating values
        let sec_hash = match secret {
            Some(s) if !s.is_empty() => {
                let mut hasher = DefaultHasher::new();
                s.hash(&mut hasher);
                format!("s:{}", hasher.finish())
            }
            _ => "s:none".to_string(),
        };
        format!("{dst}|t:{topic}|{sec_hash}")
    }

    async fn get_or_create(
        &self,
        mycelium: Arc<MyceliumClient>,
        dest: Destination,
        topic: String,
        secret: Option<String>,
    ) -> Arc<SupervisorClient> {
        let key = Self::make_key(&dest, &topic, &secret);

        {
            let guard = self.map.lock().await;
            if let Some(existing) = guard.get(&key) {
                tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup");
                return existing.clone();
            }
        }

        // Double-checked insert: another task may have created the client between
        // releasing the lock above and re-acquiring it here.
        let mut guard = self.map.lock().await;
        if let Some(existing) = guard.get(&key) {
            tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
            return existing.clone();
        }
        let client = Arc::new(SupervisorClient::new_with_client(
            mycelium,
            dest,
            topic.clone(),
            secret.clone(),
        ));
        guard.insert(key, client.clone());
        tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
        client
    }
}
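// Illustrative sketches (not part of the runtime path). The RouterConfig values
// below reuse the examples from the field comments above; the concurrency value
// and the "abc" public key are assumptions chosen for the sketch. The cache
// snippet assumes an existing `mycelium: Arc<MyceliumClient>`.
//
//   let cfg = RouterConfig {
//       context_ids: vec![1],
//       concurrency: 32,
//       base_url: "http://127.0.0.1:8990".to_string(),
//       topic: "supervisor.rpc".to_string(),
//       transport_poll_interval_secs: 2,
//       transport_poll_timeout_secs: 300,
//   };
//
//   // Same (destination, topic, secret) => same Arc, so the shared JSON-RPC
//   // id_counter keeps increasing across calls:
//   let cache = Arc::new(SupervisorClientCache::new());
//   let a = cache
//       .get_or_create(mycelium.clone(), Destination::Pk("abc".into()), cfg.topic.clone(), None)
//       .await;
//   let b = cache
//       .get_or_create(mycelium.clone(), Destination::Pk("abc".into()), cfg.topic.clone(), None)
//       .await;
//   debug_assert!(Arc::ptr_eq(&a, &b));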
/// Start background router loops, one per context.
/// Each loop:
/// - BRPOPs msg_out with a 1s timeout
/// - Loads the Message by key and selects a Runner by script_type
/// - Sends the supervisor JSON-RPC via Mycelium
/// - On success: Message.status = Acknowledged
/// - On error: Message.status = Error, and a log line is appended
pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::JoinHandle<()>> {
    let mut handles = Vec::new();
    for ctx_id in cfg.context_ids.clone() {
        let service_cloned = service.clone();
        let cfg_cloned = cfg.clone();
        let handle = tokio::spawn(async move {
            let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));

            // Create a shared Mycelium client for this context loop (retry until available)
            let mycelium = loop {
                match MyceliumClient::new(cfg_cloned.base_url.clone()) {
                    Ok(c) => break Arc::new(c),
                    Err(e) => {
                        error!(context_id=ctx_id, error=%e, "MyceliumClient init error");
                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    }
                }
            };
            let cache = Arc::new(SupervisorClientCache::new());

            loop {
                // Pop the next message key (blocking with timeout)
                match service_cloned.brpop_msg_out(ctx_id, 1).await {
                    Ok(Some(key)) => {
                        let permit = {
                            // Acquire a concurrency permit (non-fair is fine); if the
                            // semaphore is exhausted, await until a slot becomes available.
                            let sem = sem.clone();
                            match sem.acquire_owned().await {
                                Ok(p) => p,
                                Err(_) => {
                                    // Semaphore closed; exit the loop
                                    break;
                                }
                            }
                        };
                        let service_task = service_cloned.clone();
                        let cfg_task = cfg_cloned.clone();
                        tokio::spawn({
                            let mycelium = mycelium.clone();
                            let cache = cache.clone();
                            async move {
                                // Hold the permit for the duration of the task
                                let _permit = permit;
                                if let Err(e) = deliver_one(
                                    &service_task,
                                    &cfg_task,
                                    ctx_id,
                                    &key,
                                    mycelium,
                                    cache.clone(),
                                )
                                .await
                                {
                                    error!(context_id=ctx_id, key=%key, error=%e, "Delivery error");
                                }
                            }
                        });
                    }
                    Ok(None) => {
                        // Timeout: just tick
                        continue;
                    }
                    Err(e) => {
                        error!(context_id=ctx_id, error=%e, "BRPOP error");
                        // Small backoff to avoid a busy loop on persistent errors
                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                    }
                }
            }
        });
        handles.push(handle);
    }
    handles
}

async fn deliver_one(
    service: &AppService,
    cfg: &RouterConfig,
    context_id: u32,
    msg_key: &str,
    mycelium: Arc<MyceliumClient>,
    cache: Arc<SupervisorClientCache>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Parse "message:{caller_id}:{id}"
    let (caller_id, id) = parse_message_key(msg_key)
        .ok_or_else(|| format!("invalid message key format: {}", msg_key))?;

    // Load the message
    let msg: Message = service.load_message(context_id, caller_id, id).await?;

    // Embedded job id (if any)
    let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);

    // Determine the routing script_type
    let desired: ScriptType = determine_script_type(&msg);

    // Discover runners and select a matching one
    let runners = service.scan_runners(context_id).await?;
    let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
        let log = format!(
            "No runner with script_type {:?} available in context {} for message {}",
            desired, context_id, msg_key
        );
        let _ = service
            .append_message_logs(context_id, caller_id, id, vec![log.clone()])
            .await;
        let _ = service
            .update_message_status(context_id, caller_id, id, MessageStatus::Error)
            .await;
        return Err(log.into());
    };

    // Build the SupervisorClient destination
    let dest = if !runner.pubkey.trim().is_empty() {
        Destination::Pk(runner.pubkey.clone())
    } else {
        Destination::Ip(runner.address)
    };
    // Keep clones for poller usage
    let dest_for_poller = dest.clone();
    let topic_for_poller = cfg.topic.clone();
    let secret_for_poller = runner.secret.clone();
    let client = cache
        .get_or_create(
            mycelium.clone(),
            dest.clone(),
            cfg.topic.clone(),
            runner.secret.clone(),
        )
        .await;
    // Build the supervisor method and params from the Message
    let method = msg.message.clone();
    let params = build_params(&msg)?;

    // Send.
    // If this is a job.run and a secret is configured on the client, prefer the
    // typed wrapper that injects the secret into the inner supervisor params,
    // and capture the inner supervisor JSON-RPC id for correlation.
    let (out_id, inner_id_opt) = if method == "job.run" {
        if let Some(j) = msg.job.first() {
            let jv = job_to_json(j)?;
            // Returns (outbound message id, inner supervisor JSON-RPC id)
            let (out, inner) = client.job_run_with_ids(jv).await?;
            (out, Some(inner))
        } else {
            // Fallback: no embedded job, use the generic call
            let out = client.call(&method, params).await?;
            (out, None)
        }
    } else {
        let out = client.call(&method, params).await?;
        (out, None)
    };

    // Store the transport id and the initial Sent status
    let _ = service
        .update_message_transport(
            context_id,
            caller_id,
            id,
            Some(out_id.clone()),
            Some(TransportStatus::Sent),
        )
        .await;

    // Mark as acknowledged on success
    service
        .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
        .await?;

    // Record the correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling
    if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) {
        let _ = service
            .supcorr_set(inner_id, context_id, caller_id, job_id, id)
            .await;
    }

    // Spawn the transport-status poller
    {
        let service_poll = service.clone();
        let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
        let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
        let out_id_cloned = out_id.clone();
        let mycelium = mycelium.clone();

        tokio::spawn(async move {
            let start = std::time::Instant::now();
            let client = mycelium;
            // Supervisor call context captured for follow-up status checks
            let sup_dest = dest_for_poller;
            let sup_topic = topic_for_poller;
            let job_id_opt = job_id_opt;
            let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
            // Request supervisor job.status or job.result at most once per outbound message
            let mut requested_job_check: bool = false;

            loop {
                if start.elapsed() >= poll_timeout {
                    let _ = service_poll
                        .append_message_logs(
                            context_id,
                            caller_id,
                            id,
                            vec!["Transport-status polling timed out".to_string()],
                        )
                        .await;
                    // Leave the last known status; do not override it
                    break;
                }

                match client.message_status(&out_id_cloned).await {
                    Ok(s) => {
                        if last_status.as_ref() != Some(&s) {
                            let _ = service_poll
                                .update_message_transport(
                                    context_id,
                                    caller_id,
                                    id,
                                    None,
                                    Some(s.clone()),
                                )
                                .await;
                            last_status = Some(s.clone());
                        }

                        // Terminal delivery states trigger the one-shot job check below
                        if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
                            // Only request a single job status/result per message
                            if !requested_job_check {
                                if let Some(job_id) = job_id_opt {
                                    // First consult Redis for the latest job state in case a terminal update already arrived
                                    match service_poll.load_job(context_id, caller_id, job_id).await {
                                        Ok(job) => {
                                            match job.status() {
                                                JobStatus::Finished | JobStatus::Error => {
                                                    // The local job is already terminal; skip supervisor job.status
                                                    let _ = service_poll
                                                        .append_message_logs(
                                                            context_id,
                                                            caller_id,
                                                            id,
                                                            vec![format!(
                                                                "Local job {} status is terminal ({:?}); skipping supervisor job.status",
                                                                job_id,
                                                                job.status()
                                                            )],
                                                        )
                                                        .await;
                                                    // If the result is still empty, immediately request supervisor job.result
                                                    if job.result.is_empty() {
                                                        let sup = cache
                                                            .get_or_create(
                                                                client.clone(),
                                                                sup_dest.clone(),
                                                                sup_topic.clone(),
                                                                secret_for_poller.clone(),
                                                            )
                                                            .await;
                                                        match sup.job_result_with_ids(job_id.to_string()).await {
                                                            Ok((_out2, inner2)) => {
                                                                let _ = service_poll
                                                                    .supcorr_set(inner2, context_id, caller_id, job_id, id)
                                                                    .await;
                                                                let _ = service_poll
                                                                    .append_message_logs(
                                                                        context_id,
                                                                        caller_id,
                                                                        id,
                                                                        vec![format!(
                                                                            "Requested supervisor job.result for job {} (local terminal w/ empty result)",
                                                                            job_id
                                                                        )],
                                                                    )
                                                                    .await;
                                                            }
                                                            Err(e) => {
                                                                let _ = service_poll
                                                                    .append_message_logs(
                                                                        context_id,
                                                                        caller_id,
                                                                        id,
                                                                        vec![format!(
                                                                            "job.result request error for job {}: {}",
                                                                            job_id, e
                                                                        )],
                                                                    )
                                                                    .await;
                                                            }
                                                        }
                                                    } else {
                                                        // Result already present; nothing to fetch
                                                        let _ = service_poll
                                                            .append_message_logs(
                                                                context_id,
                                                                caller_id,
                                                                id,
                                                                vec![format!(
                                                                    "Job {} already has a result; no supervisor calls needed",
                                                                    job_id
                                                                )],
                                                            )
                                                            .await;
                                                    }
                                                }
                                                // Not terminal yet -> request supervisor job.status
                                                _ => {
                                                    let sup = cache
                                                        .get_or_create(
                                                            client.clone(),
                                                            sup_dest.clone(),
                                                            sup_topic.clone(),
                                                            secret_for_poller.clone(),
                                                        )
                                                        .await;
                                                    match sup.job_status_with_ids(job_id.to_string()).await {
                                                        Ok((_out_id, inner_id)) => {
                                                            // Correlate this status request to the message/job
                                                            let _ = service_poll
                                                                .supcorr_set(inner_id, context_id, caller_id, job_id, id)
                                                                .await;
                                                            let _ = service_poll
                                                                .append_message_logs(
                                                                    context_id,
                                                                    caller_id,
                                                                    id,
                                                                    vec![format!(
                                                                        "Requested supervisor job.status for job {}",
                                                                        job_id
                                                                    )],
                                                                )
                                                                .await;
                                                        }
                                                        Err(e) => {
                                                            let _ = service_poll
                                                                .append_message_logs(
                                                                    context_id,
                                                                    caller_id,
                                                                    id,
                                                                    vec![format!("job.status request error: {}", e)],
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                        // If we cannot load the job, fall back to requesting job.status
                                        Err(_) => {
                                            let sup = cache
                                                .get_or_create(
                                                    client.clone(),
                                                    sup_dest.clone(),
                                                    sup_topic.clone(),
                                                    secret_for_poller.clone(),
                                                )
                                                .await;
                                            match sup.job_status_with_ids(job_id.to_string()).await {
                                                Ok((_out_id, inner_id)) => {
                                                    let _ = service_poll
                                                        .supcorr_set(inner_id, context_id, caller_id, job_id, id)
                                                        .await;
                                                    let _ = service_poll
                                                        .append_message_logs(
                                                            context_id,
                                                            caller_id,
                                                            id,
                                                            vec![format!(
                                                                "Requested supervisor job.status for job {} (fallback; load_job failed)",
                                                                job_id
                                                            )],
                                                        )
                                                        .await;
                                                }
                                                Err(e) => {
                                                    let _ = service_poll
                                                        .append_message_logs(
                                                            context_id,
                                                            caller_id,
                                                            id,
                                                            vec![format!("job.status request error: {}", e)],
                                                        )
                                                        .await;
                                                }
                                            }
                                        }
                                    }
                                }
                                // Ensure we only do this once
                                requested_job_check = true;
                            }
                            // Note: intentionally no break here; keep polling so the
                            // transport status can still settle until timeout or Failed.
                        }

                        if matches!(s, TransportStatus::Failed) {
                            let _ = service_poll
                                .append_message_logs(
                                    context_id,
                                    caller_id,
                                    id,
                                    vec![format!(
                                        "Transport failed for outbound id {out_id_cloned}"
                                    )],
                                )
                                .await;
                            break;
                        }
                    }
                    Err(e) => {
                        // Log and continue polling
                        let _ = service_poll
                            .append_message_logs(
                                context_id,
                                caller_id,
                                id,
                                vec![format!("messageStatus query error: {e}")],
                            )
                            .await;
                    }
                }

                tokio::time::sleep(poll_interval).await;
            }
        });
    }

    Ok(())
}

fn determine_script_type(msg: &Message) -> ScriptType {
    // Prefer the embedded job's script_type if available; otherwise fall back to message.message_type
    match msg.job.first() {
        Some(j) => j.script_type.clone(),
        None => msg.message_type.clone(),
    }
}

fn build_params(msg: &Message) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    // Minimal mapping:
    // - "job.run" with exactly one embedded job: [{ "job": <job> }]
    // - otherwise: []
    if msg.message == "job.run"
        && let Some(j) = msg.job.first()
    {
        let jv = job_to_json(j)?;
        return Ok(json!([ { "job": jv } ]));
    }
    Ok(json!([]))
}

fn job_to_json(job: &Job) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    Ok(serde_json::to_value(job)?)
}
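// Illustrative params produced by build_params for a "job.run" message with one
// embedded job. The field names inside the job object follow the Job model's
// serde serialization; the ones shown here are assumptions for the sketch:
//
//   [ { "job": { "id": 42, "script_type": "Python", ... } } ]
//
// Any other message method gets an empty params array: [].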
fn parse_message_key(s: &str) -> Option<(u32, u32)> {
    // Expect "message:{caller_id}:{id}"
    let mut it = s.split(':');
    match (it.next(), it.next(), it.next(), it.next()) {
        (Some("message"), Some(caller), Some(id), None) => {
            let caller_id = caller.parse::<u32>().ok()?;
            let msg_id = id.parse::<u32>().ok()?;
            Some((caller_id, msg_id))
        }
        _ => None,
    }
}

/// Map supervisor job.status -> (local JobStatus, terminal)
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
    match s {
        "created" | "queued" => Some((JobStatus::Dispatched, false)),
        "running" => Some((JobStatus::Started, false)),
        "completed" => Some((JobStatus::Finished, true)),
        "failed" | "timeout" => Some((JobStatus::Error, true)),
        _ => None,
    }
}

/// Auto-discover contexts periodically and ensure a router loop exists for each.
/// Returns the JoinHandle of the discovery task (router loops are detached).
pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut active: HashSet<u32> = HashSet::new();
        loop {
            match service.list_context_ids().await {
                Ok(ids) => {
                    for ctx_id in ids {
                        if !active.contains(&ctx_id) {
                            // Spawn a loop for this new context
                            let cfg_ctx = RouterConfig {
                                context_ids: vec![ctx_id],
                                ..cfg.clone()
                            };
                            let _ = start_router(service.clone(), cfg_ctx);
                            active.insert(ctx_id);
                            info!(context_id = ctx_id, "Started loop for context");
                        }
                    }
                }
                Err(e) => {
                    error!(error=%e, "list_context_ids error");
                }
            }
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
    })
}
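// Illustrative supervisor reply envelopes handled by the inbound listener below.
// These shapes are inferred from the handling code, not from a wire spec, so
// exact field sets may differ:
//
//   {"jsonrpc":"2.0","id":7,"result":{"job_queued":true}}              // job.run acknowledged
//   {"jsonrpc":"2.0","id":8,"result":{"status":"running"}}             // job.status update
//   {"jsonrpc":"2.0","id":9,"result":{"success":"<output>"}}           // job.result payload
//   {"jsonrpc":"2.0","id":10,"error":{"code":-32000,"message":"..."}}  // error envelope
//
// The numeric "id" is the inner supervisor JSON-RPC id used for correlation
// (a string id that parses as u64 is also accepted).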
/// Start a single global inbound listener that reads Mycelium popMessage with a topic filter,
/// decodes supervisor JSON-RPC replies, and updates the correlated jobs/messages.
/// It listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id.
pub fn start_inbound_listener(
    service: AppService,
    cfg: RouterConfig,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Initialize the Mycelium client (retry loop)
        let mycelium = loop {
            match MyceliumClient::new(cfg.base_url.clone()) {
                Ok(c) => break Arc::new(c),
                Err(e) => {
                    error!(error=%e, "MyceliumClient init error (inbound listener)");
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        };
        let cache = Arc::new(SupervisorClientCache::new());

        loop {
            // Poll for inbound supervisor messages on the configured topic
            match mycelium.pop_message(Some(false), Some(20), None).await {
                Ok(Some(inb)) => {
                    // Expect an InboundMessage with a base64 "payload"
                    let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
                        // Not a payload-bearing message; ignore
                        continue;
                    };
                    let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
                        let _ = service
                            .append_message_logs(
                                0, // context unknown at this point
                                0,
                                0,
                                vec![
                                    "Inbound payload base64 decode error (supervisor reply)".into(),
                                ],
                            )
                            .await;
                        continue;
                    };
                    tracing::info!(
                        raw = %String::from_utf8_lossy(&raw),
                        "Read raw message from mycelium"
                    );
                    let Ok(rpc) = serde_json::from_slice::<Value>(&raw) else {
                        // Invalid JSON payload
                        continue;
                    };

                    // Extract the inner supervisor JSON-RPC id (number preferred; string fallback)
                    let inner_id_u64 = match rpc.get("id") {
                        Some(Value::Number(n)) => n.as_u64(),
                        Some(Value::String(s)) => s.parse::<u64>().ok(),
                        _ => None,
                    };
                    let Some(inner_id) = inner_id_u64 else {
                        // Cannot correlate without an id
                        continue;
                    };

                    // Look up the correlation mapping
                    match service.supcorr_get(inner_id).await {
                        Ok(Some((context_id, caller_id, job_id, message_id))) => {
                            // Determine success/error from the supervisor JSON-RPC envelope;
                            // inspect result/error to route job.run/job.status/job.result replies.
                            let result_opt = rpc.get("result");
                            let error_opt = rpc.get("error");

                            // Handle job.run success (job_queued)
                            let is_job_queued = result_opt
                                .and_then(|res| {
                                    if res.get("job_queued").is_some() {
                                        Some(true)
                                    } else if let Some(s) = res.as_str() {
                                        Some(s == "job_queued")
                                    } else {
                                        None
                                    }
                                })
                                .unwrap_or(false);

                            if is_job_queued {
                                // Set to Dispatched (idempotent) per spec, and append a log
                                let _ = service
                                    .update_job_status_unchecked(
                                        context_id,
                                        caller_id,
                                        job_id,
                                        JobStatus::Dispatched,
                                    )
                                    .await;
                                let _ = service
                                    .append_message_logs(
                                        context_id,
                                        caller_id,
                                        message_id,
                                        vec![format!(
                                            "Supervisor reply for job {}: job_queued",
                                            job_id
                                        )],
                                    )
                                    .await;
                                let _ = service.supcorr_del(inner_id).await;
                                continue;
                            }

                            // Error envelope: set the job to Error and log
                            if let Some(err_obj) = error_opt {
                                let _ = service
                                    .update_job_status_unchecked(
                                        context_id,
                                        caller_id,
                                        job_id,
                                        JobStatus::Error,
                                    )
                                    .await;
                                let _ = service
                                    .append_message_logs(
                                        context_id,
                                        caller_id,
                                        message_id,
                                        vec![format!(
                                            "Supervisor error for job {}: {}",
                                            job_id, err_obj
                                        )],
                                    )
                                    .await;
                                let _ = service.supcorr_del(inner_id).await;
                                continue;
                            }

                            // If we have a result, try to interpret it as job.status or job.result
                            if let Some(res) = result_opt {
                                // Try job.status: an object {status: "..."} or a bare string
                                let status_candidate = res
                                    .get("status")
                                    .and_then(|v| v.as_str())
                                    .or_else(|| res.as_str());

                                if let Some(remote_status) = status_candidate {
                                    if let Some((mapped, terminal)) =
                                        map_supervisor_job_status(remote_status)
                                    {
                                        // Update the job status and log
                                        let _ = service
                                            .update_job_status_unchecked(
                                                context_id,
                                                caller_id,
                                                job_id,
                                                mapped.clone(),
                                            )
                                            .await;
                                        let _ = service
                                            .append_message_logs(
                                                context_id,
                                                caller_id,
                                                message_id,
                                                vec![format!(
                                                    "Supervisor job.status for job {} -> {} (mapped to {:?})",
                                                    job_id, remote_status, mapped
                                                )],
                                            )
                                            .await;
                                        // Done with this correlation id
                                        let _ = service.supcorr_del(inner_id).await;

                                        // If terminal, request job.result asynchronously now
                                        if terminal {
                                            // Load the job to determine the script_type for runner selection
                                            match service.load_job(context_id, caller_id, job_id).await {
                                                Ok(job) => {
                                                    match service.scan_runners(context_id).await {
                                                        Ok(runners) => {
                                                            if let Some(runner) = runners
                                                                .into_iter()
                                                                .find(|r| r.script_type == job.script_type)
                                                            {
                                                                let dest = if !runner.pubkey.trim().is_empty() {
                                                                    Destination::Pk(runner.pubkey.clone())
                                                                } else {
                                                                    Destination::Ip(runner.address)
                                                                };
                                                                let sup = cache
                                                                    .get_or_create(
                                                                        mycelium.clone(),
                                                                        dest,
                                                                        cfg.topic.clone(),
                                                                        runner.secret.clone(),
                                                                    )
                                                                    .await;
                                                                match sup.job_result_with_ids(job_id.to_string()).await {
                                                                    Ok((_out2, inner2)) => {
                                                                        let _ = service
                                                                            .supcorr_set(
                                                                                inner2, context_id, caller_id,
                                                                                job_id, message_id,
                                                                            )
                                                                            .await;
                                                                        let _ = service
                                                                            .append_message_logs(
                                                                                context_id,
                                                                                caller_id,
                                                                                message_id,
                                                                                vec![format!(
                                                                                    "Requested supervisor job.result for job {}",
                                                                                    job_id
                                                                                )],
                                                                            )
                                                                            .await;
                                                                    }
                                                                    Err(e) => {
                                                                        let _ = service
                                                                            .append_message_logs(
                                                                                context_id,
                                                                                caller_id,
                                                                                message_id,
                                                                                vec![format!(
                                                                                    "job.result request error for job {}: {}",
                                                                                    job_id, e
                                                                                )],
                                                                            )
                                                                            .await;
                                                                    }
                                                                }
                                                            } else {
                                                                let _ = service
                                                                    .append_message_logs(
                                                                        context_id,
                                                                        caller_id,
                                                                        message_id,
                                                                        vec![format!(
                                                                            "No runner with matching script_type found to request job.result for job {}",
                                                                            job_id
                                                                        )],
                                                                    )
                                                                    .await;
                                                            }
                                                        }
                                                        Err(e) => {
                                                            let _ = service
                                                                .append_message_logs(
                                                                    context_id,
                                                                    caller_id,
                                                                    message_id,
                                                                    vec![format!(
                                                                        "scan_runners error while requesting job.result for job {}: {}",
                                                                        job_id, e
                                                                    )],
                                                                )
                                                                .await;
                                                        }
                                                    }
                                                }
                                                Err(e) => {
                                                    let _ = service
                                                        .append_message_logs(
                                                            context_id,
                                                            caller_id,
                                                            message_id,
                                                            vec![format!(
                                                                "load_job error while requesting job.result for job {}: {}",
                                                                job_id, e
                                                            )],
                                                        )
                                                        .await;
                                                }
                                            }
                                        }
                                        continue;
                                    }
                                }

                                // Try job.result: an object with success/error, or a bare string treated as success
                                if let Some(obj) = res.as_object() {
                                    if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
                                        let mut patch = std::collections::HashMap::new();
                                        patch.insert("success".to_string(), s.to_string());
                                        let _ = service
                                            .update_job_result_merge_unchecked(
                                                context_id, caller_id, job_id, patch,
                                            )
                                            .await;
                                        let _ = service
                                            .update_message_status(
                                                context_id,
                                                caller_id,
                                                message_id,
                                                MessageStatus::Processed,
                                            )
                                            .await;
                                        let _ = service
                                            .append_message_logs(
                                                context_id,
                                                caller_id,
                                                message_id,
                                                vec![format!(
                                                    "Stored supervisor job.result for job {} (success)",
                                                    job_id
                                                )],
                                            )
                                            .await;
                                        let _ = service.supcorr_del(inner_id).await;
                                        continue;
                                    }
                                    if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
                                        let mut patch = std::collections::HashMap::new();
                                        patch.insert("error".to_string(), s.to_string());
                                        let _ = service
                                            .update_job_result_merge_unchecked(
                                                context_id, caller_id, job_id, patch,
                                            )
                                            .await;
                                        let _ = service
                                            .update_message_status(
                                                context_id,
                                                caller_id,
                                                message_id,
                                                MessageStatus::Processed,
                                            )
                                            .await;
                                        let _ = service
                                            .append_message_logs(
                                                context_id,
                                                caller_id,
                                                message_id,
                                                vec![format!(
                                                    "Stored supervisor job.result for job {} (error)",
                                                    job_id
                                                )],
                                            )
                                            .await;
                                        let _ = service.supcorr_del(inner_id).await;
                                        continue;
                                    }
                                } else if let Some(s) = res.as_str() {
                                    // Bare string => treat as success
                                    let mut patch = std::collections::HashMap::new();
                                    patch.insert("success".to_string(), s.to_string());
                                    let _ = service
                                        .update_job_result_merge_unchecked(
                                            context_id, caller_id, job_id, patch,
                                        )
                                        .await;
                                    let _ = service
                                        .update_message_status(
                                            context_id,
                                            caller_id,
                                            message_id,
                                            MessageStatus::Processed,
                                        )
                                        .await;
                                    let _ = service
                                        .append_message_logs(
                                            context_id,
                                            caller_id,
                                            message_id,
                                            vec![format!(
                                                "Stored supervisor job.result for job {} (success)",
                                                job_id
                                            )],
                                        )
                                        .await;
                                    let _ = service.supcorr_del(inner_id).await;
                                    continue;
                                }
                            }

                            // Unknown/unsupported supervisor reply; keep the correlation for later
                            let _ = service
                                .append_message_logs(
                                    context_id,
                                    caller_id,
                                    message_id,
                                    vec![
                                        "Supervisor reply did not contain recognizable job.run/status/result fields"
                                            .to_string(),
                                    ],
                                )
                                .await;
                        }
                        Ok(None) => {
                            // No correlation found; ignore (could log once)
                        }
                        Err(e) => {
                            error!(error=%e, "supcorr_get error");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                    }
                }
                Ok(None) => {
                    // No message; continue polling
                    continue;
                }
                Err(e) => {
                    error!(error=%e, "popMessage error");
                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                }
            }
        }
    })
}
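// A minimal test sketch for the pure helpers above. It assumes JobStatus derives
// PartialEq and Debug; only logic that needs no Redis/Mycelium setup is exercised.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_well_formed_message_keys() {
        assert_eq!(parse_message_key("message:12:34"), Some((12, 34)));
        assert_eq!(parse_message_key("message:12"), None);
        assert_eq!(parse_message_key("message:a:b"), None);
        assert_eq!(parse_message_key("job:12:34"), None);
    }

    #[test]
    fn maps_supervisor_statuses_to_local_job_statuses() {
        assert_eq!(
            map_supervisor_job_status("queued"),
            Some((JobStatus::Dispatched, false))
        );
        assert_eq!(
            map_supervisor_job_status("running"),
            Some((JobStatus::Started, false))
        );
        assert_eq!(
            map_supervisor_job_status("completed"),
            Some((JobStatus::Finished, true))
        );
        assert_eq!(
            map_supervisor_job_status("timeout"),
            Some((JobStatus::Error, true))
        );
        assert_eq!(map_supervisor_job_status("unknown"), None);
    }
}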