Fetch the result of a job more than once if needed

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-09-08 11:54:15 +02:00
parent 8cea17f4ec
commit 78a776877a

View File

@@ -386,17 +386,14 @@ async fn deliver_one(
// Stop on terminal states // Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
// Only request a single job status/result per message if let Some(job_id) = job_id_opt {
if !requested_job_check { // First consult Redis for the latest job state in case we already have a terminal update
if let Some(job_id) = job_id_opt { match service_poll.load_job(context_id, caller_id, job_id).await {
// First consult Redis for the latest job state in case we already have a terminal update Ok(job) => {
match service_poll.load_job(context_id, caller_id, job_id).await match job.status() {
{ JobStatus::Finished | JobStatus::Error => {
Ok(job) => { // Local job is already terminal; skip supervisor job.status
match job.status() { let _ = service_poll
JobStatus::Finished | JobStatus::Error => {
// Local job is already terminal; skip supervisor job.status
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -409,41 +406,41 @@ async fn deliver_one(
) )
.await; .await;
// If result is still empty, immediately request supervisor job.result // If result is still empty, immediately request supervisor job.result
if job.result.is_empty() { if job.result.is_empty() {
let sup = cache let sup = cache
.get_or_create( .get_or_create(
sup_hub.clone(), sup_hub.clone(),
sup_dest.clone(), sup_dest.clone(),
sup_topic.clone(), sup_topic.clone(),
secret_for_poller.clone(), secret_for_poller.clone(),
) )
.await; .await;
match sup match sup
.job_result_wait(job_id.to_string()) .job_result_wait(job_id.to_string())
.await .await
{ {
Ok((_out2, reply2)) => { Ok((_out2, reply2)) => {
// Interpret reply synchronously: success/error/bare string // Interpret reply synchronously: success/error/bare string
let res = reply2.get("result"); let res = reply2.get("result");
if let Some(obj) = if let Some(obj) =
res.and_then(|v| v.as_object()) res.and_then(|v| v.as_object())
{
if let Some(s) = obj
.get("success")
.and_then(|v| v.as_str())
{ {
if let Some(s) = obj let mut patch = std::collections::HashMap::new();
.get("success") patch.insert(
.and_then(|v| v.as_str()) "success".to_string(),
{ s.to_string(),
let mut patch = std::collections::HashMap::new(); );
patch.insert( let _ = service_poll
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id, context_id,
caller_id, caller_id,
@@ -451,7 +448,7 @@ async fn deliver_one(
MessageStatus::Processed, MessageStatus::Processed,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -462,21 +459,21 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else if let Some(s) = obj } else if let Some(s) = obj
.get("error") .get("error")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
{ {
let mut patch = std::collections::HashMap::new(); let mut patch = std::collections::HashMap::new();
patch.insert( patch.insert(
"error".to_string(), "error".to_string(),
s.to_string(), s.to_string(),
); );
let _ = service_poll let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id, context_id,
caller_id, caller_id,
@@ -484,7 +481,7 @@ async fn deliver_one(
MessageStatus::Processed, MessageStatus::Processed,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -495,29 +492,31 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} }
} else if let Some(s) = } else if let Some(s) =
res.and_then(|v| v.as_str()) res.and_then(|v| v.as_str())
{ {
let mut patch = std::collections::HashMap::new(); let mut patch =
patch.insert( std::collections::HashMap::new(
"success".to_string(),
s.to_string(),
); );
let _ = service_poll patch.insert(
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id, context_id,
caller_id, caller_id,
id, id,
MessageStatus::Processed, MessageStatus::Processed,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -528,8 +527,8 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else { } else {
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -537,10 +536,10 @@ async fn deliver_one(
vec!["Supervisor job.result reply missing recognizable fields".to_string()], vec!["Supervisor job.result reply missing recognizable fields".to_string()],
) )
.await; .await;
}
} }
Err(e) => { }
let _ = service_poll Err(e) => {
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -551,11 +550,11 @@ async fn deliver_one(
)], )],
) )
.await; .await;
}
} }
} else { }
// Result already present; nothing to fetch } else {
let _ = service_poll // Result already present; nothing to fetch
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -566,38 +565,34 @@ async fn deliver_one(
)], )],
) )
.await; .await;
}
} }
// Not terminal yet -> request supervisor job.status as before }
_ => { // Not terminal yet -> request supervisor job.status as before
let sup = cache _ => {
.get_or_create( let sup = cache
sup_hub.clone(), .get_or_create(
sup_dest.clone(), sup_hub.clone(),
sup_topic.clone(), sup_dest.clone(),
secret_for_poller.clone(), sup_topic.clone(),
) secret_for_poller.clone(),
.await; )
match sup .await;
.job_status_wait(job_id.to_string()) match sup.job_status_wait(job_id.to_string()).await
.await {
{ Ok((_out_id, reply_status)) => {
Ok((_out_id, reply_status)) => { // Interpret status reply synchronously
// Interpret status reply synchronously let result_opt = reply_status.get("result");
let result_opt = let error_opt = reply_status.get("error");
reply_status.get("result"); if let Some(err_obj) = error_opt {
let error_opt = let _ = service_poll
reply_status.get("error"); .update_job_status_unchecked(
if let Some(err_obj) = error_opt { context_id,
let _ = service_poll caller_id,
.update_job_status_unchecked( job_id,
context_id, JobStatus::Error,
caller_id, )
job_id, .await;
JobStatus::Error, let _ = service_poll
)
.await;
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, caller_id, id, context_id, caller_id, id,
vec![format!( vec![format!(
@@ -606,28 +601,25 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else if let Some(res) = result_opt { } else if let Some(res) = result_opt {
let status_candidate = res let status_candidate = res
.get("status") .get("status")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.or_else(|| res.as_str()); .or_else(|| res.as_str());
if let Some(remote_status) = if let Some(remote_status) =
status_candidate status_candidate
{
if let Some((mapped, terminal)) =
map_supervisor_job_status(
remote_status,
)
{ {
if let Some(( let _ = service_poll
mapped,
terminal,
)) =
map_supervisor_job_status(
remote_status,
)
{
let _ = service_poll
.update_job_status_unchecked( .update_job_status_unchecked(
context_id, caller_id, job_id, mapped.clone(), context_id, caller_id, job_id, mapped.clone(),
) )
.await; .await;
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, caller_id, id, context_id, caller_id, id,
vec![format!( vec![format!(
@@ -637,23 +629,23 @@ async fn deliver_one(
) )
.await; .await;
// If terminal, request job.result now (handled above for local terminal case) // If terminal, request job.result now (handled above for local terminal case)
if terminal { if terminal {
// trigger job.result only if result empty to avoid spam // trigger job.result only if result empty to avoid spam
if let Ok(j_after) = if let Ok(j_after) =
service_poll service_poll
.load_job( .load_job(
context_id, context_id,
caller_id, caller_id,
job_id, job_id,
) )
.await .await
{
if j_after
.result
.is_empty()
{ {
if j_after let sup2 = cache
.result
.is_empty()
{
let sup2 = cache
.get_or_create( .get_or_create(
sup_hub.clone(), sup_hub.clone(),
sup_dest.clone(), sup_dest.clone(),
@@ -661,7 +653,7 @@ async fn deliver_one(
secret_for_poller.clone(), secret_for_poller.clone(),
) )
.await; .await;
let _ = sup2.job_result_wait(job_id.to_string()).await let _ = sup2.job_result_wait(job_id.to_string()).await
.and_then(|(_oid, reply2)| { .and_then(|(_oid, reply2)| {
// Minimal parse and store // Minimal parse and store
let res2 = reply2.get("result"); let res2 = reply2.get("result");
@@ -679,40 +671,43 @@ async fn deliver_one(
} }
Ok((String::new(), Value::Null)) Ok((String::new(), Value::Null))
}); });
}
} }
} }
} }
} }
} }
} }
Err(e) => { }
let _ = service_poll Err(e) => {
.append_message_logs( let _ = service_poll
context_id, .append_message_logs(
caller_id, context_id,
id, caller_id,
vec![format!("job.status request error: {}", e)], id,
) vec![format!(
.await; "job.status request error: {}",
} e
)],
)
.await;
} }
} }
} }
} }
// If we cannot load the job, fall back to requesting job.status }
Err(_) => { // If we cannot load the job, fall back to requesting job.status
let sup = cache Err(_) => {
.get_or_create( let sup = cache
sup_hub.clone(), .get_or_create(
sup_dest.clone(), sup_hub.clone(),
sup_topic.clone(), sup_dest.clone(),
secret_for_poller.clone(), sup_topic.clone(),
) secret_for_poller.clone(),
.await; )
match sup.job_status_wait(job_id.to_string()).await { .await;
Ok((_out_id, _reply_status)) => { match sup.job_status_wait(job_id.to_string()).await {
let _ = service_poll Ok((_out_id, _reply_status)) => {
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -723,26 +718,25 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} }
Err(e) => { Err(e) => {
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
id, id,
vec![format!( vec![format!(
"job.status request error: {}", "job.status request error: {}",
e e
)], )],
) )
.await; .await;
}
} }
} }
} }
// Ensure we only do this once
requested_job_check = true;
} }
// Ensure we only do this once
requested_job_check = true;
} }
// break; // break;
} }