diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs
index 80b07d8..f54f253 100644
--- a/src/clients/supervisor_client.rs
+++ b/src/clients/supervisor_client.rs
@@ -138,6 +138,54 @@ impl SupervisorClient {
         )))
     }
 
+    /// Synchronous variant: wait for a JSON-RPC reply via the Mycelium reply_timeout and return the inner JSON-RPC "result".
+    /// If the supervisor returns an error object, map it to RpcError.
+    pub async fn call_sync(
+        &self,
+        method: &str,
+        params: Value,
+        reply_timeout_secs: u64,
+    ) -> Result<Value, SupervisorClientError> {
+        let inner = self.build_supervisor_payload(method, params);
+        let payload_b64 = Self::encode_payload(&inner)?;
+
+        let result = self
+            .mycelium
+            .push_message(&self.destination, &self.topic, &payload_b64, Some(reply_timeout_secs))
+            .await?;
+
+        // Expect an InboundMessage-like value with a base64 payload containing the supervisor JSON-RPC response
+        let payload_field = if let Some(p) = result.get("payload").and_then(|v| v.as_str()) {
+            p.to_string()
+        } else if let Some(arr) = result.as_array() {
+            // Defensive: handle a single-element array shape
+            if let Some(one) = arr.get(0) {
+                one.get("payload")
+                    .and_then(|v| v.as_str())
+                    .map(|s| s.to_string())
+                    .ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing payload in result: {result}")))?
+            } else {
+                return Err(SupervisorClientError::TransportTimeout);
+            }
+        } else {
+            // No payload => no reply received within the timeout (Mycelium would have returned just an id)
+            return Err(SupervisorClientError::TransportTimeout);
+        };
+
+        let raw = BASE64_STANDARD
+            .decode(payload_field.as_bytes())
+            .map_err(|e| SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}")))?;
+        let rpc_resp: Value = serde_json::from_slice(&raw)?;
+
+        if let Some(err) = rpc_resp.get("error") {
+            return Err(SupervisorClientError::RpcError(err.to_string()));
+        }
+        let res = rpc_resp
+            .get("result")
+            .ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing result in supervisor reply: {rpc_resp}")))?;
+        Ok(res.clone())
+    }
+
     fn need_secret(&self) -> Result<&str, SupervisorClientError> {
         self.secret
             .as_deref()
@@ -257,6 +305,28 @@ impl SupervisorClient {
         self.call("job.status", json!([job_id.into()])).await
     }
 
+    /// Synchronous job.status: waits for the supervisor to reply and returns the status string.
+    /// The supervisor result may be an object with { status: "..." } or a bare string.
+    pub async fn job_status_sync(
+        &self,
+        job_id: impl Into<String>,
+        reply_timeout_secs: u64,
+    ) -> Result<String, SupervisorClientError> {
+        let res = self
+            .call_sync("job.status", json!([job_id.into()]), reply_timeout_secs)
+            .await?;
+        let status = if let Some(s) = res.get("status").and_then(|v| v.as_str()) {
+            s.to_string()
+        } else if let Some(s) = res.as_str() {
+            s.to_string()
+        } else {
+            return Err(SupervisorClientError::InvalidResponse(format!(
+                "unexpected job.status result shape: {res}"
+            )));
+        };
+        Ok(status)
+    }
+
     pub async fn job_result(
         &self,
         job_id: impl Into<String>,
@@ -264,6 +334,45 @@ impl SupervisorClient {
         self.call("job.result", json!([job_id.into()])).await
     }
 
+    /// Synchronous job.result: waits for the supervisor to reply and returns a map
+    /// containing exactly one of:
+    /// - {"success": "..."} on success
+    /// - {"error": "..."} on an error reported by the runner
+    /// Some servers may return a bare string; we treat that as {"success": "<string>"}.
+    pub async fn job_result_sync(
+        &self,
+        job_id: impl Into<String>,
+        reply_timeout_secs: u64,
+    ) -> Result<std::collections::HashMap<String, String>, SupervisorClientError> {
+        let res = self
+            .call_sync("job.result", json!([job_id.into()]), reply_timeout_secs)
+            .await?;
+
+        use std::collections::HashMap;
+        let mut out: HashMap<String, String> = HashMap::new();
+
+        if let Some(obj) = res.as_object() {
+            if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
+                out.insert("success".to_string(), s.to_string());
+                return Ok(out);
+            }
+            if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
+                out.insert("error".to_string(), s.to_string());
+                return Ok(out);
+            }
+            return Err(SupervisorClientError::InvalidResponse(format!(
+                "unexpected job.result result shape: {res}"
+            )));
+        } else if let Some(s) = res.as_str() {
+            out.insert("success".to_string(), s.to_string());
+            return Ok(out);
+        }
+
+        Err(SupervisorClientError::InvalidResponse(format!(
+            "unexpected job.result result shape: {res}"
+        )))
+    }
+
     pub async fn job_stop(
         &self,
         job_id: impl Into<String>,
diff --git a/src/router.rs b/src/router.rs
index 3177c47..7a671a9 100644
--- a/src/router.rs
+++ b/src/router.rs
@@ -5,7 +5,7 @@ use tokio::sync::Semaphore;
 
 use crate::{
     clients::{Destination, SupervisorClient, MyceliumClient},
-    models::{Job, Message, MessageStatus, ScriptType, TransportStatus},
+    models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
     service::AppService,
 };
 
@@ -110,6 +110,8 @@ async fn deliver_one(
 
     // Load message
     let msg: Message = service.load_message(context_id, caller_id, id).await?;
+    // Embedded job id (if any)
+    let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);
 
     // Determine routing script_type
     let desired: ScriptType = determine_script_type(&msg);
@@ -136,9 +138,12 @@ async fn deliver_one(
     } else {
         Destination::Ip(runner.address)
     };
+    // Keep clones for poller usage
+    let dest_for_poller = dest.clone();
+    let topic_for_poller = cfg.topic.clone();
     let client = SupervisorClient::new_with_client(
         mycelium.clone(),
-        dest,
+        dest.clone(),
         cfg.topic.clone(),
         None, // secret
     );
@@ -173,11 +178,22 @@ async fn deliver_one(
 
         let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
         let out_id_cloned = out_id.clone();
        let mycelium = mycelium.clone();
+        // Determine the reply timeout for supervisor job.result: prefer message.timeout_result, fall back to the router config timeout
+        let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
+            msg.timeout_result as u64
+        } else {
+            cfg.transport_poll_timeout_secs
+        };
 
         tokio::spawn(async move {
             let start = std::time::Instant::now();
             let client = mycelium;
+            // Supervisor call context captured for sync status checks
+            let sup_dest = dest_for_poller;
+            let sup_topic = topic_for_poller;
+            let job_id_opt = job_id_opt;
+
             let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
 
             loop {
@@ -211,6 +227,128 @@ async fn deliver_one(
 
                     // Stop on terminal states
                     if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
+                        // On Read, fetch supervisor job.status and update the local job/message if terminal
+                        if matches!(s, TransportStatus::Read) {
+                            if let Some(job_id) = job_id_opt {
+                                let sup = SupervisorClient::new_with_client(
+                                    client.clone(),
+                                    sup_dest.clone(),
+                                    sup_topic.clone(),
+                                    None,
+                                );
+                                match sup.job_status_sync(job_id.to_string(), 10).await {
+                                    Ok(remote_status) => {
+                                        if let Some((mapped, terminal)) =
+                                            map_supervisor_job_status(&remote_status)
+                                        {
+                                            if terminal {
+                                                let _ = service_poll
+                                                    .update_job_status_unchecked(
+                                                        context_id,
+                                                        caller_id,
+                                                        job_id,
+                                                        mapped.clone(),
+                                                    )
+                                                    .await;
+
+                                                // After a terminal status, fetch supervisor job.result and store it into Job.result
+                                                let sup = SupervisorClient::new_with_client(
+                                                    client.clone(),
+                                                    sup_dest.clone(),
+                                                    sup_topic.clone(),
+                                                    None,
+                                                );
+                                                match sup.job_result_sync(job_id.to_string(), job_result_reply_timeout).await {
+                                                    Ok(result_map) => {
+                                                        // Persist the result into the Job.result map (merge)
+                                                        let _ = service_poll
+                                                            .update_job_result_merge_unchecked(
+                                                                context_id,
+                                                                caller_id,
+                                                                job_id,
+                                                                result_map.clone(),
+                                                            )
+                                                            .await;
+                                                        // Log which key was stored (success or error)
+                                                        let key = result_map.keys().next().cloned().unwrap_or_else(|| "unknown".to_string());
+                                                        let _ = service_poll
+                                                            .append_message_logs(
+                                                                context_id,
+                                                                caller_id,
+                                                                id,
+                                                                vec![format!(
+                                                                    "Stored supervisor job.result for job {} ({})",
+                                                                    job_id, key
+                                                                )],
+                                                            )
+                                                            .await;
+                                                    }
+                                                    Err(e) => {
+                                                        let _ = service_poll
+                                                            .append_message_logs(
+                                                                context_id,
+                                                                caller_id,
+                                                                id,
+                                                                vec![format!(
+                                                                    "job.result fetch error for job {}: {}",
+                                                                    job_id, e
+                                                                )],
+                                                            )
+                                                            .await;
+                                                    }
+                                                }
+
+                                                // Mark the message as processed
+                                                let _ = service_poll
+                                                    .update_message_status(
+                                                        context_id,
+                                                        caller_id,
+                                                        id,
+                                                        MessageStatus::Processed,
+                                                    )
+                                                    .await;
+                                                let _ = service_poll
+                                                    .append_message_logs(
+                                                        context_id,
+                                                        caller_id,
+                                                        id,
+                                                        vec![format!(
+                                                            "Supervisor job.status for job {} -> {} (mapped to {:?})",
+                                                            job_id, remote_status, mapped
+                                                        )],
+                                                    )
+                                                    .await;
+                                            }
+                                        } else {
+                                            let _ = service_poll
+                                                .append_message_logs(
+                                                    context_id,
+                                                    caller_id,
+                                                    id,
+                                                    vec![format!(
+                                                        "Unknown supervisor status '{}' for job {}",
+                                                        remote_status, job_id
+                                                    )],
+                                                )
+                                                .await;
+                                        }
+                                    }
+                                    Err(e) => {
+                                        let _ = service_poll
+                                            .append_message_logs(
+                                                context_id,
+                                                caller_id,
+                                                id,
+                                                vec![format!(
+                                                    "job.status sync error: {}",
+                                                    e
+                                                )],
+                                            )
+                                            .await;
+                                    }
+                                }
+                            }
+                        }
                         break;
                     }
                     if matches!(s, TransportStatus::Failed) {
@@ -287,6 +425,17 @@ fn parse_message_key(s: &str) -> Option<(u32, u32)> {
     }
 }
 
+/// Map supervisor job.status -> (local JobStatus, terminal)
+fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
+    match s {
+        "created" | "queued" => Some((JobStatus::Dispatched, false)),
+        "running" => Some((JobStatus::Started, false)),
+        "completed" => Some((JobStatus::Finished, true)),
+        "failed" | "timeout" => Some((JobStatus::Error, true)),
+        _ => None,
+    }
+}
+
 /// Auto-discover contexts periodically and ensure a router loop exists for each.
 /// Returns a JoinHandle of the discovery task (router loops are detached).
diff --git a/src/service.rs b/src/service.rs
index 8b0a9b7..0d2c6dd 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -974,6 +974,22 @@ impl AppService {
             .await
     }
 
+    /// Bypass-permission variant that merges a patch into a job's result field.
+    /// Intended for internal router/scheduler use where no actor identity is present.
+    pub async fn update_job_result_merge_unchecked(
+        &self,
+        context_id: u32,
+        caller_id: u32,
+        job_id: u32,
+        patch: HashMap<String, String>,
+    ) -> Result<(), BoxError> {
+        // Ensure the job exists, then write directly
+        let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
+        self.redis
+            .update_job_result_merge(context_id, caller_id, job_id, patch)
+            .await
+    }
+
     pub async fn append_message_logs(
         &self,
         context_id: u32,
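
Usage sketch (illustrative, not part of the patch): this is roughly how the two synchronous helpers added above are meant to be combined, mirroring what the router's poller does. The `poll_job` helper, the import path, and the 10/60-second reply timeouts are assumptions for the example, not values defined by this diff; only job_status_sync and job_result_sync come from the patch.

// Hypothetical usage sketch; paths and timeout values are assumptions.
use crate::clients::{SupervisorClient, SupervisorClientError};

async fn poll_job(sup: &SupervisorClient, job_id: u32) -> Result<(), SupervisorClientError> {
    // Ask the supervisor for the job status, waiting up to 10 seconds for a reply.
    let status = sup.job_status_sync(job_id.to_string(), 10).await?;

    // Fetch the result only once the remote status is terminal
    // (the same set that map_supervisor_job_status treats as terminal).
    if matches!(status.as_str(), "completed" | "failed" | "timeout") {
        // The returned map holds exactly one of the keys "success" or "error".
        let result = sup.job_result_sync(job_id.to_string(), 60).await?;
        if let Some(out) = result.get("success") {
            println!("job {job_id} succeeded: {out}");
        } else if let Some(err) = result.get("error") {
            println!("job {job_id} failed: {err}");
        }
    }
    Ok(())
}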