diff --git a/src/dag.rs b/src/dag.rs index 82e568d..82745b5 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt; use crate::{ - models::{Flow, Job, ScriptType, JobStatus}, + models::{Flow, Job, JobStatus, ScriptType}, storage::RedisDriver, }; diff --git a/src/router.rs b/src/router.rs index ca5edb4..645a4ba 100644 --- a/src/router.rs +++ b/src/router.rs @@ -394,7 +394,11 @@ async fn deliver_one( // if currently Dispatched or WaitingForPrerequisites. // This makes DAG.started reflect "in-flight" work even when jobs // complete too quickly to observe an intermediate supervisor "running" status. - if matches!(job.status(), JobStatus::Dispatched | JobStatus::WaitingForPrerequisites) { + if matches!( + job.status(), + JobStatus::Dispatched + | JobStatus::WaitingForPrerequisites + ) { let _ = service_poll .update_job_status_unchecked( context_id, @@ -419,7 +423,7 @@ async fn deliver_one( )], ) .await; - + // If result is still empty, immediately request supervisor job.result if job.result.is_empty() { let sup = cache @@ -566,10 +570,13 @@ async fn deliver_one( .await; // Also mark job as Finished so the flow can progress (ignore invalid transitions) let _ = service_poll - .update_job_status_unchecked( - context_id, caller_id, job_id, JobStatus::Finished, - ) - .await; + .update_job_status_unchecked( + context_id, + caller_id, + job_id, + JobStatus::Finished, + ) + .await; let _ = service_poll .append_message_logs( context_id, @@ -631,16 +638,16 @@ async fn deliver_one( ) .await; } - + // Mark processed and stop polling for this message let _ = service_poll - .update_message_status( - context_id, - caller_id, - id, - MessageStatus::Processed, - ) - .await; + .update_message_status( + context_id, + caller_id, + id, + MessageStatus::Processed, + ) + .await; let _ = service_poll .append_message_logs( context_id, diff --git a/src/service.rs b/src/service.rs index 8403a5a..c2043a8 100644 --- a/src/service.rs +++ b/src/service.rs @@ -672,10 +672,16 @@ impl AppService { let allowed = match current { JobStatus::Dispatched => matches!( new_status, - JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Finished | JobStatus::Error + JobStatus::WaitingForPrerequisites + | JobStatus::Started + | JobStatus::Finished + | JobStatus::Error ), JobStatus::WaitingForPrerequisites => { - matches!(new_status, JobStatus::Started | JobStatus::Finished | JobStatus::Error) + matches!( + new_status, + JobStatus::Started | JobStatus::Finished | JobStatus::Error + ) } JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error), JobStatus::Finished | JobStatus::Error => false, @@ -714,10 +720,16 @@ impl AppService { let allowed = match current { JobStatus::Dispatched => matches!( new_status, - JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Finished | JobStatus::Error + JobStatus::WaitingForPrerequisites + | JobStatus::Started + | JobStatus::Finished + | JobStatus::Error ), JobStatus::WaitingForPrerequisites => { - matches!(new_status, JobStatus::Started | JobStatus::Finished | JobStatus::Error) + matches!( + new_status, + JobStatus::Started | JobStatus::Finished | JobStatus::Error + ) } JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error), JobStatus::Finished | JobStatus::Error => false,