diff --git a/src/dag.rs b/src/dag.rs index 5a2b74c..82e568d 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt; use crate::{ - models::{Flow, Job, ScriptType}, + models::{Flow, Job, ScriptType, JobStatus}, storage::RedisDriver, }; @@ -212,6 +212,41 @@ pub async fn build_flow_dag( edges.sort_unstable(); reverse_edges.sort_unstable(); + // Populate runtime execution state from persisted Job.status() + let mut started_set: HashSet = HashSet::new(); + let mut completed_set: HashSet = HashSet::new(); + let mut error_ids: Vec = Vec::new(); + + for (&jid, job) in &jobs { + match job.status() { + JobStatus::Finished => { + completed_set.insert(jid); + } + JobStatus::Started => { + started_set.insert(jid); + } + JobStatus::Dispatched => { + // Consider Dispatched as "in-flight" for DAG runtime started set, + // so queued/running work is visible in periodic snapshots. + started_set.insert(jid); + } + JobStatus::Error => { + error_ids.push(jid); + } + JobStatus::WaitingForPrerequisites => { + // Neither started nor completed + } + } + } + + // Choose a deterministic failed job if any errors exist (smallest job id) + let failed_job = if error_ids.is_empty() { + None + } else { + error_ids.sort_unstable(); + Some(error_ids[0]) + }; + let dag = FlowDag { flow_id, caller_id, @@ -222,9 +257,9 @@ pub async fn build_flow_dag( roots, leaves, levels, - started: HashSet::new(), - completed: HashSet::new(), - failed_job: None, + started: started_set, + completed: completed_set, + failed_job, }; Ok(dag) diff --git a/src/router.rs b/src/router.rs index 6145ae8..ca5edb4 100644 --- a/src/router.rs +++ b/src/router.rs @@ -390,6 +390,20 @@ async fn deliver_one( // First consult Redis for the latest job state in case we already have a terminal update match service_poll.load_job(context_id, caller_id, job_id).await { Ok(job) => { + // Promote to Started as soon as transport is delivered/read, + // if currently Dispatched or WaitingForPrerequisites. + // This makes DAG.started reflect "in-flight" work even when jobs + // complete too quickly to observe an intermediate supervisor "running" status. + if matches!(job.status(), JobStatus::Dispatched | JobStatus::WaitingForPrerequisites) { + let _ = service_poll + .update_job_status_unchecked( + context_id, + caller_id, + job_id, + JobStatus::Started, + ) + .await; + } match job.status() { JobStatus::Finished | JobStatus::Error => { // Local job is already terminal; skip supervisor job.status