From 78a776877aa7d4690beff98b0e0c51c745c6856b Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Mon, 8 Sep 2025 11:54:15 +0200 Subject: [PATCH] Fetch the result of a job more than once if needed Signed-off-by: Lee Smet --- src/router.rs | 366 +++++++++++++++++++++++++------------------------- 1 file changed, 180 insertions(+), 186 deletions(-) diff --git a/src/router.rs b/src/router.rs index 78292ef..bb994d2 100644 --- a/src/router.rs +++ b/src/router.rs @@ -386,17 +386,14 @@ async fn deliver_one( // Stop on terminal states if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { - // Only request a single job status/result per message - if !requested_job_check { - if let Some(job_id) = job_id_opt { - // First consult Redis for the latest job state in case we already have a terminal update - match service_poll.load_job(context_id, caller_id, job_id).await - { - Ok(job) => { - match job.status() { - JobStatus::Finished | JobStatus::Error => { - // Local job is already terminal; skip supervisor job.status - let _ = service_poll + if let Some(job_id) = job_id_opt { + // First consult Redis for the latest job state in case we already have a terminal update + match service_poll.load_job(context_id, caller_id, job_id).await { + Ok(job) => { + match job.status() { + JobStatus::Finished | JobStatus::Error => { + // Local job is already terminal; skip supervisor job.status + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -409,41 +406,41 @@ async fn deliver_one( ) .await; - // If result is still empty, immediately request supervisor job.result - if job.result.is_empty() { - let sup = cache - .get_or_create( - sup_hub.clone(), - sup_dest.clone(), - sup_topic.clone(), - secret_for_poller.clone(), - ) - .await; - match sup - .job_result_wait(job_id.to_string()) - .await - { - Ok((_out2, reply2)) => { - // Interpret reply synchronously: success/error/bare string - let res = reply2.get("result"); - if let Some(obj) = - res.and_then(|v| v.as_object()) + // If result is still empty, immediately request supervisor job.result + if job.result.is_empty() { + let sup = cache + .get_or_create( + sup_hub.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup + .job_result_wait(job_id.to_string()) + .await + { + Ok((_out2, reply2)) => { + // Interpret reply synchronously: success/error/bare string + let res = reply2.get("result"); + if let Some(obj) = + res.and_then(|v| v.as_object()) + { + if let Some(s) = obj + .get("success") + .and_then(|v| v.as_str()) { - if let Some(s) = obj - .get("success") - .and_then(|v| v.as_str()) - { - let mut patch = std::collections::HashMap::new(); - patch.insert( - "success".to_string(), - s.to_string(), - ); - let _ = service_poll + let mut patch = std::collections::HashMap::new(); + patch.insert( + "success".to_string(), + s.to_string(), + ); + let _ = service_poll .update_job_result_merge_unchecked( context_id, caller_id, job_id, patch, ) .await; - let _ = service_poll + let _ = service_poll .update_message_status( context_id, caller_id, @@ -451,7 +448,7 @@ async fn deliver_one( MessageStatus::Processed, ) .await; - let _ = service_poll + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -462,21 +459,21 @@ async fn deliver_one( )], ) .await; - } else if let Some(s) = obj - .get("error") - .and_then(|v| v.as_str()) - { - let mut patch = std::collections::HashMap::new(); - patch.insert( - "error".to_string(), - s.to_string(), - ); - let _ = service_poll + } else if let Some(s) = obj + .get("error") + .and_then(|v| v.as_str()) + { + let mut patch = std::collections::HashMap::new(); + patch.insert( + "error".to_string(), + s.to_string(), + ); + let _ = service_poll .update_job_result_merge_unchecked( context_id, caller_id, job_id, patch, ) .await; - let _ = service_poll + let _ = service_poll .update_message_status( context_id, caller_id, @@ -484,7 +481,7 @@ async fn deliver_one( MessageStatus::Processed, ) .await; - let _ = service_poll + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -495,29 +492,31 @@ async fn deliver_one( )], ) .await; - } - } else if let Some(s) = - res.and_then(|v| v.as_str()) - { - let mut patch = std::collections::HashMap::new(); - patch.insert( - "success".to_string(), - s.to_string(), + } + } else if let Some(s) = + res.and_then(|v| v.as_str()) + { + let mut patch = + std::collections::HashMap::new( ); - let _ = service_poll + patch.insert( + "success".to_string(), + s.to_string(), + ); + let _ = service_poll .update_job_result_merge_unchecked( context_id, caller_id, job_id, patch, ) .await; - let _ = service_poll - .update_message_status( - context_id, - caller_id, - id, - MessageStatus::Processed, - ) - .await; - let _ = service_poll + let _ = service_poll + .update_message_status( + context_id, + caller_id, + id, + MessageStatus::Processed, + ) + .await; + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -528,8 +527,8 @@ async fn deliver_one( )], ) .await; - } else { - let _ = service_poll + } else { + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -537,10 +536,10 @@ async fn deliver_one( vec!["Supervisor job.result reply missing recognizable fields".to_string()], ) .await; - } } - Err(e) => { - let _ = service_poll + } + Err(e) => { + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -551,11 +550,11 @@ async fn deliver_one( )], ) .await; - } } - } else { - // Result already present; nothing to fetch - let _ = service_poll + } + } else { + // Result already present; nothing to fetch + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -566,38 +565,34 @@ async fn deliver_one( )], ) .await; - } } - // Not terminal yet -> request supervisor job.status as before - _ => { - let sup = cache - .get_or_create( - sup_hub.clone(), - sup_dest.clone(), - sup_topic.clone(), - secret_for_poller.clone(), - ) - .await; - match sup - .job_status_wait(job_id.to_string()) - .await - { - Ok((_out_id, reply_status)) => { - // Interpret status reply synchronously - let result_opt = - reply_status.get("result"); - let error_opt = - reply_status.get("error"); - if let Some(err_obj) = error_opt { - let _ = service_poll - .update_job_status_unchecked( - context_id, - caller_id, - job_id, - JobStatus::Error, - ) - .await; - let _ = service_poll + } + // Not terminal yet -> request supervisor job.status as before + _ => { + let sup = cache + .get_or_create( + sup_hub.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup.job_status_wait(job_id.to_string()).await + { + Ok((_out_id, reply_status)) => { + // Interpret status reply synchronously + let result_opt = reply_status.get("result"); + let error_opt = reply_status.get("error"); + if let Some(err_obj) = error_opt { + let _ = service_poll + .update_job_status_unchecked( + context_id, + caller_id, + job_id, + JobStatus::Error, + ) + .await; + let _ = service_poll .append_message_logs( context_id, caller_id, id, vec![format!( @@ -606,28 +601,25 @@ async fn deliver_one( )], ) .await; - } else if let Some(res) = result_opt { - let status_candidate = res - .get("status") - .and_then(|v| v.as_str()) - .or_else(|| res.as_str()); - if let Some(remote_status) = - status_candidate + } else if let Some(res) = result_opt { + let status_candidate = res + .get("status") + .and_then(|v| v.as_str()) + .or_else(|| res.as_str()); + if let Some(remote_status) = + status_candidate + { + if let Some((mapped, terminal)) = + map_supervisor_job_status( + remote_status, + ) { - if let Some(( - mapped, - terminal, - )) = - map_supervisor_job_status( - remote_status, - ) - { - let _ = service_poll + let _ = service_poll .update_job_status_unchecked( context_id, caller_id, job_id, mapped.clone(), ) .await; - let _ = service_poll + let _ = service_poll .append_message_logs( context_id, caller_id, id, vec![format!( @@ -637,23 +629,23 @@ async fn deliver_one( ) .await; - // If terminal, request job.result now (handled above for local terminal case) - if terminal { - // trigger job.result only if result empty to avoid spam - if let Ok(j_after) = - service_poll - .load_job( - context_id, - caller_id, - job_id, - ) - .await + // If terminal, request job.result now (handled above for local terminal case) + if terminal { + // trigger job.result only if result empty to avoid spam + if let Ok(j_after) = + service_poll + .load_job( + context_id, + caller_id, + job_id, + ) + .await + { + if j_after + .result + .is_empty() { - if j_after - .result - .is_empty() - { - let sup2 = cache + let sup2 = cache .get_or_create( sup_hub.clone(), sup_dest.clone(), @@ -661,7 +653,7 @@ async fn deliver_one( secret_for_poller.clone(), ) .await; - let _ = sup2.job_result_wait(job_id.to_string()).await + let _ = sup2.job_result_wait(job_id.to_string()).await .and_then(|(_oid, reply2)| { // Minimal parse and store let res2 = reply2.get("result"); @@ -679,40 +671,43 @@ async fn deliver_one( } Ok((String::new(), Value::Null)) }); - } } } } } } } - Err(e) => { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!("job.status request error: {}", e)], - ) - .await; - } + } + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "job.status request error: {}", + e + )], + ) + .await; } } } } - // If we cannot load the job, fall back to requesting job.status - Err(_) => { - let sup = cache - .get_or_create( - sup_hub.clone(), - sup_dest.clone(), - sup_topic.clone(), - secret_for_poller.clone(), - ) - .await; - match sup.job_status_wait(job_id.to_string()).await { - Ok((_out_id, _reply_status)) => { - let _ = service_poll + } + // If we cannot load the job, fall back to requesting job.status + Err(_) => { + let sup = cache + .get_or_create( + sup_hub.clone(), + sup_dest.clone(), + sup_topic.clone(), + secret_for_poller.clone(), + ) + .await; + match sup.job_status_wait(job_id.to_string()).await { + Ok((_out_id, _reply_status)) => { + let _ = service_poll .append_message_logs( context_id, caller_id, @@ -723,26 +718,25 @@ async fn deliver_one( )], ) .await; - } - Err(e) => { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "job.status request error: {}", - e - )], - ) - .await; - } + } + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "job.status request error: {}", + e + )], + ) + .await; } } } - // Ensure we only do this once - requested_job_check = true; } + // Ensure we only do this once + requested_job_check = true; } // break; }