Compare commits

...

20 Commits

Author SHA1 Message Date
Lee Smet
de6c799635 Set runner secret
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-04 14:05:03 +02:00
Lee Smet
c4971aa794 Add full flow script example
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 20:17:12 +02:00
Lee Smet
7aa35b6d06 Fix remainder of HSET return value deconding
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 20:16:53 +02:00
Lee Smet
60946af1df Fix pushMessage parameter encoding
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 20:11:10 +02:00
Lee Smet
83990cf16a Properly encode topic in mycelium rpc
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 20:09:47 +02:00
Lee Smet
dbb9493bcb Improve code format in router
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 14:54:11 +02:00
Lee Smet
d921dca75c Fix default mycelium jsonrpc api port
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 14:50:45 +02:00
Lee Smet
4a15269442 Fix more HSET types in redis driver
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 14:46:55 +02:00
Lee Smet
43fd61d662 Remove unused imports
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-09-03 11:29:26 +02:00
Lee Smet
38709e06f3 Add script to test actor/context/job/flow create and flow dag
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 15:43:32 +02:00
Lee Smet
08de312cd9 Fix HSET response decoding
The command internally uses (the deprecated) HMSET which just returns OK
on success instead of the amount of fields written

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 11:30:41 +02:00
Lee Smet
4d1cd3d910 Format codebase
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 11:22:42 +02:00
Lee Smet
c1c1ae3bd1 Bump thiserror to latest version
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 11:22:12 +02:00
Lee Smet
ec339c5cbe Add some internal logging
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 11:10:04 +02:00
Lee Smet
2aa6277385 Actors are global
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 10:29:32 +02:00
Lee Smet
9c47eaaf93 Embedd rpc spec in rpc api
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-29 09:59:42 +02:00
Lee Smet
fce0ccb2d8 Fetch job results if a job is finished
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-28 17:09:04 +02:00
Lee Smet
e5a6228448 Periodically check the job status on the supervisor
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-28 16:42:48 +02:00
Lee Smet
052539409b Separate mycelium client more from supervisor client
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-28 15:24:03 +02:00
Lee Smet
1551b4707b Periodically verify the status of messages sent over mycelium
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
2025-08-28 14:53:08 +02:00
20 changed files with 1959 additions and 220 deletions

195
Cargo.lock generated
View File

@@ -17,6 +17,15 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]]
name = "anstream"
version = "0.6.20"
@@ -514,9 +523,10 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"thiserror 1.0.69",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@@ -846,7 +856,7 @@ dependencies = [
"rustc-hash",
"serde",
"serde_json",
"thiserror 2.0.16",
"thiserror",
"tokio",
"tower",
"tracing",
@@ -884,7 +894,7 @@ dependencies = [
"serde",
"serde_json",
"soketto",
"thiserror 2.0.16",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
@@ -901,9 +911,15 @@ dependencies = [
"http",
"serde",
"serde_json",
"thiserror 2.0.16",
"thiserror",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.175"
@@ -944,6 +960,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "memchr"
version = "2.7.5"
@@ -993,6 +1018,16 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
@@ -1086,6 +1121,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -1203,7 +1244,7 @@ dependencies = [
"rustc-hash",
"rustls",
"socket2 0.5.10",
"thiserror 2.0.16",
"thiserror",
"tokio",
"tracing",
"web-time",
@@ -1224,7 +1265,7 @@ dependencies = [
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.16",
"thiserror",
"tinyvec",
"tracing",
"web-time",
@@ -1352,6 +1393,50 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.10",
"regex-syntax 0.8.6",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.6",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
[[package]]
name = "reqwest"
version = "0.12.23"
@@ -1587,6 +1672,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -1733,33 +1827,13 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0"
dependencies = [
"thiserror-impl 2.0.16",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"thiserror-impl",
]
[[package]]
@@ -1773,6 +1847,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "tinystr"
version = "0.8.1"
@@ -1966,6 +2049,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@@ -2015,6 +2128,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
@@ -2151,6 +2270,28 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-link"
version = "0.1.3"

View File

@@ -21,4 +21,5 @@ reqwest = { version = "0.12.7", features = ["json", "rustls-tls"] }
# Base64 encoding for message payloads
base64 = "0.22.1"
# Error derive for clean error types
thiserror = "1.0.64"
thiserror = "2.0.16"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] }

376
scripts/jsonrpc_demo.py Normal file
View File

@@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""
Demo script for HeroCoordinator JSON-RPC API.
- Creates an actor
- Verifies by loading the actor
- Creates a context with the actor as admin/reader/executor
- Creates three jobs with dependencies
- Creates a flow referencing those jobs
- Fetches and prints the flow DAG
Usage:
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/jsonrpc_demo.py
Defaults to http://127.0.0.1:9652 if COORDINATOR_URL is not set.
"""
import os
import json
import sys
from urllib import request, error
from typing import Any, Dict, List, Tuple
JSONRPC_VERSION = "2.0"
class JsonRpcClient:
def __init__(self, url: str):
self.url = url.rstrip("/")
self._id = 0
def call(self, method: str, params: Dict[str, Any]) -> Any:
self._id += 1
payload = {
"jsonrpc": JSONRPC_VERSION,
"id": self._id,
"method": method,
"params": params,
}
data = json.dumps(payload).encode("utf-8")
req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"})
try:
with request.urlopen(req) as resp:
body = resp.read()
except error.HTTPError as e:
try:
details = e.read().decode("utf-8", "ignore")
except Exception:
details = ""
raise RuntimeError(f"HTTP error {e.code}: {details}") from e
except error.URLError as e:
raise RuntimeError(f"URL error: {e.reason}") from e
try:
obj = json.loads(body.decode("utf-8"))
except Exception as e:
raise RuntimeError(f"Invalid JSON response: {body!r}") from e
# JSON-RPC single response expected
if isinstance(obj, list):
raise RuntimeError("Batch responses are not supported in this demo")
if obj.get("error"):
raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")
return obj.get("result")
def print_header(title: str):
print("\n" + "=" * 80)
print(title)
print("=" * 80)
def pretty_print(obj: Any):
print(json.dumps(obj, indent=2, sort_keys=True))
def summarize_dag(dag: Dict[str, Any]):
print_header("Flow DAG Summary")
flow_id = dag.get("flow_id")
caller_id = dag.get("caller_id")
context_id = dag.get("context_id")
print(f"flow_id={flow_id} caller_id={caller_id} context_id={context_id}")
edges: List[Tuple[int, int]] = dag.get("edges", [])
roots: List[int] = dag.get("roots", [])
leaves: List[int] = dag.get("leaves", [])
levels: List[List[int]] = dag.get("levels", [])
nodes: Dict[str, Any] = dag.get("nodes", {})
print("Edges:")
for a, b in edges:
print(f" {a} -> {b}")
print(f"Roots: {roots}")
print(f"Leaves: {leaves}")
print("Levels:")
for i, lvl in enumerate(levels):
print(f" L{i}: {lvl}")
# Show nodes and their dependencies (from JobSummary)
print("Nodes:")
for k, v in nodes.items():
depends = v.get("depends", [])
prerequisites = v.get("prerequisites", [])
stype = v.get("script_type")
print(f" Job {k}: depends={depends} prerequisites={prerequisites} script_type={stype}")
def assert_edges(edges: List[Tuple[int, int]], required: List[Tuple[int, int]]):
edge_set = {(int(a), int(b)) for a, b in edges}
missing = [e for e in required if e not in edge_set]
if missing:
raise AssertionError(f"Missing expected edges in DAG: {missing}; got={sorted(edge_set)}")
def main():
url = os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652")
client = JsonRpcClient(url)
# Deterministic demo IDs; change if collisions happen
actor_id = 1001
context_id = 1 # Redis DB indices are 0-15; keep <= 15
job_a = 3001
job_b = 3002
job_c = 3003
job_d = 3004
job_e = 3005
job_f = 3006
job_g = 3007
job_h = 3008
job_i = 3009
flow_id = 4001
runner_id = 2001
print_header("actor.create")
actor = client.call("actor.create", {
"actor": {
"id": actor_id,
"pubkey": "demo-pubkey",
"address": ["127.0.0.1"]
}
})
pretty_print(actor)
print_header("actor.load")
actor_loaded = client.call("actor.load", {"id": actor_id})
pretty_print(actor_loaded)
print_header("context.create")
context = client.call("context.create", {
"context": {
"id": context_id,
"admins": [actor_id],
"readers": [actor_id],
"executors": [actor_id]
}
})
pretty_print(context)
print_header("runner.create")
runner = client.call("runner.create", {
"context_id": context_id,
"runner": {
"id": runner_id,
"pubkey": "", # leave empty to route by IP
"address": "127.0.0.1",
"topic": f"runner{runner_id}",
"script_type": "Python",
"local": True,
"secret": "demo-secret"
}
})
pretty_print(runner)
print_header("job.create - A (root)")
jobA = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_a,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('A')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": []
}
})
pretty_print(jobA)
print_header("job.create - B (root)")
jobB = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_b,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('B')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": []
}
})
pretty_print(jobB)
print_header("job.create - C (depends on A and B)")
jobC = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_c,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('C')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_a, job_b]
}
})
pretty_print(jobC)
print_header("job.create - D (depends on A)")
jobD = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_d,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('D')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_a]
}
})
pretty_print(jobD)
print_header("job.create - E (depends on B)")
jobE = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_e,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('E')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_b]
}
})
pretty_print(jobE)
print_header("job.create - F (depends on C and D)")
jobF = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_f,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('F')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_c, job_d]
}
})
pretty_print(jobF)
print_header("job.create - G (depends on C and E)")
jobG = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_g,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('G')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_c, job_e]
}
})
pretty_print(jobG)
print_header("job.create - H (leaf; depends on F and G)")
jobH = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_h,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('H')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_f, job_g]
}
})
pretty_print(jobH)
print_header("job.create - I (leaf; depends on F and G)")
jobI = client.call("job.create", {
"context_id": context_id,
"job": {
"id": job_i,
"caller_id": actor_id,
"context_id": context_id,
"script": "print('I')",
"script_type": "Python",
"timeout": 30,
"retries": 0,
"env_vars": {},
"prerequisites": [],
"depends": [job_f, job_g]
}
})
pretty_print(jobI)
print_header("flow.create")
flow = client.call("flow.create", {
"context_id": context_id,
"flow": {
"id": flow_id,
"caller_id": actor_id,
"context_id": context_id,
"jobs": [job_a, job_b, job_c, job_d, job_e, job_f, job_g, job_h, job_i],
"env_vars": {}
}
})
pretty_print(flow)
print_header("flow.dag")
dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
summarize_dag(dag)
# Validate roots and leaves
got_roots = list(map(int, dag.get("roots", [])))
if got_roots != sorted([job_a, job_b]):
print("WARNING: Unexpected roots:", got_roots, file=sys.stderr)
got_leaves = {int(x) for x in dag.get("leaves", [])}
expected_leaves = {job_h, job_i}
if got_leaves != expected_leaves:
print("WARNING: Unexpected leaves:", got_leaves, "expected:", expected_leaves, file=sys.stderr)
# Check edges reflect the expanded DAG
expected_edges = [
(job_a, job_c), (job_b, job_c),
(job_a, job_d), (job_b, job_e),
(job_c, job_f), (job_d, job_f),
(job_c, job_g), (job_e, job_g),
(job_f, job_h), (job_g, job_h),
(job_f, job_i), (job_g, job_i),
]
try:
assert_edges(dag.get("edges", []), expected_edges)
print("DAG edges contain expected dependencies:", expected_edges)
except AssertionError as e:
print("WARNING:", e, file=sys.stderr)
if __name__ == "__main__":
try:
main()
except Exception as e:
print_header("Error")
print(str(e))
sys.exit(1)

View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""
Supervisor flow demo for HeroCoordinator.
This script:
- Creates an actor
- Creates a context granting the actor admin/reader/executor privileges
- Registers a Runner in the context targeting a Supervisor reachable via Mycelium (by public key or IP)
- Creates simple Python jobs (text jobs) with a small dependency chain
- Creates a flow referencing those jobs
- Starts the flow and polls until it finishes (or errors)
Transport: JSON-RPC over HTTP to the Coordinator (default COORDINATOR_URL=http://127.0.0.1:9652).
Example usage:
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1 [--secret your-secret]
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32 [--secret your-secret]
Notes:
- Exactly one of --dst-ip or --dst-pk must be provided.
- Runner.topic defaults to "supervisor.rpc" (see main.rs).
- The router auto-discovers contexts and will deliver job.run messages to the supervisor.
"""
import argparse
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from urllib import request, error
JSONRPC_VERSION = "2.0"
def env_url() -> str:
return os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652").rstrip("/")
class JsonRpcClient:
def __init__(self, url: str):
self.url = url
self._id = 0
def call(self, method: str, params: Dict[str, Any]) -> Any:
self._id += 1
payload = {
"jsonrpc": JSONRPC_VERSION,
"id": self._id,
"method": method,
"params": params,
}
data = json.dumps(payload).encode("utf-8")
req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"})
try:
with request.urlopen(req) as resp:
body = resp.read()
except error.HTTPError as e:
try:
details = e.read().decode("utf-8", "ignore")
except Exception:
details = ""
raise RuntimeError(f"HTTP error {e.code}: {details}") from e
except error.URLError as e:
raise RuntimeError(f"URL error: {e.reason}") from e
try:
obj = json.loads(body.decode("utf-8"))
except Exception as e:
raise RuntimeError(f"Invalid JSON response: {body!r}") from e
if isinstance(obj, list):
raise RuntimeError("Batch responses are not supported")
if obj.get("error"):
raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")
return obj.get("result")
def print_header(title: str):
print("\n" + "=" * 80)
print(title)
print("=" * 80)
def pretty(obj: Any):
print(json.dumps(obj, indent=2, sort_keys=True))
def try_create_or_load(client: JsonRpcClient, create_method: str, create_params: Dict[str, Any],
load_method: str, load_params: Dict[str, Any]) -> Any:
"""Attempt a create; if it fails due to existence, try load."""
try:
return client.call(create_method, create_params)
except RuntimeError as e:
msg = str(e)
# Server maps AlreadyExists to StorageError, we don't have a structured error code here.
if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
# Fall back to load
return client.call(load_method, load_params)
raise
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Create actor/context/runner/jobs/flow; start and wait until completion.")
group = p.add_mutually_exclusive_group(required=True)
group.add_argument("--dst-ip", help="Supervisor Mycelium IP address (IPv4 or IPv6)")
group.add_argument("--dst-pk", help="Supervisor public key (64-hex)")
p.add_argument("--context-id", type=int, default=2, help="Context id (Redis DB index; 0-15). Default: 2")
p.add_argument("--actor-id", type=int, default=11001, help="Actor id. Default: 11001")
p.add_argument("--runner-id", type=int, default=12001, help="Runner id. Default: 12001")
p.add_argument("--flow-id", type=int, default=13001, help="Flow id. Default: 13001")
p.add_argument("--base-job-id", type=int, default=20000, help="Base job id for first job; subsequent jobs increment. Default: 20000")
p.add_argument("--jobs", type=int, default=3, help="Number of jobs to create (>=1). Forms a simple chain. Default: 3")
p.add_argument("--timeout-secs", type=int, default=60, help="Per-job timeout seconds. Default: 60")
p.add_argument("--retries", type=int, default=0, help="Per-job retries (0-255). Default: 0")
p.add_argument(
"--script-type",
choices=["Python", "V", "Osis", "Sal"],
default="Python",
help="ScriptType for jobs/runner. Default: Python"
)
p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc")
p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls")
p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0")
p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600")
return p.parse_args()
def main():
args = parse_args()
if args.jobs < 1:
print("ERROR: --jobs must be >= 1", file=sys.stderr)
sys.exit(2)
url = env_url()
client = JsonRpcClient(url)
actor_id = int(args.actor_id)
context_id = int(args.context_id)
runner_id = int(args.runner_id)
flow_id = int(args.flow_id)
base_job_id = int(args.base_job_id)
script_type = args.script_type
timeout = int(args.timeout_secs)
retries = int(args.retries)
topic = args.topic
# 1) Actor
print_header("actor.create (or load)")
actor = try_create_or_load(
client,
"actor.create",
{
"actor": {
"id": actor_id,
"pubkey": "demo-pubkey",
"address": ["127.0.0.1"],
}
},
"actor.load",
{"id": actor_id},
)
pretty(actor)
# 2) Context
print_header("context.create (or load)")
context = try_create_or_load(
client,
"context.create",
{
"context": {
"id": context_id,
"admins": [actor_id],
"readers": [actor_id],
"executors": [actor_id],
}
},
"context.load",
{"id": context_id},
)
pretty(context)
# 3) Runner in this context
# Router picks pubkey if non-empty, else IP address.
# However, RunnerCreate requires both fields; we fill both and control routing via pubkey empty/non-empty.
runner_pubkey = args.dst_pk if args.dst_pk else ""
runner_address = args.dst_ip if args.dst_ip else "127.0.0.1"
print_header("runner.create (or load)")
# runner.load requires both context_id and id
try:
runner_payload = {
"id": runner_id,
"pubkey": runner_pubkey,
"address": runner_address,
"topic": topic,
"script_type": script_type,
"local": False,
}
# Optional supervisor secret used by router for authenticated supervisor calls
if getattr(args, "secret", None):
runner_payload["secret"] = args.secret
runner = client.call("runner.create", {
"context_id": context_id,
"runner": runner_payload
})
except RuntimeError as e:
msg = str(e)
if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
runner = client.call("runner.load", {"context_id": context_id, "id": runner_id})
else:
raise
pretty(runner)
# 4) Jobs
# Build a simple chain: J0 (root), J1 depends on J0, J2 depends on J1, ... up to N-1
job_ids: List[int] = []
for i in range(args.jobs):
jid = base_job_id + i
depends = [] if i == 0 else [base_job_id + (i - 1)]
job_payload = {
"id": jid,
"caller_id": actor_id,
"context_id": context_id,
"script": f"print('Job {i} running')",
"script_type": script_type,
"timeout": timeout,
"retries": retries,
"env_vars": {},
"prerequisites": [],
"depends": depends,
}
print_header(f"job.create - {jid} {'(root)' if not depends else f'(depends on {depends})'}")
try:
job = client.call("job.create", {
"context_id": context_id,
"job": job_payload
})
except RuntimeError as e:
msg = str(e)
if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
job = client.call("job.load", {
"context_id": context_id,
"caller_id": actor_id,
"id": jid
})
else:
raise
pretty(job)
job_ids.append(jid)
# 5) Flow
print_header("flow.create (or load)")
try:
flow = client.call("flow.create", {
"context_id": context_id,
"flow": {
"id": flow_id,
"caller_id": actor_id,
"context_id": context_id,
"jobs": job_ids,
"env_vars": {}
}
})
except RuntimeError as e:
msg = str(e)
if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
else:
raise
pretty(flow)
# Optional: show DAG
try:
print_header("flow.dag")
dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
pretty(dag)
except Exception as e:
print(f"WARN: flow.dag failed: {e}", file=sys.stderr)
# 6) Start flow (idempotent; returns bool whether scheduler started)
print_header("flow.start")
started = client.call("flow.start", {"context_id": context_id, "id": flow_id})
print(f"flow.start -> {started}")
# 7) Poll until Finished or Error (or timeout)
print_header("Polling flow.load until completion")
t0 = time.time()
status = None
last_status_print = 0.0
poll_count = 0
while True:
poll_count += 1
flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
status = flow.get("status")
now = time.time()
if now - last_status_print >= max(1.0, float(args.poll_interval)):
print(f"[{int(now - t0)}s] flow.status = {status}")
last_status_print = now
# Every 5th poll, print the current flow DAG
if (poll_count % 5) == 0:
try:
print_header("flow.dag (periodic)")
dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
pretty(dag)
except Exception as e:
print(f"WARN: periodic flow.dag failed: {e}", file=sys.stderr)
if status in ("Finished", "Error"):
break
if (now - t0) > args.poll_timeout:
print(f"ERROR: Flow did not complete within {args.poll_timeout}s (status={status})", file=sys.stderr)
break
time.sleep(float(args.poll_interval))
# 8) Final summary: job statuses
print_header("Final job statuses")
for jid in job_ids:
try:
j = client.call("job.load", {
"context_id": context_id,
"caller_id": actor_id,
"id": jid
})
print(f"Job {jid}: status={j.get('status')} result={j.get('result')}")
except Exception as e:
print(f"Job {jid}: load failed: {e}", file=sys.stderr)
# Exit code
if status == "Finished":
print_header("Result")
print("Flow finished successfully.")
sys.exit(0)
else:
print_header("Result")
print(f"Flow ended with status={status}")
sys.exit(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterrupted.")
sys.exit(130)
except Exception as e:
print_header("Error")
print(str(e))
sys.exit(1)

View File

@@ -20,7 +20,7 @@
"methods": [
{
"name": "actor.create",
"summary": "Create/Upsert Actor in a context",
"summary": "Create/Upsert Actor",
"params": [
{
"name": "params",
@@ -49,7 +49,7 @@
},
{
"name": "actor.load",
"summary": "Load an Actor by id from a context",
"summary": "Load an Actor by id",
"params": [
{
"name": "params",
@@ -438,6 +438,16 @@
"Processed"
]
},
"TransportStatus": {
"type": "string",
"enum": [
"Queued",
"Sent",
"Delivered",
"Read",
"Failed"
]
},
"MessageType": {
"type": "string",
"enum": [
@@ -553,6 +563,9 @@
"local": {
"type": "boolean"
},
"secret": {
"type": "string"
},
"created_at": {
"type": "integer",
"format": "int64"
@@ -779,6 +792,12 @@
},
"status": {
"$ref": "#/components/schemas/MessageStatus"
},
"transport_id": {
"type": "string"
},
"transport_status": {
"$ref": "#/components/schemas/TransportStatus"
}
}
},
@@ -985,6 +1004,9 @@
},
"local": {
"type": "boolean"
},
"secret": {
"type": "string"
}
}
},
@@ -1165,14 +1187,9 @@
"ActorCreateParams": {
"type": "object",
"required": [
"context_id",
"actor"
],
"properties": {
"context_id": {
"type": "integer",
"format": "uint32"
},
"actor": {
"$ref": "#/components/schemas/ActorCreate"
}
@@ -1181,14 +1198,9 @@
"ActorLoadParams": {
"type": "object",
"required": [
"context_id",
"id"
],
"properties": {
"context_id": {
"type": "integer",
"format": "uint32"
},
"id": {
"type": "integer",
"format": "uint32"

View File

@@ -10,7 +10,7 @@
| **Runner** | Public key, Mycelium address, topic name, type (`v\|python\|osis\|rust`), local flag, timestamps | `runner:<id>` (hash) | The *worker* that actually executes **RunnerJob** scripts. It subscribes to a Mycelium topic (normally `runner<id>`). If `local == true` the runner also consumes jobs directly from a Redis queue that is named after the scripttype suffix (`v`, `python`, …). |
| **RunnerJob**| Script source, type (`osis\|sal\|v\|python`), envvars, prerequisites, dependencies, status, timestamps, result map | `job:<caller_id>:<id>` (hash) | A single executable unit. It lives inside a **Context**, belongs to a **Runner**, and is queued according to its `script_type` (e.g. `queue:python`). Its status moves through the lifecycle `dispatched → waiting_for_prerequisites → started → finished|error`. |
> **Key idea:** All objects are persisted as *hashes* in a **Redis** database that is dedicated to a *Context*. The system is completely **decentralised** each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a messagebus that uses Redis lists as queues.
> **Key idea:** All objects are persisted as *hashes*. Contextscoped objects (**Context**, **Flow**, **Message**, **Runner**, **RunnerJob**) live in a **Redis** database dedicated to that context. **Actors are global** and are stored in Redis DB 0 under `actor:<id>`. The system is completely **decentralised** each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a messagebus that uses Redis lists as queues.
---

View File

@@ -1,7 +1,7 @@
pub mod mycelium_client;
pub mod supervisor_client;
pub mod types;
pub use supervisor_client::{
Destination,
SupervisorClient,
SupervisorClientError,
};
pub use mycelium_client::{MyceliumClient, MyceliumClientError};
pub use supervisor_client::{SupervisorClient, SupervisorClientError};
pub use types::Destination;

View File

@@ -0,0 +1,237 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use reqwest::Client as HttpClient;
use serde_json::{Value, json};
use thiserror::Error;
use crate::clients::Destination;
use crate::models::TransportStatus;
/// Lightweight client for Mycelium JSON-RPC (send + query status)
#[derive(Clone)]
pub struct MyceliumClient {
base_url: String, // e.g. http://127.0.0.1:8990
http: HttpClient,
id_counter: Arc<AtomicU64>,
}
#[derive(Debug, Error)]
pub enum MyceliumClientError {
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("Transport timed out waiting for a reply (408)")]
TransportTimeout,
#[error("JSON-RPC error: {0}")]
RpcError(String),
#[error("Invalid response: {0}")]
InvalidResponse(String),
}
impl MyceliumClient {
pub fn new(base_url: impl Into<String>) -> Result<Self, MyceliumClientError> {
let url = base_url.into();
let http = HttpClient::builder().build()?;
Ok(Self {
base_url: url,
http,
id_counter: Arc::new(AtomicU64::new(1)),
})
}
fn next_id(&self) -> u64 {
self.id_counter.fetch_add(1, Ordering::Relaxed)
}
async fn jsonrpc(&self, method: &str, params: Value) -> Result<Value, MyceliumClientError> {
let req = json!({
"jsonrpc": "2.0",
"id": self.next_id(),
"method": method,
"params": [ params ]
});
let resp = self.http.post(&self.base_url).json(&req).send().await?;
let status = resp.status();
let body: Value = resp.json().await?;
if let Some(err) = body.get("error") {
let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
let msg = err
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
if code == 408 {
return Err(MyceliumClientError::TransportTimeout);
}
return Err(MyceliumClientError::RpcError(format!(
"code={code} msg={msg}"
)));
}
if !status.is_success() {
return Err(MyceliumClientError::RpcError(format!(
"HTTP {status}, body {body}"
)));
}
Ok(body)
}
/// Call messageStatus with an outbound message id (hex string)
pub async fn message_status(
&self,
id_hex: &str,
) -> Result<TransportStatus, MyceliumClientError> {
let params = json!({ "id": id_hex });
let body = self.jsonrpc("messageStatus", params).await?;
let result = body.get("result").ok_or_else(|| {
MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
})?;
// Accept both { status: "..."} and bare "..."
let status_str = if let Some(s) = result.get("status").and_then(|v| v.as_str()) {
s.to_string()
} else if let Some(s) = result.as_str() {
s.to_string()
} else {
return Err(MyceliumClientError::InvalidResponse(format!(
"unexpected result shape: {result}"
)));
};
Self::map_status(&status_str).ok_or_else(|| {
MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}"))
})
}
fn map_status(s: &str) -> Option<TransportStatus> {
match s {
"queued" => Some(TransportStatus::Queued),
"sent" => Some(TransportStatus::Sent),
"delivered" => Some(TransportStatus::Delivered),
"read" => Some(TransportStatus::Read),
"failed" => Some(TransportStatus::Failed),
_ => None,
}
}
/// Build params object for pushMessage without performing any network call.
/// Exposed for serializer-only tests and reuse.
pub(crate) fn build_push_params(
dst: &Destination,
topic: &str,
payload_b64: &str,
reply_timeout: Option<u64>,
) -> Value {
let dst_v = match dst {
Destination::Ip(ip) => json!({ "ip": ip.to_string() }),
Destination::Pk(pk) => json!({ "pk": pk }),
};
let mut message = json!({
"dst": dst_v,
"topic": topic,
"payload": payload_b64,
});
if let Some(rt) = reply_timeout {
message["reply_timeout"] = json!(rt);
}
message
}
/// pushMessage: send a message with dst/topic/payload. Optional reply_timeout for sync replies.
pub async fn push_message(
&self,
dst: &Destination,
topic: &str,
payload_b64: &str,
reply_timeout: Option<u64>,
) -> Result<Value, MyceliumClientError> {
let params = Self::build_push_params(dst, topic, payload_b64, reply_timeout);
let body = self.jsonrpc("pushMessage", params).await?;
let result = body.get("result").ok_or_else(|| {
MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
})?;
Ok(result.clone())
}
/// Helper to extract outbound message id from pushMessage result (InboundMessage or PushMessageResponseId)
pub fn extract_message_id_from_result(result: &Value) -> Option<String> {
result
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::clients::Destination;
#[test]
fn build_push_params_shapes_ip_pk_and_timeout() {
// IP destination
let p1 = MyceliumClient::build_push_params(
&Destination::Ip("2001:db8::1".parse().unwrap()),
"supervisor.rpc",
"Zm9vYmFy", // "foobar"
Some(10),
);
let msg1 = p1.get("message").unwrap();
assert_eq!(
msg1.get("topic").unwrap().as_str().unwrap(),
"supervisor.rpc"
);
assert_eq!(msg1.get("payload").unwrap().as_str().unwrap(), "Zm9vYmFy");
assert_eq!(
msg1.get("dst")
.unwrap()
.get("ip")
.unwrap()
.as_str()
.unwrap(),
"2001:db8::1"
);
assert_eq!(p1.get("reply_timeout").unwrap().as_u64().unwrap(), 10);
// PK destination without timeout
let p2 = MyceliumClient::build_push_params(
&Destination::Pk(
"bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".into(),
),
"supervisor.rpc",
"YmF6", // "baz"
None,
);
let msg2 = p2.get("message").unwrap();
assert_eq!(
msg2.get("dst")
.unwrap()
.get("pk")
.unwrap()
.as_str()
.unwrap(),
"bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32"
);
assert!(p2.get("reply_timeout").is_none());
}
#[test]
fn extract_message_id_variants() {
// PushMessageResponseId
let r1 = json!({"id":"0123456789abcdef"});
assert_eq!(
MyceliumClient::extract_message_id_from_result(&r1).unwrap(),
"0123456789abcdef"
);
// InboundMessage-like
let r2 = json!({
"id":"fedcba9876543210",
"srcIp":"449:abcd:0123:defa::1",
"payload":"hpV+"
});
assert_eq!(
MyceliumClient::extract_message_id_from_result(&r2).unwrap(),
"fedcba9876543210"
);
}
}

View File

@@ -1,28 +1,20 @@
use std::net::IpAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use reqwest::Client as HttpClient;
use serde_json::{Value, json};
use thiserror::Error;
/// Destination for Mycelium messages
#[derive(Clone, Debug)]
pub enum Destination {
Ip(IpAddr),
Pk(String), // 64-hex public key
}
use crate::clients::{Destination, MyceliumClient, MyceliumClientError};
#[derive(Clone)]
pub struct SupervisorClient {
base_url: String, // e.g. "http://127.0.0.1:8990"
destination: Destination, // ip or pk
topic: String, // e.g. "supervisor.rpc"
secret: Option<String>, // optional, required by several supervisor methods
http: HttpClient,
id_counter: Arc<AtomicU64>, // JSON-RPC id generator (for inner + outer requests)
mycelium: Arc<MyceliumClient>, // Delegated Mycelium transport
destination: Destination, // ip or pk
topic: String, // e.g. "supervisor.rpc"
secret: Option<String>, // optional, required by several supervisor methods
id_counter: Arc<AtomicU64>, // JSON-RPC id generator (for inner supervisor requests)
}
#[derive(Debug, Error)]
@@ -41,8 +33,37 @@ pub enum SupervisorClientError {
MissingSecret,
}
impl From<MyceliumClientError> for SupervisorClientError {
fn from(e: MyceliumClientError) -> Self {
match e {
MyceliumClientError::TransportTimeout => SupervisorClientError::TransportTimeout,
MyceliumClientError::RpcError(m) => SupervisorClientError::RpcError(m),
MyceliumClientError::InvalidResponse(m) => SupervisorClientError::InvalidResponse(m),
MyceliumClientError::Http(err) => SupervisorClientError::Http(err),
MyceliumClientError::Json(err) => SupervisorClientError::Json(err),
}
}
}
impl SupervisorClient {
/// Create a new client. base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty.
/// Preferred constructor: provide a shared Mycelium client.
pub fn new_with_client(
mycelium: Arc<MyceliumClient>,
destination: Destination,
topic: impl Into<String>,
secret: Option<String>,
) -> Self {
Self {
mycelium,
destination,
topic: topic.into(),
secret,
id_counter: Arc::new(AtomicU64::new(1)),
}
}
/// Backward-compatible constructor that builds a Mycelium client from base_url.
/// base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty.
pub fn new(
base_url: impl Into<String>,
destination: Destination,
@@ -53,21 +74,15 @@ impl SupervisorClient {
if url.is_empty() {
url = "http://127.0.0.1:8990".to_string();
}
let http = HttpClient::builder().build()?;
Ok(Self {
base_url: url,
destination,
topic: topic.into(),
secret,
http,
id_counter: Arc::new(AtomicU64::new(1)),
})
let mycelium = Arc::new(MyceliumClient::new(url)?);
Ok(Self::new_with_client(mycelium, destination, topic, secret))
}
fn next_id(&self) -> u64 {
self.id_counter.fetch_add(1, Ordering::Relaxed)
}
/// Internal helper used by tests to inspect dst JSON shape.
fn build_dst(&self) -> Value {
match &self.destination {
Destination::Ip(ip) => json!({ "ip": ip.to_string() }),
@@ -89,47 +104,8 @@ impl SupervisorClient {
Ok(BASE64_STANDARD.encode(s.as_bytes()))
}
fn build_push_request(&self, payload_b64: String) -> Value {
let dst = self.build_dst();
let msg = json!({
"dst": dst,
"topic": self.topic,
"payload": payload_b64
});
// Async path: no reply_timeout attached
json!({
"jsonrpc": "2.0",
"id": self.next_id(),
"method": "pushMessage",
"params": [ { "message": msg } ]
})
}
async fn send_push(&self, req: &Value) -> Result<Value, SupervisorClientError> {
let resp = self.http.post(&self.base_url).json(req).send().await?;
let status = resp.status();
let body: Value = resp.json().await?;
// JSON-RPC error object handling
if let Some(err) = body.get("error") {
let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
let msg = err
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("unknown error");
if code == 408 {
return Err(SupervisorClientError::TransportTimeout);
}
return Err(SupervisorClientError::RpcError(format!(
"code={code} msg={msg}"
)));
}
if !status.is_success() {
return Err(SupervisorClientError::RpcError(format!(
"HTTP status {status}, body {body}"
)));
}
Ok(body)
fn encode_topic(topic: &[u8]) -> String {
BASE64_STANDARD.encode(topic)
}
fn extract_message_id_from_result(result: &Value) -> Option<String> {
@@ -142,34 +118,94 @@ impl SupervisorClient {
.map(|s| s.to_string())
}
/// Generic call: build supervisor JSON-RPC message, wrap in Mycelium pushMessage, return outbound message id (hex).
/// Generic call: build supervisor JSON-RPC message, send via Mycelium pushMessage, return outbound message id (hex).
pub async fn call(&self, method: &str, params: Value) -> Result<String, SupervisorClientError> {
let inner = self.build_supervisor_payload(method, params);
let payload_b64 = Self::encode_payload(&inner)?;
let push_req = self.build_push_request(payload_b64);
let resp = self.send_push(&push_req).await?;
let result = self
.mycelium
.push_message(
&self.destination,
&Self::encode_topic(self.topic.as_bytes()),
&payload_b64,
None,
)
.await?;
// Expect "result" to be either inbound message or response id
match resp.get("result") {
Some(res_obj) => {
if let Some(id) = Self::extract_message_id_from_result(res_obj) {
return Ok(id);
}
// Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient)
if let Some(arr) = res_obj.as_array()
&& arr.len() == 1
&& let Some(id) = Self::extract_message_id_from_result(&arr[0])
{
return Ok(id);
}
Err(SupervisorClientError::InvalidResponse(format!(
"result did not contain message id: {res_obj}"
)))
}
None => Err(SupervisorClientError::InvalidResponse(format!(
"missing result in response: {resp}"
))),
if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) {
return Ok(id);
}
// Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient)
if let Some(arr) = result.as_array()
&& arr.len() == 1
&& let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0])
{
return Ok(id);
}
Err(SupervisorClientError::InvalidResponse(format!(
"result did not contain message id: {result}"
)))
}
/// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result".
/// If the supervisor returns an error object, map to RpcError.
pub async fn call_sync(
&self,
method: &str,
params: Value,
reply_timeout_secs: u64,
) -> Result<Value, SupervisorClientError> {
let inner = self.build_supervisor_payload(method, params);
let payload_b64 = Self::encode_payload(&inner)?;
let result = self
.mycelium
.push_message(
&self.destination,
&self.topic,
&payload_b64,
Some(reply_timeout_secs),
)
.await?;
// Expect an InboundMessage-like with a base64 payload containing the supervisor JSON-RPC response
let payload_field = if let Some(p) = result.get("payload").and_then(|v| v.as_str()) {
p.to_string()
} else if let Some(arr) = result.as_array() {
// Defensive: handle single-element array shape
if let Some(one) = arr.get(0) {
one.get("payload")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
SupervisorClientError::InvalidResponse(format!(
"missing payload in result: {result}"
))
})?
} else {
return Err(SupervisorClientError::TransportTimeout);
}
} else {
// No payload => no reply received within timeout (Mycelium would have returned just an id)
return Err(SupervisorClientError::TransportTimeout);
};
let raw = BASE64_STANDARD
.decode(payload_field.as_bytes())
.map_err(|e| {
SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}"))
})?;
let rpc_resp: Value = serde_json::from_slice(&raw)?;
if let Some(err) = rpc_resp.get("error") {
return Err(SupervisorClientError::RpcError(err.to_string()));
}
let res = rpc_resp.get("result").ok_or_else(|| {
SupervisorClientError::InvalidResponse(format!(
"missing result in supervisor reply: {rpc_resp}"
))
})?;
Ok(res.clone())
}
fn need_secret(&self) -> Result<&str, SupervisorClientError> {
@@ -291,6 +327,28 @@ impl SupervisorClient {
self.call("job.status", json!([job_id.into()])).await
}
/// Synchronous job.status: waits for the supervisor to reply and returns the status string.
/// The supervisor result may be an object with { status: "..." } or a bare string.
pub async fn job_status_sync(
&self,
job_id: impl Into<String>,
reply_timeout_secs: u64,
) -> Result<String, SupervisorClientError> {
let res = self
.call_sync("job.status", json!([job_id.into()]), reply_timeout_secs)
.await?;
let status = if let Some(s) = res.get("status").and_then(|v| v.as_str()) {
s.to_string()
} else if let Some(s) = res.as_str() {
s.to_string()
} else {
return Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.status result shape: {res}"
)));
};
Ok(status)
}
pub async fn job_result(
&self,
job_id: impl Into<String>,
@@ -298,6 +356,45 @@ impl SupervisorClient {
self.call("job.result", json!([job_id.into()])).await
}
/// Synchronous job.result: waits for the supervisor to reply and returns a map
/// containing exactly one of:
/// - {"success": "..."} on success
/// - {"error": "..."} on error reported by the runner
/// Some servers may return a bare string; we treat that as {"success": "<string>"}.
pub async fn job_result_sync(
&self,
job_id: impl Into<String>,
reply_timeout_secs: u64,
) -> Result<std::collections::HashMap<String, String>, SupervisorClientError> {
let res = self
.call_sync("job.result", json!([job_id.into()]), reply_timeout_secs)
.await?;
use std::collections::HashMap;
let mut out: HashMap<String, String> = HashMap::new();
if let Some(obj) = res.as_object() {
if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
out.insert("success".to_string(), s.to_string());
return Ok(out);
}
if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
out.insert("error".to_string(), s.to_string());
return Ok(out);
}
return Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.result result shape: {res}"
)));
} else if let Some(s) = res.as_str() {
out.insert("success".to_string(), s.to_string());
return Ok(out);
}
Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.result result shape: {res}"
)))
}
pub async fn job_stop(
&self,
job_id: impl Into<String>,
@@ -334,8 +431,10 @@ impl SupervisorClient {
#[cfg(test)]
mod tests {
use super::*;
use std::net::IpAddr;
fn mk_client() -> SupervisorClient {
// Uses the legacy constructor but will not issue real network calls in these tests.
SupervisorClient::new(
"http://127.0.0.1:8990",
Destination::Pk(
@@ -368,20 +467,21 @@ mod tests {
}
#[test]
fn wraps_supervisor_payload_in_push_message() {
fn encodes_supervisor_payload_b64() {
let c = mk_client();
let payload = c.build_supervisor_payload("list_runners", json!([]));
let b64 = SupervisorClient::encode_payload(&payload).unwrap();
let req = c.build_push_request(b64);
assert_eq!(req.get("method").unwrap().as_str().unwrap(), "pushMessage");
let params = req.get("params").unwrap().as_array().unwrap();
let msg = params[0].get("message").unwrap();
assert!(msg.get("payload").is_some());
// decode and compare round-trip JSON
let raw = base64::engine::general_purpose::STANDARD
.decode(b64.as_bytes())
.unwrap();
let decoded: Value = serde_json::from_slice(&raw).unwrap();
assert_eq!(
msg.get("topic").unwrap().as_str().unwrap(),
"supervisor.rpc"
decoded.get("method").unwrap().as_str().unwrap(),
"list_runners"
);
assert!(msg.get("dst").unwrap().get("pk").is_some());
assert_eq!(decoded.get("jsonrpc").unwrap().as_str().unwrap(), "2.0");
}
#[test]

9
src/clients/types.rs Normal file
View File

@@ -0,0 +1,9 @@
use std::net::IpAddr;
/// Destination for Mycelium messages (shared by clients)
#[derive(Clone, Debug)]
pub enum Destination {
Ip(IpAddr),
/// 64-hex public key of the receiver node
Pk(String),
}

View File

@@ -1,8 +1,8 @@
pub mod models;
pub mod storage;
pub mod service;
mod time;
pub mod dag;
pub mod rpc;
pub mod clients;
pub mod dag;
pub mod models;
pub mod router;
pub mod rpc;
pub mod service;
pub mod storage;
mod time;

View File

@@ -2,6 +2,8 @@ use clap::Parser;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
#[derive(Debug, Clone, Parser)]
#[command(
name = "herocoordinator",
@@ -23,8 +25,8 @@ struct Cli {
long = "mycelium-port",
short = 'p',
env = "MYCELIUM_PORT",
default_value_t = 9651u16,
help = "Port for Mycelium JSON-RPC (default: 9651)"
default_value_t = 8990u16,
help = "Port for Mycelium JSON-RPC (default: 8990)"
)]
mycelium_port: u16,
@@ -73,6 +75,14 @@ struct Cli {
#[tokio::main]
async fn main() {
let cli = Cli::parse();
// Initialize tracing subscriber (pretty formatter; controlled by RUST_LOG)
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt()
.with_env_filter(filter)
.pretty()
.with_target(true)
.with_level(true)
.init();
let http_addr = SocketAddr::new(cli.api_http_ip, cli.api_http_port);
let ws_addr = SocketAddr::new(cli.api_ws_ip, cli.api_ws_port);
@@ -97,6 +107,8 @@ async fn main() {
concurrency: 32,
base_url,
topic: "supervisor.rpc".to_string(),
transport_poll_interval_secs: 2,
transport_poll_timeout_secs: 300,
};
let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
}
@@ -105,10 +117,7 @@ async fn main() {
let http_module = herocoordinator::rpc::build_module(state.clone());
let ws_module = herocoordinator::rpc::build_module(state.clone());
println!(
"Starting JSON-RPC servers: HTTP http://{} | WS ws://{} | redis_addr={}",
http_addr, ws_addr, cli.redis_addr
);
info!(%http_addr, %ws_addr, redis_addr=%cli.redis_addr, "Starting JSON-RPC servers");
// Start servers
let _http_handle = herocoordinator::rpc::start_http(http_addr, http_module)
@@ -120,7 +129,7 @@ async fn main() {
// Wait for Ctrl+C to terminate
if let Err(e) = tokio::signal::ctrl_c().await {
eprintln!("Failed to listen for shutdown signal: {e}");
error!(error=%e, "Failed to listen for shutdown signal");
}
println!("Shutdown signal received, exiting.");
info!("Shutdown signal received, exiting.");
}

View File

@@ -10,6 +10,6 @@ pub use actor::Actor;
pub use context::Context;
pub use flow::{Flow, FlowStatus};
pub use job::{Job, JobStatus};
pub use message::{Message, MessageFormatType, MessageStatus, MessageType};
pub use message::{Message, MessageFormatType, MessageStatus, MessageType, TransportStatus};
pub use runner::Runner;
pub use script_type::ScriptType;

View File

@@ -22,6 +22,12 @@ pub struct Message {
pub timeout_ack: u32,
/// Seconds for the receiver to send us a reply
pub timeout_result: u32,
/// Outbound transport id returned by Mycelium on push
pub transport_id: Option<String>,
/// Latest transport status as reported by Mycelium
pub transport_status: Option<TransportStatus>,
pub job: Vec<Job>,
pub logs: Vec<Log>,
pub created_at: Timestamp,
@@ -44,6 +50,15 @@ pub enum MessageStatus {
Processed,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TransportStatus {
Queued,
Sent,
Delivered,
Read,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageFormatType {
Html,

View File

@@ -18,6 +18,8 @@ pub struct Runner {
pub script_type: ScriptType,
/// If this is true, the runner also listens on a local redis queue
pub local: bool,
/// Optional secret used for authenticated supervisor calls (if required)
pub secret: Option<String>,
pub created_at: Timestamp,
pub updated_at: Timestamp,
}

View File

@@ -4,10 +4,11 @@ use serde_json::{Value, json};
use tokio::sync::Semaphore;
use crate::{
clients::{Destination, SupervisorClient},
models::{Job, Message, MessageStatus, ScriptType},
clients::{Destination, MyceliumClient, SupervisorClient},
models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
service::AppService,
};
use tracing::{error, info};
#[derive(Clone, Debug)]
pub struct RouterConfig {
@@ -15,7 +16,9 @@ pub struct RouterConfig {
pub concurrency: usize,
pub base_url: String, // e.g. http://127.0.0.1:8990
pub topic: String, // e.g. "supervisor.rpc"
// secret currently unused (None), add here later if needed
// Transport status polling configuration
pub transport_poll_interval_secs: u64, // e.g. 2
pub transport_poll_timeout_secs: u64, // e.g. 300 (5 minutes)
}
/// Start background router loops, one per context.
@@ -32,6 +35,18 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
let cfg_cloned = cfg.clone();
let handle = tokio::spawn(async move {
let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
// Create a shared Mycelium client for this context loop (retry until available)
let mycelium = loop {
match MyceliumClient::new(cfg_cloned.base_url.clone()) {
Ok(c) => break Arc::new(c),
Err(e) => {
error!(context_id=ctx_id, error=%e, "MyceliumClient init error");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
};
loop {
// Pop next message key (blocking with timeout)
match service_cloned.brpop_msg_out(ctx_id, 1).await {
@@ -50,16 +65,17 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
};
let service_task = service_cloned.clone();
let cfg_task = cfg_cloned.clone();
tokio::spawn(async move {
// Ensure permit is dropped at end of task
let _permit = permit;
if let Err(e) =
deliver_one(&service_task, &cfg_task, ctx_id, &key).await
{
eprintln!(
"[router ctx={}] delivery error for {}: {}",
ctx_id, key, e
);
tokio::spawn({
let mycelium = mycelium.clone();
async move {
// Ensure permit is dropped at end of task
let _permit = permit;
if let Err(e) =
deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium)
.await
{
error!(context_id=ctx_id, key=%key, error=%e, "Delivery error");
}
}
});
}
@@ -68,7 +84,7 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
continue;
}
Err(e) => {
eprintln!("[router ctx={}] brpop error: {}", ctx_id, e);
error!(context_id=ctx_id, error=%e, "BRPOP error");
// small backoff to avoid busy-loop on persistent errors
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
@@ -85,6 +101,7 @@ async fn deliver_one(
cfg: &RouterConfig,
context_id: u32,
msg_key: &str,
mycelium: Arc<MyceliumClient>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Parse "message:{caller_id}:{id}"
let (caller_id, id) = parse_message_key(msg_key)
@@ -92,6 +109,8 @@ async fn deliver_one(
// Load message
let msg: Message = service.load_message(context_id, caller_id, id).await?;
// Embedded job id (if any)
let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);
// Determine routing script_type
let desired: ScriptType = determine_script_type(&msg);
@@ -118,25 +137,261 @@ async fn deliver_one(
} else {
Destination::Ip(runner.address)
};
let client = SupervisorClient::new(
cfg.base_url.clone(),
dest,
// Keep clones for poller usage
let dest_for_poller = dest.clone();
let topic_for_poller = cfg.topic.clone();
let secret_for_poller = runner.secret.clone();
let client = SupervisorClient::new_with_client(
mycelium.clone(),
dest.clone(),
cfg.topic.clone(),
None, // secret
)?;
runner.secret.clone(),
);
// Build supervisor method and params from Message
let method = msg.message.clone();
let params = build_params(&msg)?;
// Send
let _out_id = client.call(&method, params).await?;
let out_id = client.call(&method, params).await?;
// Store transport id and initial Sent status
let _ = service
.update_message_transport(
context_id,
caller_id,
id,
Some(out_id.clone()),
Some(TransportStatus::Sent),
)
.await;
// Mark as acknowledged on success
service
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
.await?;
// Spawn transport-status poller
{
let service_poll = service.clone();
let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
let out_id_cloned = out_id.clone();
let mycelium = mycelium.clone();
// Determine reply timeout for supervisor job.result: prefer message.timeout_result, fallback to router config timeout
let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
msg.timeout_result as u64
} else {
cfg.transport_poll_timeout_secs
};
tokio::spawn(async move {
let start = std::time::Instant::now();
let client = mycelium;
// Supervisor call context captured for sync status checks
let sup_dest = dest_for_poller;
let sup_topic = topic_for_poller;
let job_id_opt = job_id_opt;
let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
loop {
if start.elapsed() >= poll_timeout {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec!["Transport-status polling timed out".to_string()],
)
.await;
// leave last known status; do not override
break;
}
match client.message_status(&out_id_cloned).await {
Ok(s) => {
if last_status.as_ref() != Some(&s) {
let _ = service_poll
.update_message_transport(
context_id,
caller_id,
id,
None,
Some(s.clone()),
)
.await;
last_status = Some(s.clone());
}
// Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
// On Read, fetch supervisor job.status and update local job/message if terminal
if matches!(s, TransportStatus::Read)
&& let Some(job_id) = job_id_opt
{
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
);
match sup.job_status_sync(job_id.to_string(), 10).await {
Ok(remote_status) => {
if let Some((mapped, terminal)) =
map_supervisor_job_status(&remote_status)
{
if terminal {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
mapped.clone(),
)
.await;
// After terminal status, fetch supervisor job.result and store into Job.result
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
);
match sup
.job_result_sync(
job_id.to_string(),
job_result_reply_timeout,
)
.await
{
Ok(result_map) => {
// Persist the result into the Job.result map (merge)
let _ = service_poll
.update_job_result_merge_unchecked(
context_id,
caller_id,
job_id,
result_map.clone(),
)
.await;
// Log which key was stored (success or error)
let key = result_map
.keys()
.next()
.cloned()
.unwrap_or_else(|| {
"unknown".to_string()
});
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} ({})",
job_id, key
)],
)
.await;
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.result fetch error for job {}: {}",
job_id, e
)],
)
.await;
}
}
// Mark message as processed
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Supervisor job.status for job {} -> {} (mapped to {:?})",
job_id, remote_status, mapped
)],
)
.await;
}
} else {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Unknown supervisor status '{}' for job {}",
remote_status, job_id
)],
)
.await;
}
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!("job.status sync error: {}", e)],
)
.await;
}
}
}
break;
}
if matches!(s, TransportStatus::Failed) {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Transport failed for outbound id {out_id_cloned}"
)],
)
.await;
break;
}
}
Err(e) => {
// Log and continue polling
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!("messageStatus query error: {e}")],
)
.await;
}
}
tokio::time::sleep(poll_interval).await;
}
});
}
Ok(())
}
@@ -179,6 +434,16 @@ fn parse_message_key(s: &str) -> Option<(u32, u32)> {
}
}
/// Map supervisor job.status -> (local JobStatus, terminal)
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
match s {
"created" | "queued" => Some((JobStatus::Dispatched, false)),
"running" => Some((JobStatus::Started, false)),
"completed" => Some((JobStatus::Finished, true)),
"failed" | "timeout" => Some((JobStatus::Error, true)),
_ => None,
}
}
/// Auto-discover contexts periodically and ensure a router loop exists for each.
/// Returns a JoinHandle of the discovery task (router loops are detached).
@@ -197,12 +462,12 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task:
};
let _ = start_router(service.clone(), cfg_ctx);
active.insert(ctx_id);
eprintln!("[router] started loop for context {}", ctx_id);
info!(context_id = ctx_id, "Started loop for context");
}
}
}
Err(e) => {
eprintln!("[router] list_context_ids error: {}", e);
error!(error=%e, "list_context_ids error");
}
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

View File

@@ -22,6 +22,9 @@ use crate::{
time::current_timestamp,
};
/// The OpenRPC specification for the HeroCoordinator JSON-RPC API
const OPENRPC_SPEC: &str = include_str!("../specs/openrpc.json");
pub struct AppState {
pub service: AppService,
}
@@ -147,6 +150,8 @@ pub struct RunnerCreate {
/// The script type this runner executes (used for routing)
pub script_type: ScriptType,
pub local: bool,
/// Optional secret used for authenticated supervisor calls (if required)
pub secret: Option<String>,
}
impl RunnerCreate {
pub fn into_domain(self) -> Runner {
@@ -159,6 +164,7 @@ impl RunnerCreate {
topic,
script_type,
local,
secret,
} = self;
Runner {
@@ -168,6 +174,7 @@ impl RunnerCreate {
topic,
script_type,
local,
secret,
created_at: ts,
updated_at: ts,
}
@@ -299,6 +306,8 @@ impl MessageCreate {
timeout,
timeout_ack,
timeout_result,
transport_id: None,
transport_status: None,
job: job.into_iter().map(JobCreate::into_domain).collect(),
logs: Vec::new(),
created_at: ts,
@@ -310,12 +319,10 @@ impl MessageCreate {
#[derive(Debug, Deserialize)]
pub struct ActorCreateParams {
pub context_id: u32,
pub actor: ActorCreate,
}
#[derive(Debug, Deserialize)]
pub struct ActorLoadParams {
pub context_id: u32,
pub id: u32,
}
@@ -392,7 +399,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
let actor = p.actor.into_domain().map_err(invalid_params_err)?;
let actor = state
.service
.create_actor(p.context_id, actor)
.create_actor(actor)
.await
.map_err(storage_err)?;
Ok::<_, ErrorObjectOwned>(actor)
@@ -407,11 +414,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
let state = state.clone();
async move {
let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?;
let actor = state
.service
.load_actor(p.context_id, p.id)
.await
.map_err(storage_err)?;
let actor = state.service.load_actor(p.id).await.map_err(storage_err)?;
Ok::<_, ErrorObjectOwned>(actor)
}
})
@@ -636,6 +639,15 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
})
.expect("register message.load");
}
{
module
.register_async_method("rpc.discover", move |_params, _caller, _ctx| async move {
let spec = serde_json::from_str::<serde_json::Value>(OPENRPC_SPEC)
.expect("Failed to parse OpenRPC spec");
Ok::<_, ErrorObjectOwned>(spec)
})
.expect("register rpc.discover");
}
module
}

View File

@@ -1,7 +1,7 @@
use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
use crate::models::{
Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
Runner,
Runner, TransportStatus,
};
use crate::storage::RedisDriver;
@@ -157,7 +157,7 @@ fn validate_context(ctx: &Context) -> Result<(), BoxError> {
Ok(())
}
fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> {
fn validate_actor(actor: &Actor) -> Result<(), BoxError> {
let v = as_json(actor)?;
let id = json_get_u32(&v, "id")?;
if id == 0 {
@@ -344,17 +344,17 @@ impl AppService {
// -----------------------------
// Actor
// -----------------------------
pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result<Actor, BoxError> {
validate_actor(context_id, &actor)?;
pub async fn create_actor(&self, actor: Actor) -> Result<Actor, BoxError> {
validate_actor(&actor)?;
let v = as_json(&actor)?;
let id = json_get_u32(&v, "id")?;
self.ensure_actor_not_exists(context_id, id).await?;
self.redis.save_actor(context_id, &actor).await?;
self.ensure_actor_not_exists_global(id).await?;
self.redis.save_actor_global(&actor).await?;
Ok(actor)
}
pub async fn load_actor(&self, context_id: u32, id: u32) -> Result<Actor, BoxError> {
let actor = self.redis.load_actor(context_id, id).await?;
pub async fn load_actor(&self, id: u32) -> Result<Actor, BoxError> {
let actor = self.redis.load_actor_global(id).await?;
Ok(actor)
}
@@ -508,6 +508,8 @@ impl AppService {
timeout: job.timeout,
timeout_ack: 10,
timeout_result: job.timeout,
transport_id: None,
transport_status: None,
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
@@ -589,6 +591,8 @@ impl AppService {
timeout: job.timeout,
timeout_ack: 10,
timeout_result: job.timeout,
transport_id: None,
transport_status: None,
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
@@ -690,6 +694,48 @@ impl AppService {
Ok(())
}
/// Bypass-permission variant to update a job status with transition validation.
/// This skips the executor permission check but enforces the same state transition rules.
pub async fn update_job_status_unchecked(
&self,
context_id: u32,
caller_id: u32,
id: u32,
new_status: JobStatus,
) -> Result<(), BoxError> {
let job = self.redis.load_job(context_id, caller_id, id).await?;
let current = job.status();
if new_status == current {
// Idempotent: don't touch storage if no change
return Ok(());
}
let allowed = match current {
JobStatus::Dispatched => matches!(
new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
),
JobStatus::WaitingForPrerequisites => {
matches!(new_status, JobStatus::Started | JobStatus::Error)
}
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Finished | JobStatus::Error => false,
};
if !allowed {
return Err(Box::new(InvalidJobStatusTransition {
from: current,
to: new_status,
}));
}
self.redis
.update_job_status(context_id, caller_id, id, new_status)
.await?;
Ok(())
}
// -----------------------------
// Message
@@ -817,6 +863,21 @@ impl AppService {
.await
}
pub async fn update_message_transport(
&self,
context_id: u32,
caller_id: u32,
id: u32,
transport_id: Option<String>,
transport_status: Option<TransportStatus>,
) -> Result<(), BoxError> {
// Ensure message exists (provides clearer error)
let _ = self.redis.load_message(context_id, caller_id, id).await?;
self.redis
.update_message_transport(context_id, caller_id, id, transport_id, transport_status)
.await
}
pub async fn update_flow_env_vars_merge(
&self,
context_id: u32,
@@ -913,6 +974,22 @@ impl AppService {
.await
}
/// Bypass-permission variant to merge into a job's result field.
/// Intended for internal router/scheduler use where no actor identity is present.
pub async fn update_job_result_merge_unchecked(
&self,
context_id: u32,
caller_id: u32,
job_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
// Ensure job exists, then write directly
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
self.redis
.update_job_result_merge(context_id, caller_id, job_id, patch)
.await
}
pub async fn append_message_logs(
&self,
context_id: u32,
@@ -946,8 +1023,8 @@ impl AppService {
}
}
async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
match self.redis.load_actor(db, id).await {
async fn ensure_actor_not_exists_global(&self, id: u32) -> Result<(), BoxError> {
match self.redis.load_actor_global(id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("actor:{}", id),
})),

View File

@@ -1,4 +1,3 @@
pub mod redis;
pub use redis::RedisDriver;

View File

@@ -8,7 +8,9 @@ use tokio::sync::Mutex;
use crate::models::{
Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
TransportStatus,
};
use tracing::{error, warn};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@@ -52,8 +54,14 @@ impl RedisDriver {
// Slow path: create a new manager and cache it
let url = format!("{}/{}", self.base_addr.trim_end_matches('/'), db);
let client = redis::Client::open(url.as_str())?;
let cm = client.get_connection_manager().await?;
let client = redis::Client::open(url.as_str()).map_err(|e| {
error!(%url, db=%db, error=%e, "Redis client open failed");
e
})?;
let cm = client.get_connection_manager().await.map_err(|e| {
error!(%url, db=%db, error=%e, "Redis connection manager init failed");
e
})?;
let mut guard = self.managers.lock().await;
let entry = guard.entry(db).or_insert(cm);
@@ -104,21 +112,37 @@ impl RedisDriver {
async fn hset_model<T: Serialize>(&self, db: u32, key: &str, model: &T) -> Result<()> {
let mut cm = self.manager_for_db(db).await?;
let pairs = Self::struct_to_hset_pairs(model)?;
let pairs = Self::struct_to_hset_pairs(model).map_err(|e| {
error!(db=%db, key=%key, error=%e, "Serialize model to HSET pairs failed");
e
})?;
// Ensure no stale fields
let _: u64 = cm.del(key).await.unwrap_or(0);
let del_res: redis::RedisResult<u64> = cm.del(key).await;
if let Err(e) = del_res {
warn!(db=%db, key=%key, error=%e, "DEL before HSET failed");
}
// Write all fields
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET multiple failed");
e
})?;
Ok(())
}
async fn hget_model<T: DeserializeOwned>(&self, db: u32, key: &str) -> Result<T> {
let mut cm = self.manager_for_db(db).await?;
let map: StdHashMap<String, String> = cm.hgetall(key).await?;
let map: StdHashMap<String, String> = cm.hgetall(key).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HGETALL failed");
e
})?;
if map.is_empty() {
// NotFound is expected in some flows; don't log as error
return Err(format!("Key not found: {}", key).into());
}
Self::hmap_to_struct(map)
Self::hmap_to_struct(map).map_err(|e| {
error!(db=%db, key=%key, error=%e, "Deserialize model from HGETALL failed");
e
})
}
// -----------------------------
@@ -196,6 +220,22 @@ impl RedisDriver {
let key = Self::actor_key(id);
self.hget_model(db, &key).await
}
/// Save an Actor globally in DB 0 (Actor is context-independent)
pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> {
let json = serde_json::to_value(actor)?;
let id = json
.get("id")
.and_then(|v| v.as_u64())
.ok_or("Actor.id missing or not a number")? as u32;
let key = Self::actor_key(id);
self.hset_model(0, &key, actor).await
}
/// Load an Actor globally from DB 0 by id
pub async fn load_actor_global(&self, id: u32) -> Result<Actor> {
let key = Self::actor_key(id);
self.hget_model(0, &key).await
}
// -----------------------------
// Runner
@@ -283,7 +323,10 @@ impl RedisDriver {
("status".to_string(), status_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_job_status failed");
e
})?;
Ok(())
}
@@ -329,7 +372,10 @@ impl RedisDriver {
("status".to_string(), status_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_flow_status failed");
e
})?;
Ok(())
}
@@ -354,7 +400,47 @@ impl RedisDriver {
("status".to_string(), status_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_message_status failed");
e
})?;
Ok(())
}
/// Message: update transport_id / transport_status (optionally) and bump updated_at
pub async fn update_message_transport(
&self,
db: u32,
caller_id: u32,
id: u32,
transport_id: Option<String>,
transport_status: Option<TransportStatus>,
) -> Result<()> {
let mut cm = self.manager_for_db(db).await?;
let key = Self::message_key(caller_id, id);
let mut pairs: Vec<(String, String)> = Vec::new();
if let Some(tid) = transport_id {
pairs.push(("transport_id".to_string(), tid));
}
if let Some(ts_status) = transport_status {
let status_str = match serde_json::to_value(&ts_status)? {
Value::String(s) => s,
v => v.to_string(),
};
pairs.push(("transport_status".to_string(), status_str));
}
// Always bump updated_at
let ts = crate::time::current_timestamp();
pairs.push(("updated_at".to_string(), ts.to_string()));
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_message_transport failed");
e
})?;
Ok(())
}
@@ -387,7 +473,10 @@ impl RedisDriver {
("env_vars".to_string(), env_vars_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_flow_env_vars_merge failed");
e
})?;
Ok(())
}
@@ -420,7 +509,10 @@ impl RedisDriver {
("result".to_string(), result_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_flow_result_merge failed");
e
})?;
Ok(())
}
@@ -454,7 +546,10 @@ impl RedisDriver {
("env_vars".to_string(), env_vars_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_job_env_vars_merge failed");
e
})?;
Ok(())
}
@@ -488,7 +583,10 @@ impl RedisDriver {
("result".to_string(), result_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_job_result_merge failed");
e
})?;
Ok(())
}
@@ -503,7 +601,10 @@ impl RedisDriver {
("jobs".to_string(), jobs_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET update_flow_jobs_set failed");
e
})?;
Ok(())
}
@@ -534,7 +635,10 @@ impl RedisDriver {
("logs".to_string(), logs_str),
("updated_at".to_string(), ts.to_string()),
];
let _: usize = cm.hset_multiple(key, &pairs).await?;
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
error!(db=%db, key=%key, error=%e, "HSET append_message_logs failed");
e
})?;
Ok(())
}
@@ -545,7 +649,10 @@ impl RedisDriver {
/// Push a value onto a Redis list using LPUSH in the given DB.
pub async fn lpush_list(&self, db: u32, list: &str, value: &str) -> Result<()> {
let mut cm = self.manager_for_db(db).await?;
let _: i64 = cm.lpush(list, value).await?;
let _: i64 = cm.lpush(list, value).await.map_err(|e| {
error!(db=%db, list=%list, value=%value, error=%e, "LPUSH failed");
e
})?;
Ok(())
}
@@ -565,7 +672,11 @@ impl RedisDriver {
.arg("msg_out")
.arg(timeout_secs)
.query_async(&mut cm)
.await?;
.await
.map_err(|e| {
error!(db=%db, timeout_secs=%timeout_secs, error=%e, "BRPOP failed");
e
})?;
Ok(res.map(|(_, v)| v))
}
@@ -582,7 +693,11 @@ impl RedisDriver {
.arg("COUNT")
.arg(100)
.query_async(&mut cm)
.await?;
.await
.map_err(|e| {
error!(db=%db, cursor=%cursor, error=%e, "SCAN failed");
e
})?;
for k in keys {
if let Ok(r) = self.hget_model::<Runner>(db, &k).await {
out.push(r);
@@ -603,7 +718,15 @@ impl RedisDriver {
/// Register a context id in the global set "contexts" stored in DB 0.
pub async fn register_context_id(&self, id: u32) -> Result<()> {
let mut cm = self.manager_for_db(0).await?;
let _: i64 = redis::cmd("SADD").arg("contexts").arg(id).query_async(&mut cm).await?;
let _: i64 = redis::cmd("SADD")
.arg("contexts")
.arg(id)
.query_async(&mut cm)
.await
.map_err(|e| {
error!(db=0, context_id=%id, error=%e, "SADD contexts failed");
e
})?;
Ok(())
}
@@ -611,7 +734,14 @@ impl RedisDriver {
pub async fn list_context_ids(&self) -> Result<Vec<u32>> {
let mut cm = self.manager_for_db(0).await?;
// Using SMEMBERS and parsing into u32
let vals: Vec<String> = redis::cmd("SMEMBERS").arg("contexts").query_async(&mut cm).await?;
let vals: Vec<String> = redis::cmd("SMEMBERS")
.arg("contexts")
.query_async(&mut cm)
.await
.map_err(|e| {
error!(db=0, error=%e, "SMEMBERS contexts failed");
e
})?;
let mut out = Vec::with_capacity(vals.len());
for v in vals {
if let Ok(n) = v.parse::<u32>() {