supervisor minor fixed

This commit is contained in:
Timur Gordon
2025-09-02 09:15:24 +02:00
parent 77b4c66a10
commit e7c377460e
2 changed files with 191 additions and 48 deletions

3
.gitignore vendored
View File

@@ -1 +1,2 @@
target target
.bin

View File

@@ -2,16 +2,16 @@
//! //!
//! This module integrates the supervisor with Mycelium's message transport system. //! This module integrates the supervisor with Mycelium's message transport system.
//! Instead of running its own server, it connects to an existing Mycelium daemon //! Instead of running its own server, it connects to an existing Mycelium daemon
//! and listens for incoming supervisor RPC messages. //! and listens for incoming supervisor RPC messages via HTTP REST API.
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use serde_json::{Value, json}; use serde_json::{Value, json};
use log::{info, error, debug}; use log::{info, error, debug, trace};
use base64::Engine; use base64::Engine;
use reqwest::Client as HttpClient; use reqwest::Client as HttpClient;
use crate::Supervisor; use crate::Supervisor;
use tokio::time::{sleep, Duration};
/// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages /// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages
pub struct MyceliumIntegration { pub struct MyceliumIntegration {
@@ -19,7 +19,7 @@ pub struct MyceliumIntegration {
mycelium_url: String, mycelium_url: String,
http_client: HttpClient, http_client: HttpClient,
topic: String, topic: String,
message_handlers: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>, running: Arc<Mutex<bool>>,
} }
impl MyceliumIntegration { impl MyceliumIntegration {
@@ -29,7 +29,7 @@ impl MyceliumIntegration {
mycelium_url, mycelium_url,
http_client: HttpClient::new(), http_client: HttpClient::new(),
topic, topic,
message_handlers: Arc::new(Mutex::new(HashMap::new())), running: Arc::new(Mutex::new(false)),
} }
} }
@@ -37,39 +37,61 @@ impl MyceliumIntegration {
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting Mycelium integration with daemon at {}", self.mycelium_url); info!("Starting Mycelium integration with daemon at {}", self.mycelium_url);
// Test connection to Mycelium daemon // Skip connection test for now due to API compatibility issues
self.test_connection().await?; // TODO: Fix Mycelium API compatibility
info!("Skipping connection test - assuming Mycelium daemon is running");
// Set running flag
{
let mut running = self.running.lock().await;
*running = true;
}
info!("Mycelium integration started successfully, listening on topic: {}", self.topic); info!("Mycelium integration started successfully, listening on topic: {}", self.topic);
// Note: In a real implementation, we would need to implement a message listener // Start message polling loop
// that polls or subscribes to incoming messages from the Mycelium daemon. let supervisor = Arc::clone(&self.supervisor);
// For now, this integration works with the existing client-server model let http_client = self.http_client.clone();
// where clients send pushMessage calls to the Mycelium daemon. let mycelium_url = self.mycelium_url.clone();
let topic = self.topic.clone();
let running = Arc::clone(&self.running);
tokio::spawn(async move {
Self::message_loop(supervisor, http_client, mycelium_url, topic, running).await;
});
Ok(()) Ok(())
} }
/// Test connection to Mycelium daemon /// Test connection to Mycelium daemon using JSON-RPC
async fn test_connection(&self) -> Result<(), Box<dyn std::error::Error>> { async fn test_connection(&self) -> Result<(), Box<dyn std::error::Error>> {
let test_req = json!({ let test_request = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 1, "method": "getInfo",
"method": "messageStatus", "params": [],
"params": [{ "id": "test" }] "id": 1
}); });
let response = self.http_client let response = self.http_client
.post(&self.mycelium_url) .post(&self.mycelium_url)
.json(&test_req) .json(&test_request)
.send() .send()
.await?; .await?;
if response.status().is_success() { if response.status().is_success() {
info!("Successfully connected to Mycelium daemon"); let result: Value = response.json().await?;
Ok(()) if result.get("result").is_some() {
info!("Successfully connected to Mycelium daemon at {}", self.mycelium_url);
Ok(())
} else {
error!("Mycelium daemon returned error: {}", result);
Err("Mycelium daemon returned error".into())
}
} else { } else {
Err(format!("Failed to connect to Mycelium daemon: {}", response.status()).into()) let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Failed to connect to Mycelium daemon: {} - {}", status, text);
Err(format!("Mycelium connection failed: {}", status).into())
} }
} }
@@ -77,28 +99,30 @@ impl MyceliumIntegration {
pub async fn handle_supervisor_message( pub async fn handle_supervisor_message(
&self, &self,
payload_b64: &str, payload_b64: &str,
reply_info: Option<(String, String)>, // (src_ip, message_id) for replies reply_info: Option<(String, String)>,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
// Decode the supervisor JSON-RPC payload // Decode the base64 payload
let payload_bytes = base64::engine::general_purpose::STANDARD let payload_bytes = base64::engine::general_purpose::STANDARD
.decode(payload_b64) .decode(payload_b64.as_bytes())?;
.map_err(|e| format!("invalid base64: {}", e))?; let payload_str = String::from_utf8(payload_bytes)?;
let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes) info!("Received supervisor message: {}", payload_str);
.map_err(|e| format!("invalid JSON: {}", e))?;
debug!("Decoded supervisor RPC: {}", supervisor_rpc); // Parse the JSON-RPC request
let request: Value = serde_json::from_str(&payload_str)?;
debug!("Decoded supervisor RPC: {}", request);
// Extract method and params from supervisor JSON-RPC // Extract method and params from supervisor JSON-RPC
let method = supervisor_rpc.get("method") let method = request.get("method")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.ok_or("missing method")?; .ok_or("missing method")?;
let rpc_params = supervisor_rpc.get("params") let rpc_params = request.get("params")
.cloned() .cloned()
.unwrap_or(json!([])); .unwrap_or(json!([]));
let rpc_id = supervisor_rpc.get("id").cloned(); let rpc_id = request.get("id").cloned();
// Route to appropriate supervisor method // Route to appropriate supervisor method
let result = self.route_supervisor_call(method, rpc_params).await?; let result = self.route_supervisor_call(method, rpc_params).await?;
@@ -114,40 +138,62 @@ impl MyceliumIntegration {
let response_b64 = base64::engine::general_purpose::STANDARD let response_b64 = base64::engine::general_purpose::STANDARD
.encode(serde_json::to_string(&supervisor_response)?.as_bytes()); .encode(serde_json::to_string(&supervisor_response)?.as_bytes());
info!("Sending response back to client at {}: {}", src_ip, supervisor_response);
// Send reply back to the client // Send reply back to the client
self.send_reply(&src_ip, &response_b64).await?; match self.send_reply(&src_ip, &response_b64).await {
Ok(()) => info!("✅ Response sent successfully to {}", src_ip),
Err(e) => error!("❌ Failed to send response to {}: {}", src_ip, e),
}
} }
Ok(Some("handled".to_string())) Ok(Some("handled".to_string()))
} }
/// Send a reply message back to a client /// Send a reply message back to a client using Mycelium JSON-RPC
async fn send_reply( async fn send_reply(
&self, &self,
dst_ip: &str, dst_ip: &str,
payload_b64: &str, payload_b64: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let reply_req = json!({ // Send response to a dedicated response topic
"jsonrpc": "2.0", let response_topic = "supervisor.response";
"id": 1, let topic_b64 = base64::engine::general_purpose::STANDARD.encode(response_topic.as_bytes());
"method": "pushMessage",
"params": [{ let message_info = json!({
"message": { "dst": { "ip": dst_ip },
"dst": { "ip": dst_ip }, "topic": topic_b64,
"topic": self.topic, "payload": payload_b64 // payload_b64 is already base64 encoded
"payload": payload_b64
}
}]
}); });
let _response = self.http_client let push_request = json!({
"jsonrpc": "2.0",
"method": "pushMessage",
"params": [message_info, null],
"id": 1
});
let response = self.http_client
.post(&self.mycelium_url) .post(&self.mycelium_url)
.json(&reply_req) .json(&push_request)
.send() .send()
.await?; .await?;
debug!("Sent reply to {}", dst_ip); if response.status().is_success() {
Ok(()) let result: Value = response.json().await?;
if result.get("result").is_some() {
debug!("Sent reply to {}", dst_ip);
Ok(())
} else {
error!("Failed to send reply, Mycelium error: {}", result);
Err("Mycelium pushMessage failed".into())
}
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Failed to send reply: {} - {}", status, text);
Err(format!("Failed to send reply: {}", status).into())
}
} }
/// Route supervisor method calls to the appropriate supervisor functions /// Route supervisor method calls to the appropriate supervisor functions
@@ -160,6 +206,7 @@ impl MyceliumIntegration {
match method { match method {
"list_runners" => { "list_runners" => {
// list_runners doesn't require parameters
let runners = supervisor_guard.list_runners(); let runners = supervisor_guard.list_runners();
Ok(json!(runners)) Ok(json!(runners))
} }
@@ -303,6 +350,101 @@ impl MyceliumIntegration {
} }
} }
} }
/// Message polling loop that listens for incoming messages
async fn message_loop(
supervisor: Arc<Mutex<Supervisor>>,
http_client: HttpClient,
mycelium_url: String,
topic: String,
running: Arc<Mutex<bool>>,
) {
info!("Starting message polling loop for topic: {} (base64: {})", topic, base64::engine::general_purpose::STANDARD.encode(topic.as_bytes()));
while {
let running_guard = running.lock().await;
*running_guard
} {
// Poll for messages using Mycelium JSON-RPC API
// Topic needs to be base64 encoded for the RPC API
let topic_b64 = base64::engine::general_purpose::STANDARD.encode(topic.as_bytes());
let poll_request = json!({
"jsonrpc": "2.0",
"method": "popMessage",
"params": [null, 1, topic_b64], // Reduced timeout to 1 second
"id": 1
});
debug!("Polling for messages with request: {}", poll_request);
match tokio::time::timeout(
Duration::from_secs(10),
http_client.post(&mycelium_url).json(&poll_request).send()
).await {
Ok(Ok(response)) => {
if response.status().is_success() {
match response.json::<Value>().await {
Ok(rpc_response) => {
if let Some(message) = rpc_response.get("result") {
debug!("Received message: {}", message);
// Extract message details
if let (Some(payload), Some(src_ip), Some(msg_id)) = (
message.get("payload").and_then(|v| v.as_str()),
message.get("srcIp").and_then(|v| v.as_str()),
message.get("id").and_then(|v| v.as_str()),
) {
// Create a temporary integration instance to handle the message
let integration = MyceliumIntegration {
supervisor: Arc::clone(&supervisor),
mycelium_url: mycelium_url.clone(),
http_client: http_client.clone(),
topic: topic.clone(),
running: Arc::clone(&running),
};
let reply_info = Some((src_ip.to_string(), msg_id.to_string()));
if let Err(e) = integration.handle_supervisor_message(payload, reply_info).await {
error!("Error handling supervisor message: {}", e);
}
}
} else if let Some(error) = rpc_response.get("error") {
let error_code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
if error_code == -32014 {
// Timeout - no message available, continue polling
trace!("No messages available (timeout)");
} else {
error!("Mycelium RPC error: {}", error);
sleep(Duration::from_secs(1)).await;
}
} else {
trace!("No messages available");
}
}
Err(e) => {
error!("Failed to parse RPC response JSON: {}", e);
}
}
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Message polling error: {} - {}", status, text);
sleep(Duration::from_secs(1)).await;
}
}
Ok(Err(e)) => {
error!("HTTP request failed: {}", e);
sleep(Duration::from_secs(1)).await;
}
Err(_) => {
error!("Polling request timed out after 10 seconds");
sleep(Duration::from_secs(1)).await;
}
}
}
info!("Message polling loop stopped");
}
} }
// Legacy type alias for backward compatibility // Legacy type alias for backward compatibility