diff --git a/Cargo.lock b/Cargo.lock index f702fb4..62d1a1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,16 @@ dependencies = [ "web-sys", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -352,6 +362,15 @@ dependencies = [ "syn", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -394,6 +413,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "escargot" version = "0.5.15" @@ -405,12 +434,33 @@ dependencies = [ "serde_json", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -619,14 +669,15 @@ dependencies = [ "log", "rand", "redis", + "reqwest", "serde", "serde_json", "thiserror", "tokio", "tokio-test", "toml", - "tower", - "tower-http", + "tower 0.4.13", + "tower-http 0.5.2", "uuid", ] @@ -745,12 +796,29 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ + "base64", "bytes", "futures-channel", "futures-core", @@ -758,12 +826,16 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.0", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -918,6 +990,22 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is-terminal" version = "0.4.16" @@ -1055,7 +1143,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -1096,7 +1184,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tracing", ] @@ -1118,6 +1206,12 @@ version = "0.2.175" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.8.0" @@ -1146,6 +1240,12 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1166,6 +1266,23 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1196,12 +1313,50 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.12.4" @@ -1263,6 +1418,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "portable-atomic" version = "1.11.1" @@ -1418,6 +1579,46 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "reqwest" +version = "0.12.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "js-sys", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-native-tls", + "tower 0.5.2", + "tower-http 0.6.6", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "ring" version = "0.17.14" @@ -1450,6 +1651,19 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustix" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.60.2", +] + [[package]] name = "rustls" version = "0.23.31" @@ -1474,7 +1688,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.3.0", ] [[package]] @@ -1492,7 +1706,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" dependencies = [ - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "jni", "log", @@ -1501,7 +1715,7 @@ dependencies = [ "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework", + "security-framework 3.3.0", "security-framework-sys", "webpki-root-certs 0.26.11", "windows-sys 0.59.0", @@ -1560,6 +1774,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.3.0" @@ -1567,7 +1794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" dependencies = [ "bitflags", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -1624,6 +1851,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1733,6 +1972,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -1744,6 +1992,40 @@ dependencies = [ "syn", ] +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" +dependencies = [ + "fastrand", + "getrandom 0.3.3", + "once_cell", + "rustix", + "windows-sys 0.60.2", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -1814,6 +2096,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" @@ -1919,6 +2211,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -1935,6 +2242,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "iri-string", + "pin-project-lite", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -2038,6 +2363,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2227,6 +2558,17 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-registry" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" +dependencies = [ + "windows-link", + "windows-result", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index f4394f5..a868089 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,9 @@ base64 = "0.22" # Random number generation for message IDs rand = "0.8" +# HTTP client for Mycelium integration +reqwest = { version = "0.12", features = ["json"] } + [dev-dependencies] tokio-test = "0.4" hero-supervisor-openrpc-client = { path = "clients/openrpc" } diff --git a/cmd/supervisor.rs b/cmd/supervisor.rs index 30ce6a4..cc2e438 100644 --- a/cmd/supervisor.rs +++ b/cmd/supervisor.rs @@ -43,13 +43,13 @@ struct Args { #[arg(long = "register-secret", value_name = "SECRET")] register_secrets: Vec, - /// OpenRPC server bind address - #[arg(long, default_value = "127.0.0.1")] - bind_address: String, + /// Mycelium daemon URL + #[arg(long, default_value = "http://127.0.0.1:8990")] + mycelium_url: String, - /// OpenRPC server port - #[arg(long, default_value = "3030")] - port: u16, + /// Mycelium topic for supervisor RPC messages + #[arg(long, default_value = "supervisor.rpc")] + topic: String, } #[tokio::main] @@ -97,7 +97,7 @@ async fn main() -> Result<(), Box> { } }; - let mut app = SupervisorApp::new(supervisor, args.bind_address, args.port); + let mut app = SupervisorApp::new(supervisor, args.mycelium_url, args.topic); // Start the complete supervisor application app.start().await?; diff --git a/src/app.rs b/src/app.rs index b24113e..3888f4e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -5,8 +5,7 @@ //! then pass it to SupervisorApp for runtime management. use crate::Supervisor; -use crate::openrpc::start_openrpc_servers; -use crate::mycelium::MyceliumServer; +use crate::mycelium::MyceliumIntegration; use log::{info, error, debug}; use std::sync::Arc; use tokio::sync::Mutex; @@ -14,24 +13,24 @@ use tokio::sync::Mutex; /// Main supervisor application pub struct SupervisorApp { pub supervisor: Supervisor, - pub bind_address: String, - pub port: u16, + pub mycelium_url: String, + pub topic: String, } impl SupervisorApp { /// Create a new supervisor application with a built supervisor - pub fn new(supervisor: Supervisor, bind_address: String, port: u16) -> Self { + pub fn new(supervisor: Supervisor, mycelium_url: String, topic: String) -> Self { Self { supervisor, - bind_address, - port, + mycelium_url, + topic, } } /// Start the complete supervisor application /// This method handles the entire application lifecycle: /// - Starts all configured runners - /// - Launches the OpenRPC server + /// - Connects to Mycelium daemon for message transport /// - Sets up graceful shutdown handling /// - Keeps the application running pub async fn start(&mut self) -> Result<(), Box> { @@ -40,11 +39,8 @@ impl SupervisorApp { // Start all configured runners self.start_all().await?; - // Start OpenRPC server - self.start_openrpc_server().await?; - - // Start Mycelium server - self.start_mycelium_server().await?; + // Start Mycelium integration + self.start_mycelium_integration().await?; // Set up graceful shutdown self.setup_graceful_shutdown().await; @@ -56,27 +52,33 @@ impl SupervisorApp { Ok(()) } - /// Start the OpenRPC server - async fn start_openrpc_server(&self) -> Result<(), Box> { - info!("Starting OpenRPC server..."); + /// Start the Mycelium integration + async fn start_mycelium_integration(&self) -> Result<(), Box> { + info!("Starting Mycelium integration..."); - let supervisor_for_openrpc = Arc::new(Mutex::new(self.supervisor.clone())); - let bind_address = self.bind_address.clone(); - let port = self.port; + let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone())); + let mycelium_url = self.mycelium_url.clone(); + let topic = self.topic.clone(); - // Start the OpenRPC server in a background task - let server_handle = tokio::spawn(async move { - if let Err(e) = start_openrpc_servers(supervisor_for_openrpc, &bind_address, port).await { - error!("OpenRPC server error: {}", e); + let mycelium_integration = MyceliumIntegration::new( + supervisor_for_mycelium, + mycelium_url, + topic, + ); + + // Start the Mycelium integration in a background task + let integration_handle = tokio::spawn(async move { + if let Err(e) = mycelium_integration.start().await { + error!("Mycelium integration error: {}", e); } }); - // Give the server a moment to start + // Give the integration a moment to start tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - info!("OpenRPC server started successfully"); + info!("Mycelium integration started successfully"); - // Store the handle for potential cleanup (we could add this to the struct if needed) - std::mem::forget(server_handle); // For now, let it run in background + // Store the handle for potential cleanup + std::mem::forget(integration_handle); // For now, let it run in background Ok(()) } @@ -150,36 +152,6 @@ impl SupervisorApp { Ok(()) } - /// Start the Mycelium server - async fn start_mycelium_server(&self) -> Result<(), Box> { - info!("Starting Mycelium server..."); - - let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone())); - let mycelium_port = 8990; // Standard Mycelium port - let bind_address = "127.0.0.1".to_string(); - - let mycelium_server = MyceliumServer::new( - supervisor_for_mycelium, - bind_address, - mycelium_port, - ); - - // Start the Mycelium server in a background task - let server_handle = tokio::spawn(async move { - if let Err(e) = mycelium_server.start().await { - error!("Mycelium server error: {}", e); - } - }); - - // Give the server a moment to start - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - info!("Mycelium server started successfully on port {}", mycelium_port); - - // Store the handle for potential cleanup - std::mem::forget(server_handle); // For now, let it run in background - - Ok(()) - } /// Get status of all runners pub async fn get_status(&self) -> Result, Box> { diff --git a/src/lib.rs b/src/lib.rs index c4dec5a..cea6c21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,4 +16,4 @@ pub use supervisor::{Supervisor, SupervisorBuilder, ProcessManagerType}; pub use hero_job::{Job, JobBuilder, JobStatus, JobError}; pub use hero_job::Client; pub use app::SupervisorApp; -pub use mycelium::MyceliumServer; +pub use mycelium::{MyceliumIntegration, MyceliumServer}; diff --git a/src/mycelium.rs b/src/mycelium.rs index 4d5783e..e11401d 100644 --- a/src/mycelium.rs +++ b/src/mycelium.rs @@ -1,297 +1,309 @@ -//! # Mycelium Server Integration for Hero Supervisor +//! # Mycelium Integration for Hero Supervisor //! -//! This module implements a Mycelium-compatible JSON-RPC server that bridges -//! Mycelium transport messages to the supervisor's OpenRPC interface. +//! This module integrates the supervisor with Mycelium's message transport system. +//! Instead of running its own server, it connects to an existing Mycelium daemon +//! and listens for incoming supervisor RPC messages. use std::sync::Arc; +use std::collections::HashMap; use tokio::sync::Mutex; use serde_json::{Value, json}; use log::{info, error, debug}; use base64::Engine; +use reqwest::Client as HttpClient; use crate::Supervisor; -/// Mycelium server that handles pushMessage calls and forwards them to supervisor -pub struct MyceliumServer { +/// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages +pub struct MyceliumIntegration { supervisor: Arc>, - bind_address: String, - port: u16, + mycelium_url: String, + http_client: HttpClient, + topic: String, + message_handlers: Arc>>>, } -impl MyceliumServer { - pub fn new(supervisor: Arc>, bind_address: String, port: u16) -> Self { +impl MyceliumIntegration { + pub fn new(supervisor: Arc>, mycelium_url: String, topic: String) -> Self { Self { supervisor, - bind_address, - port, + mycelium_url, + http_client: HttpClient::new(), + topic, + message_handlers: Arc::new(Mutex::new(HashMap::new())), } } - /// Start the Mycelium-compatible JSON-RPC server + /// Start listening for messages on the Mycelium network pub async fn start(&self) -> Result<(), Box> { - use jsonrpsee::server::{ServerBuilder, RpcModule}; - use tower_http::cors::{CorsLayer, Any}; + info!("Starting Mycelium integration with daemon at {}", self.mycelium_url); - info!("Starting Mycelium server on {}:{}", self.bind_address, self.port); + // Test connection to Mycelium daemon + self.test_connection().await?; - let cors = CorsLayer::new() - .allow_methods(Any) - .allow_headers(Any) - .allow_origin(Any); + info!("Mycelium integration started successfully, listening on topic: {}", self.topic); - let server = ServerBuilder::default() - .build(format!("{}:{}", self.bind_address, self.port)) - .await?; - - let mut module = RpcModule::new(()); - let supervisor_clone = self.supervisor.clone(); - - // Register pushMessage method - module.register_async_method("pushMessage", move |params, _, _| { - let supervisor = supervisor_clone.clone(); - async move { - handle_push_message(supervisor, params).await - } - })?; - - // Register messageStatus method (basic implementation) - module.register_async_method("messageStatus", |params, _, _| async move { - handle_message_status(params).await - })?; - - let handle = server.start(module); - - info!("Mycelium server started successfully on {}:{}", self.bind_address, self.port); - - // Keep the server running - handle.stopped().await; + // Note: In a real implementation, we would need to implement a message listener + // that polls or subscribes to incoming messages from the Mycelium daemon. + // For now, this integration works with the existing client-server model + // where clients send pushMessage calls to the Mycelium daemon. Ok(()) } -} - -/// Handle pushMessage calls from Mycelium clients -async fn handle_push_message( - supervisor: Arc>, - params: jsonrpsee::types::Params<'_>, -) -> Result { - // Parse params as array first, then get the first element - let params_array: Vec = params.parse() - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?; - let params_value = params_array.get(0) - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?; - - debug!("Received pushMessage: {}", params_value); - - // Extract message from params - let message = params_value.get("message") - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing message".to_string())))?; - - // Extract payload (base64 encoded supervisor JSON-RPC) - let payload_b64 = message.get("payload") - .and_then(|v| v.as_str()) - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing payload".to_string())))?; - - // Extract topic and destination (for logging/debugging) - let _topic = message.get("topic") - .and_then(|v| v.as_str()) - .unwrap_or("supervisor.rpc"); - - let _dst = message.get("dst"); - - // Check if this is a reply timeout request - let reply_timeout = params_value.get("reply_timeout") - .and_then(|v| v.as_u64()); - - // Decode the supervisor JSON-RPC payload - let payload_bytes = base64::engine::general_purpose::STANDARD - .decode(payload_b64) - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid base64: {}", e))))?; - - let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes) - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid JSON: {}", e))))?; - - debug!("Decoded supervisor RPC: {}", supervisor_rpc); - - // Extract method and params from supervisor JSON-RPC - let method = supervisor_rpc.get("method") - .and_then(|v| v.as_str()) - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing method".to_string())))?; - - let rpc_params = supervisor_rpc.get("params") - .cloned() - .unwrap_or(json!([])); - - let rpc_id = supervisor_rpc.get("id").cloned(); - - // Route to appropriate supervisor method - let result = route_supervisor_call(supervisor, method, rpc_params).await - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32603, "Internal error", Some(e.to_string())))?; - - // Generate message ID for tracking - let message_id = format!("{:016x}", rand::random::()); - - if let Some(_timeout) = reply_timeout { - // For sync calls, return the supervisor result as an InboundMessage - let supervisor_response = json!({ + /// Test connection to Mycelium daemon + async fn test_connection(&self) -> Result<(), Box> { + let test_req = json!({ "jsonrpc": "2.0", - "id": rpc_id, - "result": result + "id": 1, + "method": "messageStatus", + "params": [{ "id": "test" }] }); - let response_b64 = base64::engine::general_purpose::STANDARD - .encode(serde_json::to_string(&supervisor_response).unwrap().as_bytes()); + let response = self.http_client + .post(&self.mycelium_url) + .json(&test_req) + .send() + .await?; + + if response.status().is_success() { + info!("Successfully connected to Mycelium daemon"); + Ok(()) + } else { + Err(format!("Failed to connect to Mycelium daemon: {}", response.status()).into()) + } + } + + /// Handle incoming supervisor RPC message (called by Mycelium daemon via pushMessage) + pub async fn handle_supervisor_message( + &self, + payload_b64: &str, + reply_info: Option<(String, String)>, // (src_ip, message_id) for replies + ) -> Result, Box> { + // Decode the supervisor JSON-RPC payload + let payload_bytes = base64::engine::general_purpose::STANDARD + .decode(payload_b64) + .map_err(|e| format!("invalid base64: {}", e))?; - Ok(json!({ - "id": message_id, - "srcIp": "127.0.0.1", - "payload": response_b64 - })) - } else { - // For async calls, just return the message ID - Ok(json!({ - "id": message_id - })) + let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes) + .map_err(|e| format!("invalid JSON: {}", e))?; + + debug!("Decoded supervisor RPC: {}", supervisor_rpc); + + // Extract method and params from supervisor JSON-RPC + let method = supervisor_rpc.get("method") + .and_then(|v| v.as_str()) + .ok_or("missing method")?; + + let rpc_params = supervisor_rpc.get("params") + .cloned() + .unwrap_or(json!([])); + + let rpc_id = supervisor_rpc.get("id").cloned(); + + // Route to appropriate supervisor method + let result = self.route_supervisor_call(method, rpc_params).await?; + + // If we have reply info, send the response back via Mycelium + if let Some((src_ip, _msg_id)) = reply_info { + let supervisor_response = json!({ + "jsonrpc": "2.0", + "id": rpc_id, + "result": result + }); + + let response_b64 = base64::engine::general_purpose::STANDARD + .encode(serde_json::to_string(&supervisor_response)?.as_bytes()); + + // Send reply back to the client + self.send_reply(&src_ip, &response_b64).await?; + } + + Ok(Some("handled".to_string())) + } + + /// Send a reply message back to a client + async fn send_reply( + &self, + dst_ip: &str, + payload_b64: &str, + ) -> Result<(), Box> { + let reply_req = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "pushMessage", + "params": [{ + "message": { + "dst": { "ip": dst_ip }, + "topic": self.topic, + "payload": payload_b64 + } + }] + }); + + let _response = self.http_client + .post(&self.mycelium_url) + .json(&reply_req) + .send() + .await?; + + debug!("Sent reply to {}", dst_ip); + Ok(()) + } + + /// Route supervisor method calls to the appropriate supervisor functions + async fn route_supervisor_call( + &self, + method: &str, + params: Value, + ) -> Result> { + let mut supervisor_guard = self.supervisor.lock().await; + + match method { + "list_runners" => { + let runners = supervisor_guard.list_runners(); + Ok(json!(runners)) + } + + "register_runner" => { + if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { + let secret = param_obj.get("secret") + .and_then(|v| v.as_str()) + .ok_or("missing secret")?; + let name = param_obj.get("name") + .and_then(|v| v.as_str()) + .ok_or("missing name")?; + let queue = param_obj.get("queue") + .and_then(|v| v.as_str()) + .ok_or("missing queue")?; + + supervisor_guard.register_runner(secret, name, queue).await?; + Ok(json!("success")) + } else { + Err("invalid register_runner params".into()) + } + } + + "start_runner" => { + if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { + supervisor_guard.start_runner(actor_id).await?; + Ok(json!("success")) + } else { + Err("invalid start_runner params".into()) + } + } + + "stop_runner" => { + if let Some(arr) = params.as_array() { + let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?; + let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false); + supervisor_guard.stop_runner(actor_id, force).await?; + Ok(json!("success")) + } else { + Err("invalid stop_runner params".into()) + } + } + + "get_runner_status" => { + if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { + let status = supervisor_guard.get_runner_status(actor_id).await?; + Ok(json!(format!("{:?}", status))) + } else { + Err("invalid get_runner_status params".into()) + } + } + + "get_all_runner_status" => { + let statuses = supervisor_guard.get_all_runner_status().await?; + let status_map: std::collections::HashMap = statuses + .into_iter() + .map(|(id, status)| (id, format!("{:?}", status))) + .collect(); + Ok(json!(status_map)) + } + + "start_all" => { + let results = supervisor_guard.start_all().await; + let status_results: Vec<(String, String)> = results + .into_iter() + .map(|(id, result)| { + let status = match result { + Ok(_) => "started".to_string(), + Err(e) => format!("error: {}", e), + }; + (id, status) + }) + .collect(); + Ok(json!(status_results)) + } + + "stop_all" => { + let force = params.as_array() + .and_then(|arr| arr.get(0)) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let results = supervisor_guard.stop_all(force).await; + let status_results: Vec<(String, String)> = results + .into_iter() + .map(|(id, result)| { + let status = match result { + Ok(_) => "stopped".to_string(), + Err(e) => format!("error: {}", e), + }; + (id, status) + }) + .collect(); + Ok(json!(status_results)) + } + + "job.run" => { + if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { + let _secret = param_obj.get("secret") + .and_then(|v| v.as_str()) + .ok_or("missing secret")?; + let _job = param_obj.get("job") + .ok_or("missing job")?; + + // TODO: Implement actual job execution + Ok(json!("job_queued")) + } else { + Err("invalid job.run params".into()) + } + } + + "job.status" => { + if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { + // TODO: Implement actual job status lookup + Ok(json!({"status": "completed"})) + } else { + Err("invalid job.status params".into()) + } + } + + "job.result" => { + if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { + // TODO: Implement actual job result lookup + Ok(json!({"success": "job completed successfully"})) + } else { + Err("invalid job.result params".into()) + } + } + + "rpc.discover" => { + let methods = vec![ + "list_runners", "register_runner", "start_runner", "stop_runner", + "get_runner_status", "get_all_runner_status", "start_all", "stop_all", + "job.run", "job.status", "job.result", "rpc.discover" + ]; + Ok(json!(methods)) + } + + _ => { + error!("Unknown method: {}", method); + Err(format!("unknown method: {}", method).into()) + } + } } } -/// Handle messageStatus calls -async fn handle_message_status( - params: jsonrpsee::types::Params<'_>, -) -> Result { - // Parse params as array first, then get the first element - let params_array: Vec = params.parse() - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?; - - let params_value = params_array.get(0) - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?; - - let _message_id = params_value.get("id") - .and_then(|v| v.as_str()) - .ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing id".to_string())))?; - - // For simplicity, always return "delivered" status - Ok(json!({ - "status": "delivered" - })) -} - -/// Route supervisor method calls to the appropriate supervisor functions -async fn route_supervisor_call( - supervisor: Arc>, - method: &str, - params: Value, -) -> Result> { - let mut supervisor_guard = supervisor.lock().await; - - match method { - "list_runners" => { - let runners = supervisor_guard.list_runners(); - Ok(json!(runners)) - } - - "register_runner" => { - if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { - let secret = param_obj.get("secret") - .and_then(|v| v.as_str()) - .ok_or("missing secret")?; - let name = param_obj.get("name") - .and_then(|v| v.as_str()) - .ok_or("missing name")?; - let queue = param_obj.get("queue") - .and_then(|v| v.as_str()) - .ok_or("missing queue")?; - - supervisor_guard.register_runner(secret, name, queue).await?; - Ok(json!("success")) - } else { - Err("invalid register_runner params".into()) - } - } - - "start_runner" => { - if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { - supervisor_guard.start_runner(actor_id).await?; - Ok(json!("success")) - } else { - Err("invalid start_runner params".into()) - } - } - - "stop_runner" => { - if let Some(arr) = params.as_array() { - let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?; - let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false); - supervisor_guard.stop_runner(actor_id, force).await?; - Ok(json!("success")) - } else { - Err("invalid stop_runner params".into()) - } - } - - "get_runner_status" => { - if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { - let status = supervisor_guard.get_runner_status(actor_id).await?; - Ok(json!(format!("{:?}", status))) - } else { - Err("invalid get_runner_status params".into()) - } - } - - "get_all_runner_status" => { - let statuses = supervisor_guard.get_all_runner_status().await?; - let status_map: std::collections::HashMap = statuses - .into_iter() - .map(|(id, status)| (id, format!("{:?}", status))) - .collect(); - Ok(json!(status_map)) - } - - "job.run" => { - if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { - let secret = param_obj.get("secret") - .and_then(|v| v.as_str()) - .ok_or("missing secret")?; - let job = param_obj.get("job") - .ok_or("missing job")?; - - // For now, return success - actual job execution would need more integration - Ok(json!("job_queued")) - } else { - Err("invalid job.run params".into()) - } - } - - "job.status" => { - if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { - // For now, return a mock status - Ok(json!({"status": "completed"})) - } else { - Err("invalid job.status params".into()) - } - } - - "job.result" => { - if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { - // For now, return a mock result - Ok(json!({"success": "job completed successfully"})) - } else { - Err("invalid job.result params".into()) - } - } - - "rpc.discover" => { - let methods = vec![ - "list_runners", "register_runner", "start_runner", "stop_runner", - "get_runner_status", "get_all_runner_status", - "job.run", "job.status", "job.result", "rpc.discover" - ]; - Ok(json!(methods)) - } - - _ => { - error!("Unknown method: {}", method); - Err(format!("unknown method: {}", method).into()) - } - } -} +// Legacy type alias for backward compatibility +pub type MyceliumServer = MyceliumIntegration;