# Coordinator Architecture

## Overview

The Coordinator orchestrates distributed job execution across multiple Supervisors using a message-queue-based architecture with DAG-based dependency resolution.

## Core Components

### 1. Data Models

#### Job

- **Purpose**: Pure data model for executable work units
- **Storage**: Redis (CRUD operations only)
- **Fields**: `id`, `caller_id`, `context_id`, `script`, `executor`, `timeout`, `retries`, etc.
- **Note**: Job does NOT contain status or dependencies (moved to FlowNode)

#### FlowNode

- **Purpose**: Workflow orchestration metadata
- **Contains**:
  - `id`: References a Job
  - `depends`: Vec - Job IDs this node depends on
  - `prerequisites`: Vec - External prerequisites
  - `supervisor_url`: String - Where to route this job
  - `node_status`: NodeStatus - Execution state (Pending, Ready, Dispatched, Running, Completed, Failed, Cancelled)

#### Flow

- **Purpose**: Defines a workflow
- **Fields**: `id`, `caller_id`, `context_id`, `jobs: Vec`, `status: FlowStatus`
- **Status**: Created, Started, Finished, Error

#### Message

- **Purpose**: Transport envelope for job dispatch
- **Current**: Contains `job: Vec<Job>` (legacy)
- **Target**: Should contain `nodes: Vec<FlowNode>` for proper routing
- **Fields**: `id`, `caller_id`, `context_id`, `message`, `status`, `transport_id`, etc.

#### Runner

- **Purpose**: Supervisor registration in a context
- **Fields**: `id`, `pubkey`, `address`, `topic`, `secret`
- **Routing**: Router uses `pubkey` if non-empty, otherwise `address`
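For orientation, the field lists above map onto Rust shapes roughly like the following. This is a minimal sketch, not the actual definitions from the crate: the numeric ID types, the `prerequisites` element type, and the `timeout`/`retries` widths are assumptions.

```rust
/// Execution states of a FlowNode, as listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    Pending,
    Ready,
    Dispatched,
    Running,
    Completed,
    Failed,
    Cancelled,
}

/// Job: pure data model, no status and no dependencies.
pub struct Job {
    pub id: u32,          // ID types assumed numeric
    pub caller_id: u32,
    pub context_id: u32,
    pub script: String,
    pub executor: String, // e.g. "python"
    pub timeout: u64,     // seconds (assumed)
    pub retries: u32,
}

/// FlowNode: orchestration metadata wrapped around a Job.
pub struct FlowNode {
    pub id: u32,                    // references a Job
    pub depends: Vec<u32>,          // Job IDs this node depends on
    pub prerequisites: Vec<String>, // external prerequisites (element type assumed)
    pub supervisor_url: String,     // where to route this job
    pub node_status: NodeStatus,
}
```

Flow, Message, and Runner follow the same pattern; the key point of the refactor is that execution state lives on FlowNode, not on Job.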
### 2. Flow Execution Pipeline

```
┌─────────────────────────────────────────────────────────────────┐
│ 1. User calls flow.start (RPC)                                  │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. flow_start() - service.rs:361                                │
│    - Validates flow exists                                      │
│    - Registers in active schedulers set                         │
│    - Sets flow status to Started                                │
│    - Spawns background scheduler loop (tokio::spawn)            │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. Scheduler Loop (background task) - service.rs:384            │
│    Loop every 1 second:                                         │
│    - Load flow and all jobs                                     │
│    - For each job:                                              │
│      • Check job status (TODO: use DAG node_status instead)     │
│      • If WaitingForPrerequisites AND deps_ok:                  │
│        → Create Message with job                                │
│        → Save message to Redis                                  │
│        → Enqueue to msg_out: redis.enqueue_msg_out()            │
│        → Update job status to Dispatched                        │
│    - Track aggregate state (all_finished, any_error)            │
│    - Update flow status to Finished/Error when done             │
│    - Remove from active schedulers set                          │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 4. Router Loops (one per context) - router.rs:120               │
│    - Auto-discovered contexts (start_router_auto)               │
│    - Per-context loop:                                          │
│      • Polls msg_out queue: service.brpop_msg_out(ctx_id, 1s)   │
│      • Gets message key: "message:{caller_id}:{id}"             │
│      • Spawns deliver_one() with concurrency control            │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 5. deliver_one() - router.rs:188                                │
│    - Loads Message from Redis                                   │
│    - Gets Runner (supervisor info)                              │
│      TODO: Should use FlowNode.supervisor_url for routing       │
│      Current: Scans all runners, picks first available          │
│    - Creates SupervisorClient (via cache)                       │
│      • Destination: Runner.pubkey or Runner.address             │
│      • Topic: "supervisor.rpc"                                  │
│      • Secret: Runner.secret (optional)                         │
│    - Sends job.run to supervisor (client.job_run())             │
│    - Waits for synchronous reply                                │
│    - Updates message status to Acknowledged                     │
│    - Updates message transport status to Delivered              │
│      TODO: Update node_status to Dispatched in DAG              │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 6. Supervisor Execution                                         │
│    - Supervisor receives job.run                                │
│    - Executes job in runner (Python, V, etc.)                   │
│    - Sends result back via Mycelium                             │
│    - SupervisorHub receives reply                               │
└────────────────────┬────────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────────┐
│ 7. Job Completion (TODO: Missing implementation)                │
│    - Supervisor reply should trigger:                           │
│      • Update node_status in DAG (Completed/Failed)             │
│      • Persist DAG state                                        │
│    - Scheduler loop detects completion                          │
│    - Dispatches next ready jobs                                 │
└─────────────────────────────────────────────────────────────────┘
```

## Key Subsystems

### DAG (Directed Acyclic Graph)

- **File**: `bin/coordinator/src/dag.rs`
- **Purpose**: Dependency resolution and topological sorting
- **Key Functions** (sketched below):
  - `build_flow_dag()`: Constructs the DAG from a Flow and its Jobs
  - `ready_nodes()`: Returns nodes with satisfied dependencies
  - `mark_node_started()`: Transitions a node to the started state
  - `mark_node_completed()`: Transitions a node to the completed state
  - `mark_node_failed()`: Transitions a node to the failed state
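To make the dependency check concrete, here is a hedged sketch of what `ready_nodes()` computes, reusing the `FlowNode`/`NodeStatus` shapes sketched earlier. The real implementation in `dag.rs` may track reverse edges or a topological order instead of rescanning every node.

```rust
use std::collections::HashMap;

/// Sketch of ready_nodes(): IDs of nodes that have not been dispatched
/// yet and whose dependencies have all reached Completed.
fn ready_nodes(nodes: &HashMap<u32, FlowNode>) -> Vec<u32> {
    nodes
        .values()
        .filter(|n| matches!(n.node_status, NodeStatus::Pending | NodeStatus::Ready))
        .filter(|n| {
            n.depends.iter().all(|dep| {
                nodes
                    .get(dep)
                    .map(|d| d.node_status == NodeStatus::Completed)
                    // A dangling dependency keeps the node blocked.
                    .unwrap_or(false)
            })
        })
        .map(|n| n.id)
        .collect()
}
```

Each scheduler tick would enqueue a Message for every returned ID and then mark those nodes Dispatched, which is the loop sketched in step 3 of the pipeline above.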
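The `mark_node_*` helpers, together with `update_node_status()` in the Service Layer described next, enforce the NodeStatus state machine shown under "State Transitions" later in this document. A sketch of what that validation could look like; which states may legally move to Cancelled is an assumption here:

```rust
/// Hypothetical transition table mirroring the NodeStatus diagram
/// in the "State Transitions" section.
fn transition_allowed(from: NodeStatus, to: NodeStatus) -> bool {
    use NodeStatus::*;
    matches!(
        (from, to),
        (Pending, Ready)
            | (Ready, Dispatched)
            | (Dispatched, Running)
            | (Running, Completed)
            | (Running, Failed)
            // Assumption: any non-terminal state can be cancelled.
            | (Pending | Ready | Dispatched | Running, Cancelled)
    )
}
```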
### Service Layer

- **File**: `bin/coordinator/src/service.rs`
- **Purpose**: Business logic and orchestration
- **Key Methods**:
  - `flow_start()`: Spawns the scheduler loop for a flow
  - `flow_execute()`: Creates messages for ready jobs
  - `update_node_status()`: Updates node status with state-transition validation
  - `update_node_status_unchecked()`: Updates node status without the permission check

### Router

- **File**: `bin/coordinator/src/router.rs`
- **Purpose**: Message delivery to supervisors
- **Key Components**:
  - `start_router_auto()`: Auto-discovers contexts and spawns router loops
  - `start_router()`: Per-context message delivery loop
  - `deliver_one()`: Delivers a single message to a supervisor
  - `SupervisorClientCache`: Reuses supervisor connections

### Storage (Redis)

- **File**: `bin/coordinator/src/storage/redis.rs`
- **Purpose**: Persistence layer
- **Key Operations**:
  - Job CRUD: `save_job()`, `load_job()`, `update_job_status()`
  - Flow CRUD: `save_flow()`, `load_flow()`, `update_flow_status()`
  - Message CRUD: `save_message()`, `load_message()`, `update_message_status()`
  - Queue operations: `enqueue_msg_out()`, `brpop_msg_out()`
  - Runner CRUD: `save_runner()`, `load_runner()`, `scan_runners()`

## Communication

### Mycelium Transport

- **Purpose**: Overlay network for supervisor communication
- **Protocol**: JSON-RPC over Mycelium
- **Components**:
  - `SupervisorHub`: Global message broker
  - `MyceliumTransport`: Transport implementation
  - `SupervisorClient`: Typed client for supervisor calls

### Message Flow

```
Coordinator                    Mycelium                   Supervisor
     |                            |                            |
     |-- job.run (JSON-RPC) ----->|                            |
     |                            |-- forward ---------------->|
     |                            |                            |
     |                            |<-- reply ------------------|
     |<-- result -----------------|                            |
```

## Current Refactoring (In Progress)

### Completed

1. ✅ Separated Job (data) from FlowNode (orchestration)
2. ✅ Created the NodeStatus enum for execution state tracking
3. ✅ Moved dependencies from Job to FlowNode
4. ✅ Created `update_node_status()` methods with state-transition validation
5. ✅ Renamed methods to be node-centric (`mark_node_started()`, etc.)
6. ✅ Replaced `Message.job: Vec<Job>` with `Message.nodes: Vec<FlowNode>` (both fields present during the migration)
7. ✅ Updated the scheduler loop to use `DAG.ready_nodes()` instead of checking job status
8. ✅ Updated `deliver_one()` to use `FlowNode.supervisor_url` for routing
9. ✅ Implemented the job completion handler to update `node_status` in the DAG
10. ✅ Added `flow_id` to Message for proper DAG tracking
11. ✅ Updated DAG runtime state (started, completed, failed_job) on node status changes
12. ✅ Fixed all compilation errors - **zero errors!**

### TODO

1. ❌ Persist DAG state to Redis (currently rebuilt each time)
2. ❌ Store the supervisor secret in the runner config and use it in routing
3. ❌ Remove the legacy `Message.job` field after the full migration
4. ❌ Add DAG state recovery on coordinator restart

## State Transitions

### NodeStatus

```
Pending ──────┐
              ├──> Ready ──> Dispatched ──> Running ──> Completed
              │                                   └───> Failed
              └───────────────────────────────────────> Cancelled
```

### FlowStatus

```
Created ──> Started ──> Finished
                  └───> Error
```

## Configuration

### Environment Variables

- `COORDINATOR_URL`: Coordinator RPC endpoint (default: `http://127.0.0.1:9652`)
- `MYCELIUM_URL`: Mycelium API endpoint (default: `http://127.0.0.1:8990`)

### CLI Arguments

- `--mycelium-ip`: Mycelium IP address (default: 127.0.0.1)
- `--mycelium-port`: Mycelium port (default: 8990)
- `--redis-url`: Redis connection URL (default: `redis://127.0.0.1:6379`)

## Testing

See `scripts/supervisor_flow_demo.py` for an end-to-end flow execution demo.
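For a quick manual smoke test without the Python demo, something like the following exercises `flow.start` over JSON-RPC against the default `COORDINATOR_URL`. The parameter shape (`flow_id`) is an assumption; check the RPC definitions for the real one.

```rust
// Cargo.toml: reqwest = { version = "0.12", features = ["blocking", "json"] },
//             serde_json = "1"
use serde_json::{json, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // JSON-RPC 2.0 envelope; the "params" shape is assumed, not confirmed.
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "flow.start",
        "params": { "flow_id": 1 }
    });

    let response: Value = reqwest::blocking::Client::new()
        .post("http://127.0.0.1:9652") // default COORDINATOR_URL
        .json(&request)
        .send()?
        .json()?;

    println!("{response:#}"); // pretty-print the reply
    Ok(())
}
```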