diff --git a/src/service.rs b/src/service.rs index b330323..c2bfb28 100644 --- a/src/service.rs +++ b/src/service.rs @@ -566,152 +566,6 @@ impl AppService { /// - When all jobs are Finished sets Flow to Finished; if any job is Error sets Flow to Error. /// Returns: /// - Ok(true) if a scheduler was started - /// - Ok(false) if a scheduler was already running for this (context_id, flow_id) - pub async fn flow_start(&self, context_id: u32, flow_id: u32) -> Result { - // Ensure flow exists (and load caller_id) - let flow = self.redis.load_flow(context_id, flow_id).await?; - let caller_id = flow.caller_id(); - - // Try to register this flow in the active scheduler set - { - let mut guard = self.schedulers.lock().await; - if !guard.insert((context_id, flow_id)) { - // Already running - return Ok(false); - } - } - - // Clone resources for background task - let redis = self.redis.clone(); - let schedulers = self.schedulers.clone(); - - // Set Flow status to Started - let _ = redis - .update_flow_status(context_id, flow_id, FlowStatus::Started) - .await; - - tokio::spawn(async move { - // Background loop - loop { - // Load current flow; stop if missing - let flow = match redis.load_flow(context_id, flow_id).await { - Ok(f) => f, - Err(_) => break, - }; - - // Track aggregate state - let mut all_finished = true; - let mut any_error = false; - - // Iterate jobs declared in the flow - for jid in flow.jobs() { - // Load job - let job = match redis.load_job(context_id, caller_id, *jid).await { - Ok(j) => j, - Err(_) => { - // If job is missing treat as error state for the flow and stop - any_error = true; - all_finished = false; - break; - } - }; - - match job.status() { - JobStatus::Finished => { - // done - } - JobStatus::Error => { - any_error = true; - all_finished = false; - } - JobStatus::Dispatched | JobStatus::Started => { - all_finished = false; - } - JobStatus::WaitingForPrerequisites => { - all_finished = false; - - // Check dependencies complete - let mut deps_ok = true; - for dep in job.depends() { - match redis.load_job(context_id, caller_id, *dep).await { - Ok(dj) => { - if dj.status() != JobStatus::Finished { - deps_ok = false; - break; - } - } - Err(_) => { - deps_ok = false; - break; - } - } - } - - if deps_ok { - // Build Message embedding this job - let ts = crate::time::current_timestamp(); - let msg_id: u32 = job.id(); // deterministic message id per job for now - - let message = Message { - id: msg_id, - caller_id: job.caller_id(), - context_id, - message: "job.run".to_string(), - message_type: job.script_type(), - message_format_type: MessageFormatType::Text, - timeout: job.timeout, - timeout_ack: 10, - timeout_result: job.timeout, - job: vec![job.clone()], - logs: Vec::new(), - created_at: ts, - updated_at: ts, - status: MessageStatus::Dispatched, - }; - - // Persist the message and enqueue it - if redis.save_message(context_id, &message).await.is_ok() { - let _ = redis - .enqueue_msg_out(context_id, job.caller_id(), msg_id) - .await; - // Mark job as Dispatched - let _ = redis - .update_job_status( - context_id, - job.caller_id(), - job.id(), - JobStatus::Dispatched, - ) - .await; - } - } - } - } - } - - if any_error { - let _ = redis - .update_flow_status(context_id, flow_id, FlowStatus::Error) - .await; - break; - } - if all_finished { - let _ = redis - .update_flow_status(context_id, flow_id, FlowStatus::Finished) - .await; - break; - } - - sleep(Duration::from_secs(1)).await; - } - - // Remove from active schedulers set - let mut guard = schedulers.lock().await; - guard.remove(&(context_id, flow_id)); - }); - - Ok(true) - } /// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out. /// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).