Remove notion of sync calls

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-09-05 13:23:48 +02:00
parent 8de2597f19
commit 2c88114d45
2 changed files with 321 additions and 256 deletions

View File

@@ -204,12 +204,6 @@ async fn deliver_one(
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
let out_id_cloned = out_id.clone();
let mycelium = mycelium.clone();
// Determine reply timeout for supervisor job.result: prefer message.timeout_result, fallback to router config timeout
let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
msg.timeout_result as u64
} else {
cfg.transport_poll_timeout_secs
};
tokio::spawn(async move {
let start = std::time::Instant::now();
@@ -253,124 +247,35 @@ async fn deliver_one(
// Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
// On Read, fetch supervisor job.status and update local job/message if terminal
if matches!(s, TransportStatus::Read)
&& let Some(job_id) = job_id_opt
{
// On Read, request supervisor job.status asynchronously; inbound listener will handle replies
// if matches!(s, TransportStatus::Read)
// && let Some(job_id) = job_id_opt
if let Some(job_id) = job_id_opt {
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
);
match sup.job_status_sync(job_id.to_string(), 10).await {
Ok(remote_status) => {
if let Some((mapped, terminal)) =
map_supervisor_job_status(&remote_status)
{
if terminal {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
mapped.clone(),
)
.await;
// After terminal status, fetch supervisor job.result and store into Job.result
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
);
match sup
.job_result_sync(
job_id.to_string(),
job_result_reply_timeout,
)
.await
{
Ok(result_map) => {
// Persist the result into the Job.result map (merge)
let _ = service_poll
.update_job_result_merge_unchecked(
context_id,
caller_id,
job_id,
result_map.clone(),
)
.await;
// Log which key was stored (success or error)
let key = result_map
.keys()
.next()
.cloned()
.unwrap_or_else(|| {
"unknown".to_string()
});
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} ({})",
job_id, key
)],
)
.await;
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.result fetch error for job {}: {}",
job_id, e
)],
)
.await;
}
}
// Mark message as processed
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Supervisor job.status for job {} -> {} (mapped to {:?})",
job_id, remote_status, mapped
)],
)
.await;
}
} else {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Unknown supervisor status '{}' for job {}",
remote_status, job_id
)],
)
.await;
}
match sup.job_status_with_ids(job_id.to_string()).await {
Ok((_out_id, inner_id)) => {
// Correlate this status request to the message/job
let _ = service_poll
.supcorr_set(
inner_id, context_id, caller_id, job_id, id,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Requested supervisor job.status for job {}",
job_id
)],
)
.await;
}
Err(e) => {
let _ = service_poll
@@ -378,13 +283,13 @@ async fn deliver_one(
context_id,
caller_id,
id,
vec![format!("job.status sync error: {}", e)],
vec![format!("job.status request error: {}", e)],
)
.await;
}
}
}
break;
// break;
}
if matches!(s, TransportStatus::Failed) {
let _ = service_poll
@@ -512,7 +417,7 @@ pub fn start_inbound_listener(
// Initialize Mycelium client (retry loop)
let mycelium = loop {
match MyceliumClient::new(cfg.base_url.clone()) {
Ok(c) => break c,
Ok(c) => break Arc::new(c),
Err(e) => {
error!(error=%e, "MyceliumClient init error (inbound listener)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
@@ -566,16 +471,25 @@ pub fn start_inbound_listener(
match service.supcorr_get(inner_id).await {
Ok(Some((context_id, caller_id, job_id, message_id))) => {
// Determine success/error from supervisor JSON-RPC envelope
let is_success = rpc
.get("result")
.map(|res| {
res.get("job_queued").is_some()
|| res.as_str().map(|s| s == "job_queued").unwrap_or(false)
// Inspect result/error to route job.run/job.status/job.result replies
let result_opt = rpc.get("result");
let error_opt = rpc.get("error");
// Handle job.run success (job_queued)
let is_job_queued = result_opt
.and_then(|res| {
if res.get("job_queued").is_some() {
Some(true)
} else if let Some(s) = res.as_str() {
Some(s == "job_queued")
} else {
None
}
})
.unwrap_or(false);
if is_success {
// Set to Dispatched (idempotent) per spec choice, and append log
if is_job_queued {
// Set to Dispatched (idempotent) per spec, and append log
let _ = service
.update_job_status_unchecked(
context_id,
@@ -596,8 +510,11 @@ pub fn start_inbound_listener(
)
.await;
let _ = service.supcorr_del(inner_id).await;
} else if let Some(err_obj) = rpc.get("error") {
// Error path: set job Error and log details
continue;
}
// Error envelope: set job Error and log
if let Some(err_obj) = error_opt {
let _ = service
.update_job_status_unchecked(
context_id,
@@ -618,20 +535,271 @@ pub fn start_inbound_listener(
)
.await;
let _ = service.supcorr_del(inner_id).await;
} else {
// Unknown result; keep correlation for a later, clearer reply
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![
"Supervisor reply did not contain job_queued or error"
.to_string(),
],
)
.await;
continue;
}
// If we have a result, try to interpret it as job.status or job.result
if let Some(res) = result_opt {
// Try job.status: object {status: "..."} or bare string
let status_candidate = res
.get("status")
.and_then(|v| v.as_str())
.or_else(|| res.as_str());
if let Some(remote_status) = status_candidate {
if let Some((mapped, terminal)) =
map_supervisor_job_status(remote_status)
{
// Update job status and log
let _ = service
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
mapped.clone(),
)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"Supervisor job.status for job {} -> {} (mapped to {:?})",
job_id, remote_status, mapped
)],
)
.await;
// Done with this correlation id
let _ = service.supcorr_del(inner_id).await;
// If terminal, request job.result asynchronously now
if terminal {
// Load job to determine script_type for runner selection
match service
.load_job(context_id, caller_id, job_id)
.await
{
Ok(job) => {
match service.scan_runners(context_id).await {
Ok(runners) => {
if let Some(runner) =
runners.into_iter().find(|r| {
r.script_type == job.script_type
})
{
let dest = if !runner
.pubkey
.trim()
.is_empty()
{
Destination::Pk(
runner.pubkey.clone(),
)
} else {
Destination::Ip(runner.address)
};
let sup = SupervisorClient::new_with_client(
mycelium.clone(),
dest,
cfg.topic.clone(),
runner.secret.clone(),
);
match sup
.job_result_with_ids(
job_id.to_string(),
)
.await
{
Ok((_out2, inner2)) => {
let _ = service
.supcorr_set(
inner2, context_id,
caller_id, job_id,
message_id,
)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"Requested supervisor job.result for job {}",
job_id
)],
)
.await;
}
Err(e) => {
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"job.result request error for job {}: {}",
job_id, e
)],
)
.await;
}
}
} else {
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"No runner with matching script_type found to request job.result for job {}",
job_id
)],
)
.await;
}
}
Err(e) => {
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"scan_runners error while requesting job.result for job {}: {}",
job_id, e
)],
)
.await;
}
}
}
Err(e) => {
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"load_job error while requesting job.result for job {}: {}",
job_id, e
)],
)
.await;
}
}
}
continue;
}
}
// Try job.result: object with success/error or bare string treated as success
if let Some(obj) = res.as_object() {
if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
let mut patch = std::collections::HashMap::new();
patch.insert("success".to_string(), s.to_string());
let _ = service
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service
.update_message_status(
context_id,
caller_id,
message_id,
MessageStatus::Processed,
)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"Stored supervisor job.result for job {} (success)",
job_id
)],
)
.await;
let _ = service.supcorr_del(inner_id).await;
continue;
}
if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
let mut patch = std::collections::HashMap::new();
patch.insert("error".to_string(), s.to_string());
let _ = service
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service
.update_message_status(
context_id,
caller_id,
message_id,
MessageStatus::Processed,
)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"Stored supervisor job.result for job {} (error)",
job_id
)],
)
.await;
let _ = service.supcorr_del(inner_id).await;
continue;
}
} else if let Some(s) = res.as_str() {
// Bare string => treat as success
let mut patch = std::collections::HashMap::new();
patch.insert("success".to_string(), s.to_string());
let _ = service
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service
.update_message_status(
context_id,
caller_id,
message_id,
MessageStatus::Processed,
)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![format!(
"Stored supervisor job.result for job {} (success)",
job_id
)],
)
.await;
let _ = service.supcorr_del(inner_id).await;
continue;
}
}
// Unknown/unsupported supervisor reply; keep correlation for later
let _ = service
.append_message_logs(
context_id,
caller_id,
message_id,
vec![
"Supervisor reply did not contain recognizable job.run/status/result fields"
.to_string(),
],
)
.await;
}
Ok(None) => {
// No correlation found; ignore or log once