Coordinator Architecture
Overview
The Coordinator orchestrates distributed job execution across multiple Supervisors using a message-queue based architecture with DAG-based dependency resolution.
Core Components
1. Data Models
Job
- Purpose: Pure data model for executable work units
- Storage: Redis (CRUD operations only)
- Fields: `id`, `caller_id`, `context_id`, `script`, `executor`, `timeout`, `retries`, etc.
- Note: Job does NOT contain status or dependencies (moved to FlowNode)
FlowNode
- Purpose: Workflow orchestration metadata
- Contains:
  - `id`: References a Job
  - `depends: Vec<u32>` - Job IDs this node depends on
  - `prerequisites: Vec` - External prerequisites
  - `supervisor_url: String` - Where to route this job
  - `node_status: NodeStatus` - Execution state (Pending, Ready, Dispatched, Running, Completed, Failed, Cancelled)
Flow
- Purpose: Defines a workflow
- Fields: `id`, `caller_id`, `context_id`, `jobs: Vec<u32>`, `status: FlowStatus`
- Status values: Created, Started, Finished, Error
Message
- Purpose: Transport envelope for job dispatch
- Current: Contains `job: Vec<Job>` (legacy)
- Target: Should contain `nodes: Vec<FlowNode>` for proper routing
- Fields: `id`, `caller_id`, `context_id`, `message`, `status`, `transport_id`, etc.
Runner
- Purpose: Supervisor registration in a context
- Fields: `id`, `pubkey`, `address`, `topic`, `secret`
- Routing: Router uses `pubkey` if non-empty, else `address`
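For orientation, here is a minimal Rust sketch of these models. Field types not spelled out above (the ID types, `prerequisites`, the `timeout` unit) are assumptions for illustration, not the exact definitions in the codebase.

```rust
use serde::{Deserialize, Serialize};

/// Pure data model for an executable work unit (no status, no dependencies).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    pub id: u32,
    pub caller_id: u32,
    pub context_id: u32,
    pub script: String,
    pub executor: String,
    pub timeout: u64, // seconds (assumed unit)
    pub retries: u8,
    // ... remaining fields elided
}

/// Workflow orchestration metadata layered on top of a Job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowNode {
    pub id: u32,                    // references a Job
    pub depends: Vec<u32>,          // Job IDs this node depends on
    pub prerequisites: Vec<String>, // external prerequisites (element type assumed)
    pub supervisor_url: String,     // where to route this job
    pub node_status: NodeStatus,
}

/// A workflow over a set of jobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Flow {
    pub id: u32,
    pub caller_id: u32,
    pub context_id: u32,
    pub jobs: Vec<u32>,
    pub status: FlowStatus,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus { Pending, Ready, Dispatched, Running, Completed, Failed, Cancelled }

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FlowStatus { Created, Started, Finished, Error }
```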
2. Flow Execution Pipeline
┌─────────────────────────────────────────────────────────────────┐
│ 1. User calls flow.start (RPC) │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. flow_start() - service.rs:361 │
│ - Validates flow exists │
│ - Registers in active schedulers set │
│ - Sets flow status to Started │
│ - Spawns background scheduler loop (tokio::spawn) │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. Scheduler Loop (background task) - service.rs:384 │
│ Loop every 1 second: │
│ - Load flow and all jobs │
│ - For each job: │
│ • Check job status (TODO: use DAG node_status instead) │
│ • If WaitingForPrerequisites AND deps_ok: │
│ → Create Message with job │
│ → Save message to Redis │
│ → Enqueue to msg_out: redis.enqueue_msg_out() │
│ → Update job status to Dispatched │
│ - Track aggregate state (all_finished, any_error) │
│ - Update flow status to Finished/Error when done │
│ - Remove from active schedulers set │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. Router Loops (one per context) - router.rs:120 │
│ - Auto-discovered contexts (start_router_auto) │
│ - Per-context loop: │
│ • Polls msg_out queue: service.brpop_msg_out(ctx_id, 1s) │
│ • Gets message key: "message:{caller_id}:{id}" │
│ • Spawns deliver_one() with concurrency control │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 5. deliver_one() - router.rs:188 │
│ - Loads Message from Redis │
│ - Gets Runner (supervisor info) │
│ TODO: Should use FlowNode.supervisor_url for routing │
│ Current: Scans all runners, picks first available │
│ - Creates SupervisorClient (via cache) │
│ • Destination: Runner.pubkey or Runner.address │
│ • Topic: "supervisor.rpc" │
│ • Secret: Runner.secret (optional) │
│ - Sends job.run to supervisor (client.job_run()) │
│ - Waits for synchronous reply │
│ - Updates message status to Acknowledged │
│ - Updates message transport status to Delivered │
│ TODO: Update node_status to Dispatched in DAG │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 6. Supervisor Execution │
│ - Supervisor receives job.run │
│ - Executes job in runner (Python, V, etc.) │
│ - Sends result back via Mycelium │
│ - SupervisorHub receives reply │
└────────────────────┬────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ 7. Job Completion (TODO: Missing implementation) │
│ - Supervisor reply should trigger: │
│ • Update node_status in DAG (Completed/Failed) │
│ • Persist DAG state │
│ - Scheduler loop detects completion │
│ - Dispatches next ready jobs │
└─────────────────────────────────────────────────────────────────┘
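Steps 2-3 condense into the sketch below, written against the target design (polling `node_status` rather than the legacy job-status check). The `FlowStore` trait is a stand-in for the real `Service` in service.rs; its method names and signatures are illustrative only, and it uses the model types sketched in the Data Models section (requires Rust 1.75+ for async fns in traits).

```rust
use std::time::Duration;

/// Minimal interface the scheduler needs; a stand-in for the real Service.
trait FlowStore {
    async fn load_nodes(&self, flow_id: u32) -> Vec<FlowNode>;
    /// Create a Message for the node, save it to Redis, enqueue to msg_out.
    async fn dispatch(&self, node: &FlowNode);
    async fn set_node_status(&self, node_id: u32, status: NodeStatus);
    async fn set_flow_status(&self, flow_id: u32, status: FlowStatus);
}

/// Simplified scheduler loop (step 3): poll once per second, dispatch ready
/// nodes, and stop once the flow reaches Finished or Error.
async fn scheduler_loop<S: FlowStore>(store: S, flow_id: u32) {
    loop {
        let nodes = store.load_nodes(flow_id).await;

        for node in nodes.iter().filter(|n| n.node_status == NodeStatus::Ready) {
            store.dispatch(node).await;
            store.set_node_status(node.id, NodeStatus::Dispatched).await;
        }

        if nodes.iter().any(|n| n.node_status == NodeStatus::Failed) {
            store.set_flow_status(flow_id, FlowStatus::Error).await;
            return;
        }
        if nodes.iter().all(|n| n.node_status == NodeStatus::Completed) {
            store.set_flow_status(flow_id, FlowStatus::Finished).await;
            return;
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```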
Key Subsystems
DAG (Directed Acyclic Graph)
- File: `bin/coordinator/src/dag.rs`
- Purpose: Dependency resolution and topological sorting
- Key Functions:
  - `build_flow_dag()`: Constructs DAG from Flow and Jobs
  - `ready_nodes()`: Returns nodes with satisfied dependencies
  - `mark_node_started()`: Transitions node to started state
  - `mark_node_completed()`: Transitions node to completed state
  - `mark_node_failed()`: Transitions node to failed state
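As an illustration of the dependency check, `ready_nodes()` might look like the following over the model types sketched earlier; the actual dag.rs structures may differ.

```rust
use std::collections::HashMap;

/// Return the IDs of Pending nodes whose dependencies have all completed.
fn ready_nodes(nodes: &HashMap<u32, FlowNode>) -> Vec<u32> {
    nodes
        .values()
        .filter(|n| n.node_status == NodeStatus::Pending)
        .filter(|n| {
            n.depends.iter().all(|dep| {
                nodes
                    .get(dep)
                    .map(|d| d.node_status == NodeStatus::Completed)
                    .unwrap_or(false)
            })
        })
        .map(|n| n.id)
        .collect()
}
```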
Service Layer
- File: `bin/coordinator/src/service.rs`
- Purpose: Business logic and orchestration
- Key Methods:
  - `flow_start()`: Spawns scheduler loop for a flow
  - `flow_execute()`: Creates messages for ready jobs
  - `update_node_status()`: Updates node status with validation
  - `update_node_status_unchecked()`: Updates node status without permission check
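The validation in `update_node_status()` presumably enforces the NodeStatus state machine shown under State Transitions below. A sketch of such a guard follows; the actual set of allowed transitions in service.rs may be broader (e.g. Cancelled from more states).

```rust
/// Allow only the transitions in the NodeStatus diagram; reject everything else.
fn is_valid_transition(from: NodeStatus, to: NodeStatus) -> bool {
    use NodeStatus::*;
    matches!(
        (from, to),
        (Pending, Ready)
            | (Pending, Cancelled)
            | (Ready, Dispatched)
            | (Dispatched, Running)
            | (Running, Completed)
            | (Running, Failed)
    )
}
```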
Router
- File: `bin/coordinator/src/router.rs`
- Purpose: Message delivery to supervisors
- Key Components:
  - `start_router_auto()`: Auto-discovers contexts and spawns router loops
  - `start_router()`: Per-context message delivery loop
  - `deliver_one()`: Delivers single message to supervisor
  - `SupervisorClientCache`: Reuses supervisor connections
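The routing rule stated in the Runner model (use `pubkey` if non-empty, else `address`) is small enough to show directly; `Runner` here is a pared-down stand-in for the real model.

```rust
/// Pared-down Runner view; the real model also carries topic, secret, etc.
struct Runner {
    pubkey: String,
    address: String,
}

/// deliver_one()'s destination rule: prefer pubkey, fall back to address.
fn destination(runner: &Runner) -> &str {
    if runner.pubkey.is_empty() {
        &runner.address
    } else {
        &runner.pubkey
    }
}
```

The rest of `deliver_one()` (loading the Message, sending job.run, updating message status) follows step 5 of the pipeline diagram.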
Storage (Redis)
- File: `bin/coordinator/src/storage/redis.rs`
- Purpose: Persistence layer
- Key Operations:
  - Job CRUD: `save_job()`, `load_job()`, `update_job_status()`
  - Flow CRUD: `save_flow()`, `load_flow()`, `update_flow_status()`
  - Message CRUD: `save_message()`, `load_message()`, `update_message_status()`
  - Queue operations: `enqueue_msg_out()`, `brpop_msg_out()`
  - Runner CRUD: `save_runner()`, `load_runner()`, `scan_runners()`
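The queue operations map onto plain Redis list commands. A sketch, assuming the redis crate's async API and one `msg_out:{context_id}` list per context; the exact key names are assumptions.

```rust
use redis::aio::MultiplexedConnection;

/// Push a message key (e.g. "message:{caller_id}:{id}") onto the context's queue.
async fn enqueue_msg_out(
    con: &mut MultiplexedConnection,
    ctx_id: u32,
    msg_key: &str,
) -> redis::RedisResult<()> {
    redis::cmd("LPUSH")
        .arg(format!("msg_out:{ctx_id}"))
        .arg(msg_key)
        .query_async(con)
        .await
}

/// Blocking pop with a 1-second timeout, as used by the router loop.
async fn brpop_msg_out(
    con: &mut MultiplexedConnection,
    ctx_id: u32,
) -> redis::RedisResult<Option<(String, String)>> {
    redis::cmd("BRPOP")
        .arg(format!("msg_out:{ctx_id}"))
        .arg(1) // seconds
        .query_async(con)
        .await
}
```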
Communication
Mycelium Transport
- Purpose: Overlay network for supervisor communication
- Protocol: JSON-RPC over Mycelium
- Components:
  - `SupervisorHub`: Global message broker
  - `MyceliumTransport`: Transport implementation
  - `SupervisorClient<MyceliumTransport>`: Typed client for supervisor calls
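Since the protocol is JSON-RPC, the job.run call that `deliver_one()` sends boils down to a standard request envelope. A sketch with serde_json; the params layout is illustrative, not the supervisor's actual wire format.

```rust
use serde_json::json;

/// Build a JSON-RPC 2.0 request for the supervisor's job.run method.
fn job_run_request(request_id: u64, job: &Job) -> serde_json::Value {
    json!({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "job.run",
        "params": {
            "job": job, // Job is Serialize in the model sketch above
        }
    })
}
```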
Message Flow
Coordinator Mycelium Supervisor
| | |
|-- job.run (JSON-RPC) ------->| |
| |-- forward --------------->|
| | |
| |<-- reply ------------------|
|<-- result -------------------| |
Current Refactoring (In Progress)
Completed
- ✅ Separated Job (data) from FlowNode (orchestration)
- ✅ Created NodeStatus enum for execution state tracking
- ✅ Moved dependencies from Job to FlowNode
- ✅ Created update_node_status() methods with state transition validation
- ✅ Renamed methods to be node-centric (mark_node_started, etc.)
Completed ✅
- ✅ Replace `Message.job: Vec<Job>` with `Message.nodes: Vec<FlowNode>` - Both fields present during migration
- ✅ Update scheduler loop to use `DAG.ready_jobs()` instead of checking job status
- ✅ Update `deliver_one()` to use `FlowNode.supervisor_url` for routing
- ✅ Implement job completion handler to update node_status in DAG
- ✅ Add flow_id to Message for proper DAG tracking
- ✅ Update DAG runtime state (started, completed, failed_job) on node status changes
- ✅ Fix all compilation errors - Zero errors!
TODO
- ❌ Persist DAG state to Redis (currently rebuilt each time)
- ❌ Store supervisor secret in runner config and use in routing
- ❌ Remove legacy `Message.job` field after full migration
- ❌ Add DAG state recovery on coordinator restart
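For the first TODO item, one minimal approach would be to persist per-node statuses in a Redis hash keyed by flow, so a restarted coordinator can restore rather than rebuild. A sketch under that assumption; the `dag:{flow_id}` key layout is invented here, not the current code.

```rust
use redis::aio::MultiplexedConnection;

/// Persist each node's status under a per-flow hash, e.g. "dag:{flow_id}".
async fn persist_dag_state(
    con: &mut MultiplexedConnection,
    flow_id: u32,
    nodes: &[FlowNode],
) -> redis::RedisResult<()> {
    let mut cmd = redis::cmd("HSET");
    cmd.arg(format!("dag:{flow_id}"));
    for node in nodes {
        cmd.arg(node.id).arg(format!("{:?}", node.node_status));
    }
    cmd.query_async(con).await
}
```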
State Transitions
NodeStatus
Pending ──────┐
├──> Ready ──> Dispatched ──> Running ──> Completed
│ └──> Failed
└──────────────────────────────────────> Cancelled
FlowStatus
Created ──> Started ──> Finished
└──> Error
Configuration
Environment Variables
- `COORDINATOR_URL`: Coordinator RPC endpoint (default: http://127.0.0.1:9652)
- `MYCELIUM_URL`: Mycelium API endpoint (default: http://127.0.0.1:8990)
CLI Arguments
- `--mycelium-ip`: Mycelium IP address (default: 127.0.0.1)
- `--mycelium-port`: Mycelium port (default: 8990)
- `--redis-url`: Redis connection URL (default: redis://127.0.0.1:6379)
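For reference, these flags could be declared with a derive-style argument parser. The sketch below assumes the clap crate, which may not match the coordinator's actual CLI setup.

```rust
use clap::Parser;

/// Coordinator CLI arguments (sketch; assumes clap 4 with the derive feature).
#[derive(Parser, Debug)]
struct Args {
    /// Mycelium IP address
    #[arg(long = "mycelium-ip", default_value = "127.0.0.1")]
    mycelium_ip: String,

    /// Mycelium port
    #[arg(long = "mycelium-port", default_value_t = 8990)]
    mycelium_port: u16,

    /// Redis connection URL
    #[arg(long = "redis-url", default_value = "redis://127.0.0.1:6379")]
    redis_url: String,
}

fn main() {
    let args = Args::parse();
    println!("{args:?}");
}
```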
Testing
See `scripts/supervisor_flow_demo.py` for an end-to-end flow execution demo.