use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use reqwest::Client as HttpClient; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use serde_json::{Value, json}; use thiserror::Error; use crate::clients::Destination; use crate::models::TransportStatus; /// Lightweight client for Mycelium JSON-RPC (send + query status) #[derive(Clone)] pub struct MyceliumClient { base_url: String, // e.g. http://127.0.0.1:8990 http: HttpClient, id_counter: Arc, } #[derive(Debug, Error)] pub enum MyceliumClientError { #[error("HTTP error: {0}")] Http(#[from] reqwest::Error), #[error("JSON error: {0}")] Json(#[from] serde_json::Error), #[error("Transport timed out waiting for a reply (408)")] TransportTimeout, #[error("JSON-RPC error: {0}")] RpcError(String), #[error("Invalid response: {0}")] InvalidResponse(String), } impl MyceliumClient { pub fn new(base_url: impl Into) -> Result { let url = base_url.into(); let http = HttpClient::builder().build()?; Ok(Self { base_url: url, http, id_counter: Arc::new(AtomicU64::new(1)), }) } fn next_id(&self) -> u64 { self.id_counter.fetch_add(1, Ordering::Relaxed) } async fn jsonrpc(&self, method: &str, params: Value) -> Result { let req = json!({ "jsonrpc": "2.0", "id": self.next_id(), "method": method, "params": [ params ] }); tracing::info!(%req, "jsonrpc"); let resp = self.http.post(&self.base_url).json(&req).send().await?; let status = resp.status(); let body: Value = resp.json().await?; if let Some(err) = body.get("error") { let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); let msg = err .get("message") .and_then(|v| v.as_str()) .unwrap_or("unknown error"); if code == 408 { return Err(MyceliumClientError::TransportTimeout); } return Err(MyceliumClientError::RpcError(format!( "code={code} msg={msg}" ))); } if !status.is_success() { return Err(MyceliumClientError::RpcError(format!( "HTTP {status}, body {body}" ))); } Ok(body) } /// Call messageStatus with an outbound message id (hex string) pub async fn message_status( &self, id_hex: &str, ) -> Result { let params = json!(id_hex); let body = self.jsonrpc("getMessageInfo", params).await?; let result = body.get("result").ok_or_else(|| { MyceliumClientError::InvalidResponse(format!("missing result in response: {body}")) })?; // Accept both { state: "..."} and bare "..." let status_str = if let Some(s) = result.get("state").and_then(|v| v.as_str()) { s.to_string() } else if let Some(s) = result.as_str() { s.to_string() } else { return Err(MyceliumClientError::InvalidResponse(format!( "unexpected result shape: {result}" ))); }; let status = Self::map_status(&status_str).ok_or_else(|| { MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}")) }); tracing::info!(%id_hex, status = %status.as_ref().unwrap(), "queried messages status"); status } fn map_status(s: &str) -> Option { match s { "pending" => Some(TransportStatus::Queued), "received" => Some(TransportStatus::Delivered), "read" => Some(TransportStatus::Read), "aborted" => Some(TransportStatus::Failed), _ => None, } } /// Build params object for pushMessage without performing any network call. /// Exposed for serializer-only tests and reuse. pub(crate) fn build_push_params( dst: &Destination, topic: &str, payload_b64: &str, reply_timeout: Option, ) -> Value { let dst_v = match dst { Destination::Ip(ip) => json!({ "ip": ip.to_string() }), Destination::Pk(pk) => json!({ "pk": pk }), }; let mut message = json!({ "dst": dst_v, "topic": topic, "payload": payload_b64, }); if let Some(rt) = reply_timeout { message["reply_timeout"] = json!(rt); } message } /// pushMessage: send a message with dst/topic/payload. Optional reply_timeout for sync replies. pub async fn push_message( &self, dst: &Destination, topic: &str, payload_b64: &str, reply_timeout: Option, ) -> Result { let params = Self::build_push_params(dst, topic, payload_b64, reply_timeout); let body = self.jsonrpc("pushMessage", params).await?; let result = body.get("result").ok_or_else(|| { MyceliumClientError::InvalidResponse(format!("missing result in response: {body}")) })?; Ok(result.clone()) } /// Helper to extract outbound message id from pushMessage result (InboundMessage or PushMessageResponseId) pub fn extract_message_id_from_result(result: &Value) -> Option { result .get("id") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } /// popMessage: retrieve an inbound message if available (optionally filtered by topic). /// - peek: if true, do not remove the message from the queue /// - timeout_secs: seconds to wait for a message (0 returns immediately) /// - topic_plain: optional plain-text topic which will be base64-encoded per Mycelium spec /// Returns: /// - Ok(Some(result_json)) on success, where result_json matches InboundMessage schema /// - Ok(None) when there is no message ready (Mycelium returns error code 204) pub async fn pop_message( &self, peek: Option, timeout_secs: Option, topic_plain: Option<&str>, ) -> Result, MyceliumClientError> { // Build params array let mut params_array = vec![]; if let Some(p) = peek { params_array.push(serde_json::Value::Bool(p)); } else { params_array.push(serde_json::Value::Null) } if let Some(t) = timeout_secs { params_array.push(serde_json::Value::Number(t.into())); } else { params_array.push(serde_json::Value::Null) } if let Some(tp) = topic_plain { let topic_b64 = BASE64_STANDARD.encode(tp.as_bytes()); params_array.push(serde_json::Value::String(topic_b64)); } else { params_array.push(serde_json::Value::Null) } let req = json!({ "jsonrpc": "2.0", "id": self.next_id(), "method": "popMessage", "params": serde_json::Value::Array(params_array), }); tracing::info!(%req, "calling popMessage"); let resp = self.http.post(&self.base_url).json(&req).send().await?; let status = resp.status(); let body: Value = resp.json().await?; // Handle JSON-RPC error envelope specially for code 204 (no message ready) if let Some(err) = body.get("error") { let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); let msg = err .get("message") .and_then(|v| v.as_str()) .unwrap_or("unknown error"); if code == 204 { // No message ready return Ok(None); } if code == 408 { // Align with other transport timeout mapping return Err(MyceliumClientError::TransportTimeout); } return Err(MyceliumClientError::RpcError(format!( "code={code} msg={msg}" ))); } if !status.is_success() { return Err(MyceliumClientError::RpcError(format!( "HTTP {status}, body {body}" ))); } let result = body.get("result").ok_or_else(|| { MyceliumClientError::InvalidResponse(format!("missing result in response: {body}")) })?; Ok(Some(result.clone())) } } #[cfg(test)] mod tests { use super::*; use crate::clients::Destination; #[test] fn build_push_params_shapes_ip_pk_and_timeout() { // IP destination let p1 = MyceliumClient::build_push_params( &Destination::Ip("2001:db8::1".parse().unwrap()), "supervisor.rpc", "Zm9vYmFy", // "foobar" Some(10), ); let msg1 = p1.get("message").unwrap(); assert_eq!( msg1.get("topic").unwrap().as_str().unwrap(), "supervisor.rpc" ); assert_eq!(msg1.get("payload").unwrap().as_str().unwrap(), "Zm9vYmFy"); assert_eq!( msg1.get("dst") .unwrap() .get("ip") .unwrap() .as_str() .unwrap(), "2001:db8::1" ); assert_eq!(p1.get("reply_timeout").unwrap().as_u64().unwrap(), 10); // PK destination without timeout let p2 = MyceliumClient::build_push_params( &Destination::Pk( "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".into(), ), "supervisor.rpc", "YmF6", // "baz" None, ); let msg2 = p2.get("message").unwrap(); assert_eq!( msg2.get("dst") .unwrap() .get("pk") .unwrap() .as_str() .unwrap(), "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32" ); assert!(p2.get("reply_timeout").is_none()); } #[test] fn extract_message_id_variants() { // PushMessageResponseId let r1 = json!({"id":"0123456789abcdef"}); assert_eq!( MyceliumClient::extract_message_id_from_result(&r1).unwrap(), "0123456789abcdef" ); // InboundMessage-like let r2 = json!({ "id":"fedcba9876543210", "srcIp":"449:abcd:0123:defa::1", "payload":"hpV+" }); assert_eq!( MyceliumClient::extract_message_id_from_result(&r2).unwrap(), "fedcba9876543210" ); } }