Properly update DAG view with started/finished jobs

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-09-08 14:12:40 +02:00
parent 3cd1a55768
commit 5ed9739d68
2 changed files with 53 additions and 4 deletions

View File

@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use crate::{
models::{Flow, Job, ScriptType},
models::{Flow, Job, ScriptType, JobStatus},
storage::RedisDriver,
};
@@ -212,6 +212,41 @@ pub async fn build_flow_dag(
edges.sort_unstable();
reverse_edges.sort_unstable();
// Populate runtime execution state from persisted Job.status()
let mut started_set: HashSet<u32> = HashSet::new();
let mut completed_set: HashSet<u32> = HashSet::new();
let mut error_ids: Vec<u32> = Vec::new();
for (&jid, job) in &jobs {
match job.status() {
JobStatus::Finished => {
completed_set.insert(jid);
}
JobStatus::Started => {
started_set.insert(jid);
}
JobStatus::Dispatched => {
// Consider Dispatched as "in-flight" for DAG runtime started set,
// so queued/running work is visible in periodic snapshots.
started_set.insert(jid);
}
JobStatus::Error => {
error_ids.push(jid);
}
JobStatus::WaitingForPrerequisites => {
// Neither started nor completed
}
}
}
// Choose a deterministic failed job if any errors exist (smallest job id)
let failed_job = if error_ids.is_empty() {
None
} else {
error_ids.sort_unstable();
Some(error_ids[0])
};
let dag = FlowDag {
flow_id,
caller_id,
@@ -222,9 +257,9 @@ pub async fn build_flow_dag(
roots,
leaves,
levels,
started: HashSet::new(),
completed: HashSet::new(),
failed_job: None,
started: started_set,
completed: completed_set,
failed_job,
};
Ok(dag)

View File

@@ -390,6 +390,20 @@ async fn deliver_one(
// First consult Redis for the latest job state in case we already have a terminal update
match service_poll.load_job(context_id, caller_id, job_id).await {
Ok(job) => {
// Promote to Started as soon as transport is delivered/read,
// if currently Dispatched or WaitingForPrerequisites.
// This makes DAG.started reflect "in-flight" work even when jobs
// complete too quickly to observe an intermediate supervisor "running" status.
if matches!(job.status(), JobStatus::Dispatched | JobStatus::WaitingForPrerequisites) {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Started,
)
.await;
}
match job.status() {
JobStatus::Finished | JobStatus::Error => {
// Local job is already terminal; skip supervisor job.status