Compare commits

..

7 Commits

Author SHA1 Message Date
Lee Smet
688c42493e Add info on how to run demo script
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-09 11:10:46 +02:00
Lee Smet
a75fb9c55d Format code
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 14:25:35 +02:00
Lee Smet
5ed9739d68 Properly update DAG view with started/finished jobs
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 14:12:40 +02:00
Lee Smet
3cd1a55768 Fix job status transitions
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 13:37:42 +02:00
Lee Smet
c860553acd Stop polling when a job reached terminal status
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 12:07:26 +02:00
Lee Smet
78a776877a Fetch the result of a job more than once if needed
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 11:54:15 +02:00
Lee Smet
8cea17f4ec Increase supervisor hub popmessage timeout
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-08 11:43:44 +02:00
5 changed files with 371 additions and 188 deletions

View File

@@ -1,2 +1,28 @@
# herocoordinator # herocoordinator
## Demo setup
A python script is provided in the [scripts directory](./scripts/supervisor_flow_demo.py). This script
generates some demo jobs to be run by [a supervisor](https://git.ourworld.tf/herocode/supervisor).
Communication happens over [mycelium](https://github.com/threefoldtech/mycelium). To run the demo a
supervisor must be running, which uses a mycelium instance to read and write messages. A __different__
mycelium instance needs to run for the coordinator (the supervisor can run on a different host than
the coordinator, so long as the two mycelium instances used can reach each other).
An example of a local setup:
```bash
# Run a redis docker
docker run -it -d -p 6379:6379 --name redis redis
# Spawn mycelium node 1 with default settings. This also creates a TUN interface though that is not
# necessary for the messages
mycelium
# Spawn mycelium node 2, connect to the first node
mycelium --key-file key.bin --peers tcp://127.0.0.1:9651 --disable-quic --disable-peer-discovery --api-addr 127.0.0.1:9989 --jsonrpc-addr 127.0.0.1:9990 --no-tun -t 8651
# Start the supervisor
supervisor --admin-secret admin123 --user-secret user123 --register-secret register123 --mycelium-url http://127.0.0.1:9990 --topic supervisor.rpc
# Start the coordinator
cargo run # (alternatively if a compiled binary is present that can be run)
# Finally, invoke the demo script
python3 scripts/supervisor_flow_demo.py
```

View File

@@ -48,7 +48,7 @@ impl SupervisorHub {
fn spawn_pop_loop(hub: Arc<Self>) { fn spawn_pop_loop(hub: Arc<Self>) {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match hub.mycelium.pop_message(Some(false), Some(2), None).await { match hub.mycelium.pop_message(Some(false), Some(20), None).await {
Ok(Some(inb)) => { Ok(Some(inb)) => {
// Extract and decode payload // Extract and decode payload
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else { let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {

View File

@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt; use std::fmt;
use crate::{ use crate::{
models::{Flow, Job, ScriptType}, models::{Flow, Job, JobStatus, ScriptType},
storage::RedisDriver, storage::RedisDriver,
}; };
@@ -212,6 +212,41 @@ pub async fn build_flow_dag(
edges.sort_unstable(); edges.sort_unstable();
reverse_edges.sort_unstable(); reverse_edges.sort_unstable();
// Populate runtime execution state from persisted Job.status()
let mut started_set: HashSet<u32> = HashSet::new();
let mut completed_set: HashSet<u32> = HashSet::new();
let mut error_ids: Vec<u32> = Vec::new();
for (&jid, job) in &jobs {
match job.status() {
JobStatus::Finished => {
completed_set.insert(jid);
}
JobStatus::Started => {
started_set.insert(jid);
}
JobStatus::Dispatched => {
// Consider Dispatched as "in-flight" for DAG runtime started set,
// so queued/running work is visible in periodic snapshots.
started_set.insert(jid);
}
JobStatus::Error => {
error_ids.push(jid);
}
JobStatus::WaitingForPrerequisites => {
// Neither started nor completed
}
}
}
// Choose a deterministic failed job if any errors exist (smallest job id)
let failed_job = if error_ids.is_empty() {
None
} else {
error_ids.sort_unstable();
Some(error_ids[0])
};
let dag = FlowDag { let dag = FlowDag {
flow_id, flow_id,
caller_id, caller_id,
@@ -222,9 +257,9 @@ pub async fn build_flow_dag(
roots, roots,
leaves, leaves,
levels, levels,
started: HashSet::new(), started: started_set,
completed: HashSet::new(), completed: completed_set,
failed_job: None, failed_job,
}; };
Ok(dag) Ok(dag)

View File

@@ -386,17 +386,32 @@ async fn deliver_one(
// Stop on terminal states // Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
// Only request a single job status/result per message if let Some(job_id) = job_id_opt {
if !requested_job_check { // First consult Redis for the latest job state in case we already have a terminal update
if let Some(job_id) = job_id_opt { match service_poll.load_job(context_id, caller_id, job_id).await {
// First consult Redis for the latest job state in case we already have a terminal update Ok(job) => {
match service_poll.load_job(context_id, caller_id, job_id).await // Promote to Started as soon as transport is delivered/read,
{ // if currently Dispatched or WaitingForPrerequisites.
Ok(job) => { // This makes DAG.started reflect "in-flight" work even when jobs
match job.status() { // complete too quickly to observe an intermediate supervisor "running" status.
JobStatus::Finished | JobStatus::Error => { if matches!(
// Local job is already terminal; skip supervisor job.status job.status(),
let _ = service_poll JobStatus::Dispatched
| JobStatus::WaitingForPrerequisites
) {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Started,
)
.await;
}
match job.status() {
JobStatus::Finished | JobStatus::Error => {
// Local job is already terminal; skip supervisor job.status
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -409,41 +424,41 @@ async fn deliver_one(
) )
.await; .await;
// If result is still empty, immediately request supervisor job.result // If result is still empty, immediately request supervisor job.result
if job.result.is_empty() { if job.result.is_empty() {
let sup = cache let sup = cache
.get_or_create( .get_or_create(
sup_hub.clone(), sup_hub.clone(),
sup_dest.clone(), sup_dest.clone(),
sup_topic.clone(), sup_topic.clone(),
secret_for_poller.clone(), secret_for_poller.clone(),
) )
.await; .await;
match sup match sup
.job_result_wait(job_id.to_string()) .job_result_wait(job_id.to_string())
.await .await
{ {
Ok((_out2, reply2)) => { Ok((_out2, reply2)) => {
// Interpret reply synchronously: success/error/bare string // Interpret reply synchronously: success/error/bare string
let res = reply2.get("result"); let res = reply2.get("result");
if let Some(obj) = if let Some(obj) =
res.and_then(|v| v.as_object()) res.and_then(|v| v.as_object())
{
if let Some(s) = obj
.get("success")
.and_then(|v| v.as_str())
{ {
if let Some(s) = obj let mut patch = std::collections::HashMap::new();
.get("success") patch.insert(
.and_then(|v| v.as_str()) "success".to_string(),
{ s.to_string(),
let mut patch = std::collections::HashMap::new(); );
patch.insert( let _ = service_poll
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id, context_id,
caller_id, caller_id,
@@ -451,7 +466,24 @@ async fn deliver_one(
MessageStatus::Processed, MessageStatus::Processed,
) )
.await; .await;
let _ = service_poll // Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -462,21 +494,21 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else if let Some(s) = obj } else if let Some(s) = obj
.get("error") .get("error")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
{ {
let mut patch = std::collections::HashMap::new(); let mut patch = std::collections::HashMap::new();
patch.insert( patch.insert(
"error".to_string(), "error".to_string(),
s.to_string(), s.to_string(),
); );
let _ = service_poll let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id, context_id,
caller_id, caller_id,
@@ -484,7 +516,24 @@ async fn deliver_one(
MessageStatus::Processed, MessageStatus::Processed,
) )
.await; .await;
let _ = service_poll // Also mark job as Error so the flow can handle failure (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Error,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Error (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -495,29 +544,51 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} }
} else if let Some(s) = } else if let Some(s) =
res.and_then(|v| v.as_str()) res.and_then(|v| v.as_str())
{ {
let mut patch = std::collections::HashMap::new(); let mut patch =
patch.insert( std::collections::HashMap::new(
"success".to_string(),
s.to_string(),
); );
let _ = service_poll patch.insert(
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked( .update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch, context_id, caller_id, job_id, patch,
) )
.await; .await;
let _ = service_poll let _ = service_poll
.update_message_status( .update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id, context_id,
caller_id, caller_id,
id, id,
MessageStatus::Processed, vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
) )
.await; .await;
let _ = service_poll // Existing log about storing result
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -528,8 +599,8 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else { } else {
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -537,10 +608,10 @@ async fn deliver_one(
vec!["Supervisor job.result reply missing recognizable fields".to_string()], vec!["Supervisor job.result reply missing recognizable fields".to_string()],
) )
.await; .await;
}
} }
Err(e) => { }
let _ = service_poll Err(e) => {
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -551,11 +622,11 @@ async fn deliver_one(
)], )],
) )
.await; .await;
}
} }
} else { }
// Result already present; nothing to fetch } else {
let _ = service_poll // Result already present; nothing to fetch
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -566,38 +637,56 @@ async fn deliver_one(
)], )],
) )
.await; .await;
}
} }
// Not terminal yet -> request supervisor job.status as before
_ => { // Mark processed and stop polling for this message
let sup = cache let _ = service_poll
.get_or_create( .update_message_status(
sup_hub.clone(), context_id,
sup_dest.clone(), caller_id,
sup_topic.clone(), id,
secret_for_poller.clone(), MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected; stopping transport polling",
job_id
)],
) )
.await; .await;
match sup break;
.job_status_wait(job_id.to_string()) }
.await // Not terminal yet -> request supervisor job.status as before
{ _ => {
Ok((_out_id, reply_status)) => { let sup = cache
// Interpret status reply synchronously .get_or_create(
let result_opt = sup_hub.clone(),
reply_status.get("result"); sup_dest.clone(),
let error_opt = sup_topic.clone(),
reply_status.get("error"); secret_for_poller.clone(),
if let Some(err_obj) = error_opt { )
let _ = service_poll .await;
.update_job_status_unchecked( match sup.job_status_wait(job_id.to_string()).await
context_id, {
caller_id, Ok((_out_id, reply_status)) => {
job_id, // Interpret status reply synchronously
JobStatus::Error, let result_opt = reply_status.get("result");
) let error_opt = reply_status.get("error");
.await; if let Some(err_obj) = error_opt {
let _ = service_poll let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Error,
)
.await;
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, caller_id, id, context_id, caller_id, id,
vec![format!( vec![format!(
@@ -606,28 +695,25 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} else if let Some(res) = result_opt { } else if let Some(res) = result_opt {
let status_candidate = res let status_candidate = res
.get("status") .get("status")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.or_else(|| res.as_str()); .or_else(|| res.as_str());
if let Some(remote_status) = if let Some(remote_status) =
status_candidate status_candidate
{
if let Some((mapped, terminal)) =
map_supervisor_job_status(
remote_status,
)
{ {
if let Some(( let _ = service_poll
mapped,
terminal,
)) =
map_supervisor_job_status(
remote_status,
)
{
let _ = service_poll
.update_job_status_unchecked( .update_job_status_unchecked(
context_id, caller_id, job_id, mapped.clone(), context_id, caller_id, job_id, mapped.clone(),
) )
.await; .await;
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, caller_id, id, context_id, caller_id, id,
vec![format!( vec![format!(
@@ -637,23 +723,23 @@ async fn deliver_one(
) )
.await; .await;
// If terminal, request job.result now (handled above for local terminal case) // If terminal, request job.result now (handled above for local terminal case)
if terminal { if terminal {
// trigger job.result only if result empty to avoid spam // trigger job.result only if result empty to avoid spam
if let Ok(j_after) = if let Ok(j_after) =
service_poll service_poll
.load_job( .load_job(
context_id, context_id,
caller_id, caller_id,
job_id, job_id,
) )
.await .await
{
if j_after
.result
.is_empty()
{ {
if j_after let sup2 = cache
.result
.is_empty()
{
let sup2 = cache
.get_or_create( .get_or_create(
sup_hub.clone(), sup_hub.clone(),
sup_dest.clone(), sup_dest.clone(),
@@ -661,7 +747,7 @@ async fn deliver_one(
secret_for_poller.clone(), secret_for_poller.clone(),
) )
.await; .await;
let _ = sup2.job_result_wait(job_id.to_string()).await let _ = sup2.job_result_wait(job_id.to_string()).await
.and_then(|(_oid, reply2)| { .and_then(|(_oid, reply2)| {
// Minimal parse and store // Minimal parse and store
let res2 = reply2.get("result"); let res2 = reply2.get("result");
@@ -679,40 +765,65 @@ async fn deliver_one(
} }
Ok((String::new(), Value::Null)) Ok((String::new(), Value::Null))
}); });
}
} }
} }
// Mark processed and stop polling for this message
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected from supervisor status; stopping transport polling",
job_id
)],
)
.await;
break;
} }
} }
} }
} }
Err(e) => { }
let _ = service_poll Err(e) => {
.append_message_logs( let _ = service_poll
context_id, .append_message_logs(
caller_id, context_id,
id, caller_id,
vec![format!("job.status request error: {}", e)], id,
) vec![format!(
.await; "job.status request error: {}",
} e
)],
)
.await;
} }
} }
} }
} }
// If we cannot load the job, fall back to requesting job.status }
Err(_) => { // If we cannot load the job, fall back to requesting job.status
let sup = cache Err(_) => {
.get_or_create( let sup = cache
sup_hub.clone(), .get_or_create(
sup_dest.clone(), sup_hub.clone(),
sup_topic.clone(), sup_dest.clone(),
secret_for_poller.clone(), sup_topic.clone(),
) secret_for_poller.clone(),
.await; )
match sup.job_status_wait(job_id.to_string()).await { .await;
Ok((_out_id, _reply_status)) => { match sup.job_status_wait(job_id.to_string()).await {
let _ = service_poll Ok((_out_id, _reply_status)) => {
let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
@@ -723,26 +834,25 @@ async fn deliver_one(
)], )],
) )
.await; .await;
} }
Err(e) => { Err(e) => {
let _ = service_poll let _ = service_poll
.append_message_logs( .append_message_logs(
context_id, context_id,
caller_id, caller_id,
id, id,
vec![format!( vec![format!(
"job.status request error: {}", "job.status request error: {}",
e e
)], )],
) )
.await; .await;
}
} }
} }
} }
// Ensure we only do this once
requested_job_check = true;
} }
// Ensure we only do this once
requested_job_check = true;
} }
// break; // break;
} }

View File

@@ -672,10 +672,16 @@ impl AppService {
let allowed = match current { let allowed = match current {
JobStatus::Dispatched => matches!( JobStatus::Dispatched => matches!(
new_status, new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error JobStatus::WaitingForPrerequisites
| JobStatus::Started
| JobStatus::Finished
| JobStatus::Error
), ),
JobStatus::WaitingForPrerequisites => { JobStatus::WaitingForPrerequisites => {
matches!(new_status, JobStatus::Started | JobStatus::Error) matches!(
new_status,
JobStatus::Started | JobStatus::Finished | JobStatus::Error
)
} }
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error), JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Finished | JobStatus::Error => false, JobStatus::Finished | JobStatus::Error => false,
@@ -714,10 +720,16 @@ impl AppService {
let allowed = match current { let allowed = match current {
JobStatus::Dispatched => matches!( JobStatus::Dispatched => matches!(
new_status, new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error JobStatus::WaitingForPrerequisites
| JobStatus::Started
| JobStatus::Finished
| JobStatus::Error
), ),
JobStatus::WaitingForPrerequisites => { JobStatus::WaitingForPrerequisites => {
matches!(new_status, JobStatus::Started | JobStatus::Error) matches!(
new_status,
JobStatus::Started | JobStatus::Finished | JobStatus::Error
)
} }
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error), JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Finished | JobStatus::Error => false, JobStatus::Finished | JobStatus::Error => false,