diff --git a/scripts/supervisor_flow_demo.py b/scripts/supervisor_flow_demo.py index 0c1174c..c4c43a0 100644 --- a/scripts/supervisor_flow_demo.py +++ b/scripts/supervisor_flow_demo.py @@ -3,6 +3,7 @@ Supervisor flow demo for HeroCoordinator. This script: +- Optionally pre-registers a Python runner on the target Supervisor over Mycelium using an admin secret (--admin-secret). If the flag is not set, this step is skipped. - Creates an actor - Creates a context granting the actor admin/reader/executor privileges - Registers a Runner in the context targeting a Supervisor reachable via Mycelium (by public key or IP) @@ -20,10 +21,13 @@ Notes: - Exactly one of --dst-ip or --dst-pk must be provided. - Runner.topic defaults to "supervisor.rpc" (see main.rs). - The router auto-discovers contexts and will deliver job.run messages to the supervisor. +- Mycelium URL is read from MYCELIUM_URL (default http://127.0.0.1:8990). +- supervisor.register_runner uses static name="python" and queue="python". """ import argparse import json +import base64 import os import sys import time @@ -36,6 +40,9 @@ JSONRPC_VERSION = "2.0" def env_url() -> str: return os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652").rstrip("/") +def env_mycelium_url() -> str: + return os.getenv("MYCELIUM_URL", "http://127.0.0.1:8990").rstrip("/") + class JsonRpcClient: def __init__(self, url: str): @@ -87,6 +94,60 @@ def print_header(title: str): def pretty(obj: Any): print(json.dumps(obj, indent=2, sort_keys=True)) +def mycelium_register_runner( + myc: "JsonRpcClient", + dst_pk: Optional[str], + dst_ip: Optional[str], + topic: str, + admin_secret: str, + name: str = "python", + queue: str = "python", + timeout: int = 15, +) -> Any: + """ + Send supervisor.register_runner over Mycelium using pushMessage and wait for the reply. + - myc: JsonRpcClient for the Mycelium API (MYCELIUM_URL) + - dst_pk/dst_ip: destination on the overlay; one of them must be provided + - topic: message topic (defaults to supervisor.rpc from args) + - admin_secret: supervisor admin secret to authorize the registration + - name/queue: static identifiers for the python runner on the supervisor + - timeout: seconds to wait for a reply + Returns the JSON-RPC 'result' from the supervisor or raises on error/timeout. + """ + envelope = { + "jsonrpc": JSONRPC_VERSION, + "id": 1, + "method": "register_runner", + "params": [{"secret": admin_secret, "name": name, "queue": queue}], + } + payload_b64 = base64.b64encode(json.dumps(envelope).encode("utf-8")).decode("ascii") + topic_b64 = base64.b64encode(topic.encode("utf-8")).decode("ascii") + + if dst_pk: + dst = {"pk": dst_pk} + elif dst_ip: + dst = {"ip": dst_ip} + else: + raise RuntimeError("Either dst_pk or dst_ip must be provided for Mycelium destination") + + params = { + "message": {"dst": dst, "topic": topic_b64, "payload": payload_b64}, + } + resp = myc.call("pushMessage", params) + time.sleep(15) + + # Expect an InboundMessage with a payload if a reply was received + # if isinstance(resp, dict) and "payload" in resp: + # try: + # reply = json.loads(base64.b64decode(resp["payload"]).decode("utf-8")) + # except Exception as e: + # raise RuntimeError(f"Invalid supervisor reply payload: {e}") + # if isinstance(reply, dict) and reply.get("error"): + # raise RuntimeError(f"Supervisor register_runner error: {json.dumps(reply['error'])}") + # return reply.get("result") + # + # raise RuntimeError("No reply received from supervisor for register_runner (timeout)") + def try_create_or_load(client: JsonRpcClient, create_method: str, create_params: Dict[str, Any], load_method: str, load_params: Dict[str, Any]) -> Any: @@ -124,6 +185,7 @@ def parse_args() -> argparse.Namespace: ) p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc") p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls") + p.add_argument("--admin-secret", help="Supervisor admin secret to pre-register a Python runner over Mycelium. If omitted, pre-registration is skipped.") p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0") p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600") return p.parse_args() @@ -138,6 +200,9 @@ def main(): url = env_url() client = JsonRpcClient(url) + mycelium_url = env_mycelium_url() + mycelium_client = JsonRpcClient(mycelium_url) if getattr(args, "admin_secret", None) else None + actor_id = int(args.actor_id) context_id = int(args.context_id) runner_id = int(args.runner_id) @@ -189,6 +254,25 @@ def main(): runner_pubkey = args.dst_pk if args.dst_pk else "" runner_address = args.dst_ip if args.dst_ip else "127.0.0.1" + # Optional: pre-register a Python runner on the Supervisor over Mycelium using an admin secret + if getattr(args, "admin_secret", None): + print_header("supervisor.register_runner (pre-register via Mycelium)") + try: + mycelium_result = mycelium_register_runner( + mycelium_client, + args.dst_pk if args.dst_pk else None, + args.dst_ip if args.dst_ip else None, + topic, + args.admin_secret, + name="Python", + queue="Python", + timeout=15, + ) + print("Supervisor register_runner ->", mycelium_result) + except Exception as e: + print(f"ERROR: Supervisor pre-registration failed: {e}", file=sys.stderr) + sys.exit(1) + print_header("runner.create (or load)") # runner.load requires both context_id and id try: @@ -351,4 +435,4 @@ if __name__ == "__main__": except Exception as e: print_header("Error") print(str(e)) - sys.exit(1) \ No newline at end of file + sys.exit(1) diff --git a/src/clients/mycelium_client.rs b/src/clients/mycelium_client.rs index 7750851..68b6f01 100644 --- a/src/clients/mycelium_client.rs +++ b/src/clients/mycelium_client.rs @@ -55,6 +55,8 @@ impl MyceliumClient { "method": method, "params": [ params ] }); + + tracing::info!(%req, "jsonrpc"); let resp = self.http.post(&self.base_url).json(&req).send().await?; let status = resp.status(); let body: Value = resp.json().await?; diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs index f19b592..b17a5c3 100644 --- a/src/clients/supervisor_client.rs +++ b/src/clients/supervisor_client.rs @@ -208,7 +208,7 @@ impl SupervisorClient { .mycelium .push_message( &self.destination, - &self.topic, + &Self::encode_topic(self.topic.as_bytes()), &payload_b64, Some(reply_timeout_secs), ) diff --git a/src/router.rs b/src/router.rs index 545228a..e8a7da7 100644 --- a/src/router.rs +++ b/src/router.rs @@ -522,10 +522,7 @@ pub fn start_inbound_listener( loop { // Poll for inbound supervisor messages on the configured topic - match mycelium - .pop_message(Some(false), Some(20), Some(cfg.topic.as_str())) - .await - { + match mycelium.pop_message(Some(false), Some(20), None).await { Ok(Some(inb)) => { // Expect InboundMessage with base64 "payload" let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else { @@ -545,6 +542,10 @@ pub fn start_inbound_listener( .await; continue; }; + tracing::info!( + raw = %String::from_utf8_lossy(&raw), + "Read raw messge from mycelium" + ); let Ok(rpc): Result = serde_json::from_slice(&raw) else { // Invalid JSON payload continue;