From 25f35ea8fc5c08aba85c88a00b03393f7af8ef52 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 5 Sep 2025 19:58:52 +0200 Subject: [PATCH] Check job status in redis db as well before sending rpc call Signed-off-by: Lee Smet --- src/router.rs | 208 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 167 insertions(+), 41 deletions(-) diff --git a/src/router.rs b/src/router.rs index bc531fc..7446d58 100644 --- a/src/router.rs +++ b/src/router.rs @@ -305,6 +305,8 @@ async fn deliver_one( let job_id_opt = job_id_opt; let mut last_status: Option = Some(TransportStatus::Sent); + // Ensure we only request supervisor job.status or job.result once per outbound message + let mut requested_job_check: bool = false; loop { if start.elapsed() >= poll_timeout { @@ -337,48 +339,172 @@ async fn deliver_one( // Stop on terminal states if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { - // On Read, request supervisor job.status asynchronously; inbound listener will handle replies - // if matches!(s, TransportStatus::Read) - // && let Some(job_id) = job_id_opt - if let Some(job_id) = job_id_opt { - let sup = cache - .get_or_create( - client.clone(), - sup_dest.clone(), - sup_topic.clone(), - secret_for_poller.clone(), - ) - .await; - match sup.job_status_with_ids(job_id.to_string()).await { - Ok((_out_id, inner_id)) => { - // Correlate this status request to the message/job - let _ = service_poll - .supcorr_set( - inner_id, context_id, caller_id, job_id, id, - ) - .await; - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Requested supervisor job.status for job {}", - job_id - )], - ) - .await; - } - Err(e) => { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!("job.status request error: {}", e)], - ) - .await; + // Only request a single job status/result per message + if !requested_job_check { + if let Some(job_id) = job_id_opt { + // First consult Redis for the latest job state in case we already have a terminal update + match service_poll.load_job(context_id, caller_id, job_id).await { + Ok(job) => { + match job.status() { + JobStatus::Finished | JobStatus::Error => { + // Local job is already terminal; skip supervisor job.status + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Local job {} status is terminal ({:?}); skipping supervisor job.status", + job_id, + job.status() + )], + ) + .await; + + // If result is still empty, immediately request supervisor job.result + if job.result.is_empty() { + let sup = cache + .get_or_create( + client.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup.job_result_with_ids(job_id.to_string()).await { + Ok((_out2, inner2)) => { + let _ = service_poll + .supcorr_set(inner2, context_id, caller_id, job_id, id) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Requested supervisor job.result for job {} (local terminal w/ empty result)", + job_id + )], + ) + .await; + } + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "job.result request error for job {}: {}", + job_id, e + )], + ) + .await; + } + } + } else { + // Result already present; nothing to fetch + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Job {} already has result; no supervisor calls needed", + job_id + )], + ) + .await; + } + } + // Not terminal yet -> request supervisor job.status as before + _ => { + let sup = cache + .get_or_create( + client.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup.job_status_with_ids(job_id.to_string()).await { + Ok((_out_id, inner_id)) => { + // Correlate this status request to the message/job + let _ = service_poll + .supcorr_set( + inner_id, context_id, caller_id, job_id, id, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Requested supervisor job.status for job {}", + job_id + )], + ) + .await; + } + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!("job.status request error: {}", e)], + ) + .await; + } + } + } + } + } + // If we cannot load the job, fall back to requesting job.status + Err(_) => { + let sup = cache + .get_or_create( + client.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup.job_status_with_ids(job_id.to_string()).await { + Ok((_out_id, inner_id)) => { + let _ = service_poll + .supcorr_set( + inner_id, context_id, caller_id, job_id, id, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Requested supervisor job.status for job {} (fallback; load_job failed)", + job_id + )], + ) + .await; + } + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!("job.status request error: {}", e)], + ) + .await; + } + } + } } + // Ensure we only do this once + requested_job_check = true; } } // break;