Compare commits
7 Commits
66c89d2485
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
688c42493e
|
||
|
a75fb9c55d
|
||
|
5ed9739d68
|
||
|
3cd1a55768
|
||
|
c860553acd
|
||
|
78a776877a
|
||
|
8cea17f4ec
|
26
README.md
26
README.md
@@ -1,2 +1,28 @@
|
|||||||
# herocoordinator
|
# herocoordinator
|
||||||
|
|
||||||
|
## Demo setup
|
||||||
|
|
||||||
|
A Python script is provided in the [scripts directory](./scripts/supervisor_flow_demo.py). This script
|
||||||
|
generates some demo jobs to be run by [a supervisor](https://git.ourworld.tf/herocode/supervisor).
|
||||||
|
Communication happens over [mycelium](https://github.com/threefoldtech/mycelium). To run the demo, a
|
||||||
|
supervisor must be running, which uses a mycelium instance to read and write messages. A __different__
|
||||||
|
mycelium instance needs to run for the coordinator (the supervisor can run on a different host than
|
||||||
|
the coordinator, so long as the 2 mycelium instances used can reach each other).
|
||||||
|
|
||||||
|
An example of a local setup:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run a Redis Docker container
|
||||||
|
docker run -it -d -p 6379:6379 --name redis redis
|
||||||
|
# Spawn mycelium node 1 with default settings. This also creates a TUN interface though that is not
|
||||||
|
# necessary for the messages
|
||||||
|
mycelium
|
||||||
|
# Spawn mycelium node 2, connect to the first node
|
||||||
|
mycelium --key-file key.bin --peers tcp://127.0.0.1:9651 --disable-quic --disable-peer-discovery --api-addr 127.0.0.1:9989 --jsonrpc-addr 127.0.0.1:9990 --no-tun -t 8651
|
||||||
|
# Start the supervisor
|
||||||
|
supervisor --admin-secret admin123 --user-secret user123 --register-secret register123 --mycelium-url http://127.0.0.1:9990 --topic supervisor.rpc
|
||||||
|
# Start the coordinator
|
||||||
|
cargo run # (alternatively if a compiled binary is present that can be run)
|
||||||
|
# Finally, invoke the demo script
|
||||||
|
python3 scripts/supervisor_flow_demo.py
|
||||||
|
```
|
||||||
|
@@ -48,7 +48,7 @@ impl SupervisorHub {
|
|||||||
fn spawn_pop_loop(hub: Arc<Self>) {
|
fn spawn_pop_loop(hub: Arc<Self>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match hub.mycelium.pop_message(Some(false), Some(2), None).await {
|
match hub.mycelium.pop_message(Some(false), Some(20), None).await {
|
||||||
Ok(Some(inb)) => {
|
Ok(Some(inb)) => {
|
||||||
// Extract and decode payload
|
// Extract and decode payload
|
||||||
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
||||||
|
43
src/dag.rs
43
src/dag.rs
@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
models::{Flow, Job, ScriptType},
|
models::{Flow, Job, JobStatus, ScriptType},
|
||||||
storage::RedisDriver,
|
storage::RedisDriver,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -212,6 +212,41 @@ pub async fn build_flow_dag(
|
|||||||
edges.sort_unstable();
|
edges.sort_unstable();
|
||||||
reverse_edges.sort_unstable();
|
reverse_edges.sort_unstable();
|
||||||
|
|
||||||
|
// Populate runtime execution state from persisted Job.status()
|
||||||
|
let mut started_set: HashSet<u32> = HashSet::new();
|
||||||
|
let mut completed_set: HashSet<u32> = HashSet::new();
|
||||||
|
let mut error_ids: Vec<u32> = Vec::new();
|
||||||
|
|
||||||
|
for (&jid, job) in &jobs {
|
||||||
|
match job.status() {
|
||||||
|
JobStatus::Finished => {
|
||||||
|
completed_set.insert(jid);
|
||||||
|
}
|
||||||
|
JobStatus::Started => {
|
||||||
|
started_set.insert(jid);
|
||||||
|
}
|
||||||
|
JobStatus::Dispatched => {
|
||||||
|
// Consider Dispatched as "in-flight" for DAG runtime started set,
|
||||||
|
// so queued/running work is visible in periodic snapshots.
|
||||||
|
started_set.insert(jid);
|
||||||
|
}
|
||||||
|
JobStatus::Error => {
|
||||||
|
error_ids.push(jid);
|
||||||
|
}
|
||||||
|
JobStatus::WaitingForPrerequisites => {
|
||||||
|
// Neither started nor completed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Choose a deterministic failed job if any errors exist (smallest job id)
|
||||||
|
let failed_job = if error_ids.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
error_ids.sort_unstable();
|
||||||
|
Some(error_ids[0])
|
||||||
|
};
|
||||||
|
|
||||||
let dag = FlowDag {
|
let dag = FlowDag {
|
||||||
flow_id,
|
flow_id,
|
||||||
caller_id,
|
caller_id,
|
||||||
@@ -222,9 +257,9 @@ pub async fn build_flow_dag(
|
|||||||
roots,
|
roots,
|
||||||
leaves,
|
leaves,
|
||||||
levels,
|
levels,
|
||||||
started: HashSet::new(),
|
started: started_set,
|
||||||
completed: HashSet::new(),
|
completed: completed_set,
|
||||||
failed_job: None,
|
failed_job,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(dag)
|
Ok(dag)
|
||||||
|
146
src/router.rs
146
src/router.rs
@@ -386,13 +386,28 @@ async fn deliver_one(
|
|||||||
|
|
||||||
// Stop on terminal states
|
// Stop on terminal states
|
||||||
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
|
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
|
||||||
// Only request a single job status/result per message
|
|
||||||
if !requested_job_check {
|
|
||||||
if let Some(job_id) = job_id_opt {
|
if let Some(job_id) = job_id_opt {
|
||||||
// First consult Redis for the latest job state in case we already have a terminal update
|
// First consult Redis for the latest job state in case we already have a terminal update
|
||||||
match service_poll.load_job(context_id, caller_id, job_id).await
|
match service_poll.load_job(context_id, caller_id, job_id).await {
|
||||||
{
|
|
||||||
Ok(job) => {
|
Ok(job) => {
|
||||||
|
// Promote to Started as soon as transport is delivered/read,
|
||||||
|
// if currently Dispatched or WaitingForPrerequisites.
|
||||||
|
// This makes DAG.started reflect "in-flight" work even when jobs
|
||||||
|
// complete too quickly to observe an intermediate supervisor "running" status.
|
||||||
|
if matches!(
|
||||||
|
job.status(),
|
||||||
|
JobStatus::Dispatched
|
||||||
|
| JobStatus::WaitingForPrerequisites
|
||||||
|
) {
|
||||||
|
let _ = service_poll
|
||||||
|
.update_job_status_unchecked(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
job_id,
|
||||||
|
JobStatus::Started,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
match job.status() {
|
match job.status() {
|
||||||
JobStatus::Finished | JobStatus::Error => {
|
JobStatus::Finished | JobStatus::Error => {
|
||||||
// Local job is already terminal; skip supervisor job.status
|
// Local job is already terminal; skip supervisor job.status
|
||||||
@@ -451,6 +466,23 @@ async fn deliver_one(
|
|||||||
MessageStatus::Processed,
|
MessageStatus::Processed,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
|
||||||
|
let _ = service_poll
|
||||||
|
.update_job_status_unchecked(
|
||||||
|
context_id, caller_id, job_id, JobStatus::Finished,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let _ = service_poll
|
||||||
|
.append_message_logs(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
vec![format!(
|
||||||
|
"Updated job {} status to Finished (sync)", job_id
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// Existing log about storing result
|
||||||
let _ = service_poll
|
let _ = service_poll
|
||||||
.append_message_logs(
|
.append_message_logs(
|
||||||
context_id,
|
context_id,
|
||||||
@@ -484,6 +516,23 @@ async fn deliver_one(
|
|||||||
MessageStatus::Processed,
|
MessageStatus::Processed,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
// Also mark job as Error so the flow can handle failure (ignore invalid transitions)
|
||||||
|
let _ = service_poll
|
||||||
|
.update_job_status_unchecked(
|
||||||
|
context_id, caller_id, job_id, JobStatus::Error,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let _ = service_poll
|
||||||
|
.append_message_logs(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
vec![format!(
|
||||||
|
"Updated job {} status to Error (sync)", job_id
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// Existing log about storing result
|
||||||
let _ = service_poll
|
let _ = service_poll
|
||||||
.append_message_logs(
|
.append_message_logs(
|
||||||
context_id,
|
context_id,
|
||||||
@@ -499,7 +548,9 @@ async fn deliver_one(
|
|||||||
} else if let Some(s) =
|
} else if let Some(s) =
|
||||||
res.and_then(|v| v.as_str())
|
res.and_then(|v| v.as_str())
|
||||||
{
|
{
|
||||||
let mut patch = std::collections::HashMap::new();
|
let mut patch =
|
||||||
|
std::collections::HashMap::new(
|
||||||
|
);
|
||||||
patch.insert(
|
patch.insert(
|
||||||
"success".to_string(),
|
"success".to_string(),
|
||||||
s.to_string(),
|
s.to_string(),
|
||||||
@@ -517,6 +568,26 @@ async fn deliver_one(
|
|||||||
MessageStatus::Processed,
|
MessageStatus::Processed,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
|
||||||
|
let _ = service_poll
|
||||||
|
.update_job_status_unchecked(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
job_id,
|
||||||
|
JobStatus::Finished,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let _ = service_poll
|
||||||
|
.append_message_logs(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
vec![format!(
|
||||||
|
"Updated job {} status to Finished (sync)", job_id
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// Existing log about storing result
|
||||||
let _ = service_poll
|
let _ = service_poll
|
||||||
.append_message_logs(
|
.append_message_logs(
|
||||||
context_id,
|
context_id,
|
||||||
@@ -567,6 +638,28 @@ async fn deliver_one(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark processed and stop polling for this message
|
||||||
|
let _ = service_poll
|
||||||
|
.update_message_status(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
MessageStatus::Processed,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let _ = service_poll
|
||||||
|
.append_message_logs(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
vec![format!(
|
||||||
|
"Terminal job {} detected; stopping transport polling",
|
||||||
|
job_id
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
// Not terminal yet -> request supervisor job.status as before
|
// Not terminal yet -> request supervisor job.status as before
|
||||||
_ => {
|
_ => {
|
||||||
@@ -578,16 +671,12 @@ async fn deliver_one(
|
|||||||
secret_for_poller.clone(),
|
secret_for_poller.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
match sup
|
match sup.job_status_wait(job_id.to_string()).await
|
||||||
.job_status_wait(job_id.to_string())
|
|
||||||
.await
|
|
||||||
{
|
{
|
||||||
Ok((_out_id, reply_status)) => {
|
Ok((_out_id, reply_status)) => {
|
||||||
// Interpret status reply synchronously
|
// Interpret status reply synchronously
|
||||||
let result_opt =
|
let result_opt = reply_status.get("result");
|
||||||
reply_status.get("result");
|
let error_opt = reply_status.get("error");
|
||||||
let error_opt =
|
|
||||||
reply_status.get("error");
|
|
||||||
if let Some(err_obj) = error_opt {
|
if let Some(err_obj) = error_opt {
|
||||||
let _ = service_poll
|
let _ = service_poll
|
||||||
.update_job_status_unchecked(
|
.update_job_status_unchecked(
|
||||||
@@ -614,10 +703,7 @@ async fn deliver_one(
|
|||||||
if let Some(remote_status) =
|
if let Some(remote_status) =
|
||||||
status_candidate
|
status_candidate
|
||||||
{
|
{
|
||||||
if let Some((
|
if let Some((mapped, terminal)) =
|
||||||
mapped,
|
|
||||||
terminal,
|
|
||||||
)) =
|
|
||||||
map_supervisor_job_status(
|
map_supervisor_job_status(
|
||||||
remote_status,
|
remote_status,
|
||||||
)
|
)
|
||||||
@@ -681,6 +767,28 @@ async fn deliver_one(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark processed and stop polling for this message
|
||||||
|
let _ = service_poll
|
||||||
|
.update_message_status(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
MessageStatus::Processed,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let _ = service_poll
|
||||||
|
.append_message_logs(
|
||||||
|
context_id,
|
||||||
|
caller_id,
|
||||||
|
id,
|
||||||
|
vec![format!(
|
||||||
|
"Terminal job {} detected from supervisor status; stopping transport polling",
|
||||||
|
job_id
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -692,7 +800,10 @@ async fn deliver_one(
|
|||||||
context_id,
|
context_id,
|
||||||
caller_id,
|
caller_id,
|
||||||
id,
|
id,
|
||||||
vec![format!("job.status request error: {}", e)],
|
vec![format!(
|
||||||
|
"job.status request error: {}",
|
||||||
|
e
|
||||||
|
)],
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@@ -743,7 +854,6 @@ async fn deliver_one(
|
|||||||
// Ensure we only do this once
|
// Ensure we only do this once
|
||||||
requested_job_check = true;
|
requested_job_check = true;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
// break;
|
// break;
|
||||||
}
|
}
|
||||||
if matches!(s, TransportStatus::Failed) {
|
if matches!(s, TransportStatus::Failed) {
|
||||||
|
@@ -672,10 +672,16 @@ impl AppService {
|
|||||||
let allowed = match current {
|
let allowed = match current {
|
||||||
JobStatus::Dispatched => matches!(
|
JobStatus::Dispatched => matches!(
|
||||||
new_status,
|
new_status,
|
||||||
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
|
JobStatus::WaitingForPrerequisites
|
||||||
|
| JobStatus::Started
|
||||||
|
| JobStatus::Finished
|
||||||
|
| JobStatus::Error
|
||||||
),
|
),
|
||||||
JobStatus::WaitingForPrerequisites => {
|
JobStatus::WaitingForPrerequisites => {
|
||||||
matches!(new_status, JobStatus::Started | JobStatus::Error)
|
matches!(
|
||||||
|
new_status,
|
||||||
|
JobStatus::Started | JobStatus::Finished | JobStatus::Error
|
||||||
|
)
|
||||||
}
|
}
|
||||||
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
||||||
JobStatus::Finished | JobStatus::Error => false,
|
JobStatus::Finished | JobStatus::Error => false,
|
||||||
@@ -714,10 +720,16 @@ impl AppService {
|
|||||||
let allowed = match current {
|
let allowed = match current {
|
||||||
JobStatus::Dispatched => matches!(
|
JobStatus::Dispatched => matches!(
|
||||||
new_status,
|
new_status,
|
||||||
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
|
JobStatus::WaitingForPrerequisites
|
||||||
|
| JobStatus::Started
|
||||||
|
| JobStatus::Finished
|
||||||
|
| JobStatus::Error
|
||||||
),
|
),
|
||||||
JobStatus::WaitingForPrerequisites => {
|
JobStatus::WaitingForPrerequisites => {
|
||||||
matches!(new_status, JobStatus::Started | JobStatus::Error)
|
matches!(
|
||||||
|
new_status,
|
||||||
|
JobStatus::Started | JobStatus::Finished | JobStatus::Error
|
||||||
|
)
|
||||||
}
|
}
|
||||||
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
||||||
JobStatus::Finished | JobStatus::Error => false,
|
JobStatus::Finished | JobStatus::Error => false,
|
||||||
|
Reference in New Issue
Block a user