use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use serde_json::{Value, json}; use thiserror::Error; use tokio::time::timeout; use crate::clients::{Destination, MyceliumClient, MyceliumClientError, SupervisorHub}; #[derive(Clone)] pub struct SupervisorClient { hub: Arc, // Global hub with background pop loop and shared id generator destination: Destination, // ip or pk secret: Option, // optional, required by several supervisor methods } #[derive(Debug, Error)] pub enum SupervisorClientError { #[error("HTTP error: {0}")] Http(#[from] reqwest::Error), #[error("JSON error: {0}")] Json(#[from] serde_json::Error), #[error("Transport timed out waiting for a reply (408)")] TransportTimeout, #[error("JSON-RPC error: {0}")] RpcError(String), #[error("Invalid response: {0}")] InvalidResponse(String), #[error("Missing secret for method requiring authentication")] MissingSecret, } impl From for SupervisorClientError { fn from(e: MyceliumClientError) -> Self { match e { MyceliumClientError::TransportTimeout => SupervisorClientError::TransportTimeout, MyceliumClientError::RpcError(m) => SupervisorClientError::RpcError(m), MyceliumClientError::InvalidResponse(m) => SupervisorClientError::InvalidResponse(m), MyceliumClientError::Http(err) => SupervisorClientError::Http(err), MyceliumClientError::Json(err) => SupervisorClientError::Json(err), } } } impl SupervisorClient { /// Preferred constructor using a shared SupervisorHub (single global listener). pub fn new_with_hub( hub: Arc, destination: Destination, secret: Option, ) -> Self { Self { hub, destination, secret, } } /// Backward-compatible constructor that builds a new Hub from base_url/topic. /// NOTE: This spawns a background popMessage listener for the given topic. /// Prefer `new_with_hub` so the process has a single global hub. pub fn new( base_url: impl Into, destination: Destination, topic: impl Into, secret: Option, ) -> Result { let mut url = base_url.into(); if url.is_empty() { url = "http://127.0.0.1:8990".to_string(); } let mycelium = Arc::new(MyceliumClient::new(url)?); Ok(Self::new_with_client(mycelium, destination, topic, secret)) } /// Backward-compatible constructor that reuses an existing Mycelium client. /// NOTE: This creates a new hub and its own background listener. Prefer `new_with_hub`. pub fn new_with_client( mycelium: Arc, destination: Destination, topic: impl Into, secret: Option, ) -> Self { let hub = SupervisorHub::new_with_client(mycelium, topic); Self::new_with_hub(hub, destination, secret) } /// Internal helper used by tests to inspect dst JSON shape. fn build_dst(&self) -> Value { match &self.destination { Destination::Ip(ip) => json!({ "ip": ip.to_string() }), Destination::Pk(pk) => json!({ "pk": pk }), } } fn build_supervisor_payload(&self, method: &str, params: Value) -> Value { json!({ "jsonrpc": "2.0", "id": self.hub.next_id(), "method": method, "params": params, }) } /// Build a supervisor JSON-RPC payload but force a specific id (used for correlation). fn build_supervisor_payload_with_id(&self, method: &str, params: Value, id: u64) -> Value { json!({ "jsonrpc": "2.0", "id": id, "method": method, "params": params, }) } fn encode_payload(payload: &Value) -> Result { let s = serde_json::to_string(payload)?; Ok(BASE64_STANDARD.encode(s.as_bytes())) } fn encode_topic(topic: &[u8]) -> String { BASE64_STANDARD.encode(topic) } fn extract_message_id_from_result(result: &Value) -> Option { // Two possibilities per Mycelium spec oneOf: // - PushMessageResponseId: { "id": "0123456789abcdef" } // - InboundMessage: object containing "id" plus srcIp, ...; we still return id. result .get("id") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } fn need_secret(&self) -> Result<&str, SupervisorClientError> { self.secret .as_deref() .ok_or(SupervisorClientError::MissingSecret) } // ----------------------------- // Core: request-reply call via Hub with default 10s timeout // ----------------------------- /// Send a supervisor JSON-RPC request and await its reply via the Hub. /// Returns (outbound_message_id, reply_envelope_json). pub async fn call_with_reply_timeout( &self, method: &str, params: Value, timeout_secs: u64, ) -> Result<(String, Value), SupervisorClientError> { let inner_id = self.hub.next_id(); // Register waiter before sending to avoid race let rx = self.hub.register_waiter(inner_id).await; let inner = self.build_supervisor_payload_with_id(method, params, inner_id); let payload_b64 = Self::encode_payload(&inner)?; let result = self .hub .mycelium() .push_message( &self.destination, &Self::encode_topic(self.hub.topic().as_bytes()), &payload_b64, None, ) .await?; let out_id = if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) { id } else if let Some(arr) = result.as_array() && arr.len() == 1 && let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0]) { id } else { // Clean pending entry to avoid leak let _ = self.hub.remove_waiter(inner_id).await; return Err(SupervisorClientError::InvalidResponse(format!( "result did not contain message id: {result}" ))); }; let d = Duration::from_secs(timeout_secs); match timeout(d, rx).await { Ok(Ok(reply)) => Ok((out_id, reply)), Ok(Err(_canceled)) => Err(SupervisorClientError::InvalidResponse( "oneshot canceled before receiving reply".into(), )), Err(_elapsed) => { // Cleanup on timeout let _ = self.hub.remove_waiter(inner_id).await; Err(SupervisorClientError::TransportTimeout) } } } /// Send and await with default 10s timeout. pub async fn call_with_reply( &self, method: &str, params: Value, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply_timeout(method, params, 60).await } /// Back-compat: Send and await a reply but return only the outbound id (discard reply). /// This keeps existing call sites working while the system migrates to reply-aware paths. pub async fn call(&self, method: &str, params: Value) -> Result { let (out_id, _reply) = self.call_with_reply(method, params).await?; Ok(out_id) } // ----------------------------- // Typed wrappers for Supervisor API (await replies) // ----------------------------- // Runners pub async fn list_runners_wait(&self) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("list_runners", json!([])).await } pub async fn register_runner_wait( &self, name: impl Into, queue: impl Into, ) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "name": name.into(), "queue": queue.into() }]); self.call_with_reply("register_runner", params).await } pub async fn remove_runner_wait( &self, actor_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("remove_runner", json!([actor_id.into()])) .await } pub async fn start_runner_wait( &self, actor_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("start_runner", json!([actor_id.into()])) .await } pub async fn stop_runner_wait( &self, actor_id: impl Into, force: bool, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("stop_runner", json!([actor_id.into(), force])) .await } pub async fn get_runner_status_wait( &self, actor_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("get_runner_status", json!([actor_id.into()])) .await } pub async fn get_all_runner_status_wait( &self, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("get_all_runner_status", json!([])) .await } pub async fn start_all_wait(&self) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("start_all", json!([])).await } pub async fn stop_all_wait( &self, force: bool, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("stop_all", json!([force])).await } pub async fn get_all_status_wait(&self) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("get_all_status", json!([])).await } // Jobs (await) pub async fn jobs_create_wait( &self, job: Value, ) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job": job }]); self.call_with_reply("jobs.create", params).await } pub async fn jobs_list_wait(&self) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("jobs.list", json!([])).await } pub async fn job_run_wait(&self, job: Value) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job": job }]); self.call_with_reply("job.run", params).await } pub async fn job_start_wait( &self, job_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call_with_reply("job.start", params).await } pub async fn job_status_wait( &self, job_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("job.status", json!([job_id.into()])) .await } pub async fn job_result_wait( &self, job_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("job.result", json!([job_id.into()])) .await } pub async fn job_stop_wait( &self, job_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call_with_reply("job.stop", params).await } pub async fn job_delete_wait( &self, job_id: impl Into, ) -> Result<(String, Value), SupervisorClientError> { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call_with_reply("job.delete", params).await } pub async fn rpc_discover_wait(&self) -> Result<(String, Value), SupervisorClientError> { self.call_with_reply("rpc.discover", json!([])).await } // ----------------------------- // Backward-compatible variants returning only outbound id (discarding reply) // ----------------------------- pub async fn list_runners(&self) -> Result { let (id, _) = self.list_runners_wait().await?; Ok(id) } pub async fn register_runner( &self, name: impl Into, queue: impl Into, ) -> Result { let (id, _) = self.register_runner_wait(name, queue).await?; Ok(id) } pub async fn remove_runner( &self, actor_id: impl Into, ) -> Result { let (id, _) = self.remove_runner_wait(actor_id).await?; Ok(id) } pub async fn start_runner( &self, actor_id: impl Into, ) -> Result { let (id, _) = self.start_runner_wait(actor_id).await?; Ok(id) } pub async fn stop_runner( &self, actor_id: impl Into, force: bool, ) -> Result { let (id, _) = self.stop_runner_wait(actor_id, force).await?; Ok(id) } pub async fn get_runner_status( &self, actor_id: impl Into, ) -> Result { let (id, _) = self.get_runner_status_wait(actor_id).await?; Ok(id) } pub async fn get_all_runner_status(&self) -> Result { let (id, _) = self.get_all_runner_status_wait().await?; Ok(id) } pub async fn start_all(&self) -> Result { let (id, _) = self.start_all_wait().await?; Ok(id) } pub async fn stop_all(&self, force: bool) -> Result { let (id, _) = self.stop_all_wait(force).await?; Ok(id) } pub async fn get_all_status(&self) -> Result { let (id, _) = self.get_all_status_wait().await?; Ok(id) } pub async fn jobs_create(&self, job: Value) -> Result { let (id, _) = self.jobs_create_wait(job).await?; Ok(id) } pub async fn jobs_list(&self) -> Result { let (id, _) = self.jobs_list_wait().await?; Ok(id) } pub async fn job_run(&self, job: Value) -> Result { let (id, _) = self.job_run_wait(job).await?; Ok(id) } pub async fn job_start( &self, job_id: impl Into, ) -> Result { let (id, _) = self.job_start_wait(job_id).await?; Ok(id) } pub async fn job_status( &self, job_id: impl Into, ) -> Result { let (id, _) = self.job_status_wait(job_id).await?; Ok(id) } pub async fn job_result( &self, job_id: impl Into, ) -> Result { let (id, _) = self.job_result_wait(job_id).await?; Ok(id) } pub async fn job_stop( &self, job_id: impl Into, ) -> Result { let (id, _) = self.job_stop_wait(job_id).await?; Ok(id) } pub async fn job_delete( &self, job_id: impl Into, ) -> Result { let (id, _) = self.job_delete_wait(job_id).await?; Ok(id) } pub async fn rpc_discover(&self) -> Result { let (id, _) = self.rpc_discover_wait().await?; Ok(id) } } // ----------------------------- // Tests (serialization-only) // ----------------------------- #[cfg(test)] mod tests { use super::*; use std::net::IpAddr; fn mk_client() -> SupervisorClient { // Build a hub but it won't issue real network calls in these serializer-only tests. let mycelium = Arc::new(MyceliumClient::new("http://127.0.0.1:8990").unwrap()); let hub = SupervisorHub::new_with_client(mycelium, "supervisor.rpc"); SupervisorClient::new_with_hub( hub, Destination::Pk( "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".to_string(), ), Some("secret".to_string()), ) } #[test] fn builds_dst_ip_and_pk() { let mycelium = Arc::new(MyceliumClient::new("http://127.0.0.1:8990").unwrap()); let hub_ip = SupervisorHub::new_with_client(mycelium.clone(), "supervisor.rpc"); let c_ip = SupervisorClient::new_with_hub( hub_ip, Destination::Ip("2001:db8::1".parse().unwrap()), None, ); let v_ip = c_ip.build_dst(); assert_eq!(v_ip.get("ip").unwrap().as_str().unwrap(), "2001:db8::1"); let c_pk = mk_client(); let v_pk = c_pk.build_dst(); assert_eq!( v_pk.get("pk").unwrap().as_str().unwrap(), "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32" ); } #[test] fn encodes_supervisor_payload_b64() { let c = mk_client(); let payload = c.build_supervisor_payload("list_runners", json!([])); let b64 = SupervisorClient::encode_payload(&payload).unwrap(); // decode and compare round-trip JSON let raw = base64::engine::general_purpose::STANDARD .decode(b64.as_bytes()) .unwrap(); let decoded: Value = serde_json::from_slice(&raw).unwrap(); assert_eq!( decoded.get("method").unwrap().as_str().unwrap(), "list_runners" ); assert_eq!(decoded.get("jsonrpc").unwrap().as_str().unwrap(), "2.0"); } #[test] fn extract_message_id_works_for_both_variants() { // PushMessageResponseId let r1 = json!({"id":"0123456789abcdef"}); assert_eq!( SupervisorClient::extract_message_id_from_result(&r1).unwrap(), "0123456789abcdef" ); // InboundMessage-like let r2 = json!({ "id":"fedcba9876543210", "srcIp":"449:abcd:0123:defa::1", "payload":"hpV+" }); assert_eq!( SupervisorClient::extract_message_id_from_result(&r2).unwrap(), "fedcba9876543210" ); } }