#!/usr/bin/env python3
"""
Supervisor flow demo for HeroCoordinator.

This script:
- Optionally pre-registers a Python runner on the target Supervisor over
  Mycelium using an admin secret (--admin-secret). If the flag is not set,
  this step is skipped.
- Creates an actor
- Creates a context granting the actor admin/reader/executor privileges
- Registers a Runner in the context targeting a Supervisor reachable via
  Mycelium (by public key or IP)
- Creates simple Python jobs (text jobs) with a small dependency chain
- Creates a flow referencing those jobs
- Starts the flow and polls until it finishes (or errors)

Transport: JSON-RPC over HTTP to the Coordinator
(default COORDINATOR_URL=http://127.0.0.1:9652).

Example usage:
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1 [--secret your-secret]
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32 [--secret your-secret]

Notes:
- Exactly one of --dst-ip or --dst-pk must be provided.
- Runner.topic defaults to "supervisor.rpc" (see main.rs).
- The router auto-discovers contexts and will deliver job.run messages to the supervisor.
- Mycelium URL is read from MYCELIUM_URL (default http://127.0.0.1:8990).
- supervisor.register_runner uses static name="python" and queue="python".
"""
import argparse
import json
import base64
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from urllib import request, error

JSONRPC_VERSION = "2.0"


def env_url() -> str:
    """Return the Coordinator JSON-RPC endpoint (COORDINATOR_URL), without a trailing slash."""
    return os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652").rstrip("/")


def env_mycelium_url() -> str:
    """Return the Mycelium JSON-RPC endpoint (MYCELIUM_URL), without a trailing slash."""
    return os.getenv("MYCELIUM_URL", "http://127.0.0.1:8990").rstrip("/")


class JsonRpcClient:
    """Minimal JSON-RPC 2.0 client over HTTP.

    Uses an auto-incrementing request id; batch responses are rejected.
    """

    def __init__(self, url: str):
        self.url = url
        self._id = 0  # monotonically increasing JSON-RPC request id

    def call(self, method: str, params: Dict[str, Any]) -> Any:
        """POST a single JSON-RPC request and return its 'result' field.

        Raises:
            RuntimeError: on HTTP/transport errors, non-JSON bodies,
                batch (list) responses, or a JSON-RPC 'error' object.
        """
        self._id += 1
        payload = {
            "jsonrpc": JSONRPC_VERSION,
            "id": self._id,
            "method": method,
            "params": params,
        }
        data = json.dumps(payload).encode("utf-8")
        req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"})
        try:
            with request.urlopen(req) as resp:
                body = resp.read()
        except error.HTTPError as e:
            # Include the server's error body (if readable) for easier debugging.
            try:
                details = e.read().decode("utf-8", "ignore")
            except Exception:
                details = ""
            raise RuntimeError(f"HTTP error {e.code}: {details}") from e
        except error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e
        try:
            obj = json.loads(body.decode("utf-8"))
        except Exception as e:
            raise RuntimeError(f"Invalid JSON response: {body!r}") from e
        if isinstance(obj, list):
            raise RuntimeError("Batch responses are not supported")
        if obj.get("error"):
            raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")
        return obj.get("result")


def print_header(title: str):
    """Print a banner separating the demo's numbered steps."""
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def pretty(obj: Any):
    """Pretty-print a JSON-serializable object with stable key order."""
    print(json.dumps(obj, indent=2, sort_keys=True))


def mycelium_register_runner(
    myc: "JsonRpcClient",
    dst_pk: Optional[str],
    dst_ip: Optional[str],
    topic: str,
    admin_secret: str,
    name: str = "python",
    queue: str = "python",
    timeout: int = 15,
) -> Any:
    """
    Send supervisor.register_runner over Mycelium using pushMessage.

    - myc: JsonRpcClient for the Mycelium API (MYCELIUM_URL)
    - dst_pk/dst_ip: destination on the overlay; one of them must be provided
    - topic: message topic (defaults to supervisor.rpc from args)
    - admin_secret: supervisor admin secret to authorize the registration
    - name/queue: static identifiers for the python runner on the supervisor
    - timeout: seconds to wait after pushing the message, giving the
      supervisor time to process the registration

    Returns the pushMessage result. NOTE(review): reply correlation is not
    implemented yet — this is fire-and-forget plus a fixed wait; the
    supervisor's own register_runner result is not read back.
    Raises RuntimeError if neither dst_pk nor dst_ip is given.
    """
    # Inner JSON-RPC envelope addressed to the supervisor, wrapped base64
    # inside Mycelium's pushMessage payload.
    envelope = {
        "jsonrpc": JSONRPC_VERSION,
        "id": 1,
        "method": "register_runner",
        "params": [{"secret": admin_secret, "name": name, "queue": queue}],
    }
    payload_b64 = base64.b64encode(json.dumps(envelope).encode("utf-8")).decode("ascii")
    topic_b64 = base64.b64encode(topic.encode("utf-8")).decode("ascii")
    if dst_pk:
        dst = {"pk": dst_pk}
    elif dst_ip:
        dst = {"ip": dst_ip}
    else:
        raise RuntimeError("Either dst_pk or dst_ip must be provided for Mycelium destination")
    params = {
        "message": {"dst": dst, "topic": topic_b64, "payload": payload_b64},
    }
    resp = myc.call("pushMessage", params)
    # Was a hardcoded sleep(15); honor the timeout parameter instead.
    time.sleep(timeout)
    return resp


def try_create_or_load(client: JsonRpcClient,
                       create_method: str, create_params: Dict[str, Any],
                       load_method: str, load_params: Dict[str, Any]) -> Any:
    """Attempt a create; if it fails due to existence, try load.

    Server maps AlreadyExists to StorageError and we don't have a structured
    error code here, so matching is done on the error message text.
    """
    try:
        return client.call(create_method, create_params)
    except RuntimeError as e:
        msg = str(e)
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            # Fall back to load
            return client.call(load_method, load_params)
        raise


def parse_args() -> argparse.Namespace:
    """Parse CLI arguments; exactly one of --dst-ip/--dst-pk is required."""
    p = argparse.ArgumentParser(description="Create actor/context/runner/jobs/flow; start and wait until completion.")
    group = p.add_mutually_exclusive_group(required=True)
    group.add_argument("--dst-ip", help="Supervisor Mycelium IP address (IPv4 or IPv6)")
    group.add_argument("--dst-pk", help="Supervisor public key (64-hex)")
    p.add_argument("--context-id", type=int, default=2, help="Context id (Redis DB index; 0-15). Default: 2")
    p.add_argument("--actor-id", type=int, default=11001, help="Actor id. Default: 11001")
    p.add_argument("--runner-id", type=int, default=12001, help="Runner id. Default: 12001")
    p.add_argument("--flow-id", type=int, default=13001, help="Flow id. Default: 13001")
    p.add_argument("--base-job-id", type=int, default=20000,
                   help="Base job id for first job; subsequent jobs increment. Default: 20000")
    p.add_argument("--jobs", type=int, default=3,
                   help="Number of jobs to create (>=1). Forms a simple chain. Default: 3")
    p.add_argument("--timeout-secs", type=int, default=60, help="Per-job timeout seconds. Default: 60")
    p.add_argument("--retries", type=int, default=0, help="Per-job retries (0-255). Default: 0")
    p.add_argument(
        "--script-type", choices=["Python", "V", "Osis", "Sal"], default="Python",
        help="ScriptType for jobs/runner. Default: Python"
    )
    p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc")
    p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls")
    p.add_argument("--admin-secret",
                   help="Supervisor admin secret to pre-register a Python runner over Mycelium. If omitted, pre-registration is skipped.")
    p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0")
    p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600")
    return p.parse_args()


def main():
    """Run the full demo: actor -> context -> runner -> jobs -> flow -> poll."""
    args = parse_args()
    if args.jobs < 1:
        print("ERROR: --jobs must be >= 1", file=sys.stderr)
        sys.exit(2)

    url = env_url()
    client = JsonRpcClient(url)
    mycelium_url = env_mycelium_url()
    # Only build a Mycelium client when pre-registration was requested.
    mycelium_client = JsonRpcClient(mycelium_url) if getattr(args, "admin_secret", None) else None

    actor_id = int(args.actor_id)
    context_id = int(args.context_id)
    runner_id = int(args.runner_id)
    flow_id = int(args.flow_id)
    base_job_id = int(args.base_job_id)
    script_type = args.script_type
    timeout = int(args.timeout_secs)
    retries = int(args.retries)
    topic = args.topic

    # 1) Actor
    print_header("actor.create (or load)")
    actor = try_create_or_load(
        client,
        "actor.create",
        {
            "actor": {
                "id": actor_id,
                "pubkey": "demo-pubkey",
                "address": ["127.0.0.1"],
            }
        },
        "actor.load",
        {"id": actor_id},
    )
    pretty(actor)

    # 2) Context
    print_header("context.create (or load)")
    context = try_create_or_load(
        client,
        "context.create",
        {
            "context": {
                "id": context_id,
                "admins": [actor_id],
                "readers": [actor_id],
                "executors": [actor_id],
            }
        },
        "context.load",
        {"id": context_id},
    )
    pretty(context)

    # 3) Runner in this context
    # Router picks pubkey if non-empty, else IP address.
    # However, RunnerCreate requires both fields; we fill both and control
    # routing via pubkey empty/non-empty.
    runner_pubkey = args.dst_pk if args.dst_pk else ""
    runner_address = args.dst_ip if args.dst_ip else "127.0.0.1"

    # Optional: pre-register a Python runner on the Supervisor over Mycelium
    # using an admin secret.
    if getattr(args, "admin_secret", None):
        print_header("supervisor.register_runner (pre-register via Mycelium)")
        try:
            # Was name="Python"/queue="Python": the supervisor expects the
            # static lowercase identifiers (see module docstring and the
            # defaults on mycelium_register_runner).
            mycelium_result = mycelium_register_runner(
                mycelium_client,
                args.dst_pk if args.dst_pk else None,
                args.dst_ip if args.dst_ip else None,
                topic,
                args.admin_secret,
                name="python",
                queue="python",
                timeout=15,
            )
            print("Supervisor register_runner ->", mycelium_result)
        except Exception as e:
            print(f"ERROR: Supervisor pre-registration failed: {e}", file=sys.stderr)
            sys.exit(1)

    print_header("runner.create (or load)")
    # runner.load requires both context_id and id
    try:
        runner_payload = {
            "id": runner_id,
            "pubkey": runner_pubkey,
            "address": runner_address,
            "topic": topic,
            "script_type": script_type,
            "local": False,
        }
        # Optional supervisor secret used by router for authenticated supervisor calls
        if getattr(args, "secret", None):
            runner_payload["secret"] = args.secret
        runner = client.call("runner.create", {
            "context_id": context_id,
            "runner": runner_payload
        })
    except RuntimeError as e:
        msg = str(e)
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            runner = client.call("runner.load", {"context_id": context_id, "id": runner_id})
        else:
            raise
    pretty(runner)

    # 4) Jobs
    # Build a simple chain: J0 (root), J1 depends on J0, J2 depends on J1, ... up to N-1
    job_ids: List[int] = []
    for i in range(args.jobs):
        jid = base_job_id + i
        depends = [] if i == 0 else [base_job_id + (i - 1)]
        job_payload = {
            "id": jid,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": f"print('Job {i} running')",
            "script_type": script_type,
            "timeout": timeout,
            "retries": retries,
            "env_vars": {},
            "prerequisites": [],
            "depends": depends,
        }
        print_header(f"job.create - {jid} {'(root)' if not depends else f'(depends on {depends})'}")
        try:
            job = client.call("job.create", {
                "context_id": context_id,
                "job": job_payload
            })
        except RuntimeError as e:
            msg = str(e)
            if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
                job = client.call("job.load", {
                    "context_id": context_id,
                    "caller_id": actor_id,
                    "id": jid
                })
            else:
                raise
        pretty(job)
        job_ids.append(jid)

    # 5) Flow
    print_header("flow.create (or load)")
    try:
        flow = client.call("flow.create", {
            "context_id": context_id,
            "flow": {
                "id": flow_id,
                "caller_id": actor_id,
                "context_id": context_id,
                "jobs": job_ids,
                "env_vars": {}
            }
        })
    except RuntimeError as e:
        msg = str(e)
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
        else:
            raise
    pretty(flow)

    # Optional: show DAG (best-effort; a failure here does not abort the demo)
    try:
        print_header("flow.dag")
        dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
        pretty(dag)
    except Exception as e:
        print(f"WARN: flow.dag failed: {e}", file=sys.stderr)

    # 6) Start flow (idempotent; returns bool whether scheduler started)
    print_header("flow.start")
    started = client.call("flow.start", {"context_id": context_id, "id": flow_id})
    print(f"flow.start -> {started}")

    # 7) Poll until Finished or Error (or timeout)
    print_header("Polling flow.load until completion")
    t0 = time.time()
    status = None
    last_status_print = 0.0
    poll_count = 0
    while True:
        poll_count += 1
        flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
        status = flow.get("status")
        now = time.time()
        # Throttle status prints to at most one per poll interval (min 1s).
        if now - last_status_print >= max(1.0, float(args.poll_interval)):
            print(f"[{int(now - t0)}s] flow.status = {status}")
            last_status_print = now
        # Every 5th poll, print the current flow DAG
        if (poll_count % 5) == 0:
            try:
                print_header("flow.dag (periodic)")
                dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
                pretty(dag)
            except Exception as e:
                print(f"WARN: periodic flow.dag failed: {e}", file=sys.stderr)
        if status in ("Finished", "Error"):
            break
        if (now - t0) > args.poll_timeout:
            print(f"ERROR: Flow did not complete within {args.poll_timeout}s (status={status})", file=sys.stderr)
            break
        time.sleep(float(args.poll_interval))

    # 8) Final summary: job statuses
    print_header("Final job statuses")
    for jid in job_ids:
        try:
            j = client.call("job.load", {
                "context_id": context_id,
                "caller_id": actor_id,
                "id": jid
            })
            print(f"Job {jid}: status={j.get('status')} result={j.get('result')}")
        except Exception as e:
            print(f"Job {jid}: load failed: {e}", file=sys.stderr)

    # Exit code
    if status == "Finished":
        print_header("Result")
        print("Flow finished successfully.")
        sys.exit(0)
    else:
        print_header("Result")
        print(f"Flow ended with status={status}")
        sys.exit(1)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.")
        sys.exit(130)
    except Exception as e:
        print_header("Error")
        print(str(e))
        sys.exit(1)