diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs
index 1f9f9ee..1cb3f04 100644
--- a/src/clients/supervisor_client.rs
+++ b/src/clients/supervisor_client.rs
@@ -12,9 +12,9 @@ use crate::clients::{Destination, MyceliumClient, MyceliumClientError, Superviso
 
 #[derive(Clone)]
 pub struct SupervisorClient {
-    hub: Arc<SupervisorHub>, // Global hub with background pop loop and shared id generator
-    destination: Destination, // ip or pk
-    secret: Option<String>, // optional, required by several supervisor methods
+    hub: Arc<SupervisorHub>, // Global hub with background pop loop and shared id generator
+    destination: Destination, // ip or pk
+    secret: Option<String>, // optional, required by several supervisor methods
 }
 
 #[derive(Debug, Error)]
diff --git a/src/main.rs b/src/main.rs
index ab32472..971c8ae 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -102,8 +102,11 @@ async fn main() {
     // Start router workers (auto-discovered contexts) using a single global SupervisorHub (no separate inbound listener)
     {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
-        let hub = herocoordinator::clients::SupervisorHub::new(base_url.clone(), "supervisor.rpc".to_string())
-            .expect("Failed to initialize SupervisorHub");
+        let hub = herocoordinator::clients::SupervisorHub::new(
+            base_url.clone(),
+            "supervisor.rpc".to_string(),
+        )
+        .expect("Failed to initialize SupervisorHub");
         let cfg = herocoordinator::router::RouterConfig {
             context_ids: Vec::new(), // ignored by start_router_auto
             concurrency: 32,
diff --git a/src/router.rs b/src/router.rs
index 272e50e..78292ef 100644
--- a/src/router.rs
+++ b/src/router.rs
@@ -1,11 +1,14 @@
-use std::{collections::{HashSet, HashMap}, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use base64::Engine;
 use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
 use serde_json::{Value, json};
-use tokio::sync::{Semaphore, Mutex};
-use std::hash::{Hash, Hasher};
 use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use tokio::sync::{Mutex, Semaphore};
 
 use crate::{
     clients::{Destination, MyceliumClient, SupervisorClient, SupervisorHub},
@@ -18,8 +21,8 @@ use tracing::{error, info};
 pub struct RouterConfig {
     pub context_ids: Vec<u32>,
     pub concurrency: usize,
-    pub base_url: String, // e.g. http://127.0.0.1:8990
-    pub topic: String, // e.g. "supervisor.rpc"
+    pub base_url: String, // e.g. http://127.0.0.1:8990
+    pub topic: String, // e.g. "supervisor.rpc"
     pub sup_hub: Arc<SupervisorHub>, // global supervisor hub for replies
     // Transport status polling configuration
     pub transport_poll_interval_secs: u64, // e.g. 2
@@ -96,11 +99,7 @@ impl SupervisorClientCache {
             tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
             return existing.clone();
         }
-        let client = Arc::new(SupervisorClient::new_with_hub(
-            hub,
-            dest,
-            secret.clone(),
-        ));
+        let client = Arc::new(SupervisorClient::new_with_hub(hub, dest, secret.clone()));
         guard.insert(key, client.clone());
         tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
         client
@@ -153,9 +152,16 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec {
                         match job.status() {
                             JobStatus::Finished | JobStatus::Error => {
@@ -412,14 +419,25 @@ async fn deliver_one(
                                         secret_for_poller.clone(),
                                     )
                                     .await;
-                                match sup.job_result_wait(job_id.to_string()).await {
+                                match sup
+                                    .job_result_wait(job_id.to_string())
+                                    .await
+                                {
                                     Ok((_out2, reply2)) => {
                                         // Interpret reply synchronously: success/error/bare string
                                         let res = reply2.get("result");
-                                        if let Some(obj) = res.and_then(|v| v.as_object()) {
-                                            if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
+                                        if let Some(obj) =
+                                            res.and_then(|v| v.as_object())
+                                        {
+                                            if let Some(s) = obj
+                                                .get("success")
+                                                .and_then(|v| v.as_str())
+                                            {
                                                 let mut patch = std::collections::HashMap::new();
-                                                patch.insert("success".to_string(), s.to_string());
+                                                patch.insert(
+                                                    "success".to_string(),
+                                                    s.to_string(),
+                                                );
                                                 let _ = service_poll
                                                     .update_job_result_merge_unchecked(
                                                         context_id, caller_id, job_id, patch,
@@ -444,9 +462,15 @@ async fn deliver_one(
                                                     )],
                                                 )
                                                 .await;
-                                            } else if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
+                                            } else if let Some(s) = obj
+                                                .get("error")
+                                                .and_then(|v| v.as_str())
+                                            {
                                                 let mut patch = std::collections::HashMap::new();
-                                                patch.insert("error".to_string(), s.to_string());
+                                                patch.insert(
+                                                    "error".to_string(),
+                                                    s.to_string(),
+                                                );
                                                 let _ = service_poll
                                                     .update_job_result_merge_unchecked(
                                                         context_id, caller_id, job_id, patch,
@@ -472,9 +496,14 @@ async fn deliver_one(
                                                 )
                                                 .await;
                                             }
-                                        } else if let Some(s) = res.and_then(|v| v.as_str()) {
+                                        } else if let Some(s) =
+                                            res.and_then(|v| v.as_str())
+                                        {
                                             let mut patch = std::collections::HashMap::new();
-                                            patch.insert("success".to_string(), s.to_string());
+                                            patch.insert(
+                                                "success".to_string(),
+                                                s.to_string(),
+                                            );
                                             let _ = service_poll
                                                 .update_job_result_merge_unchecked(
                                                     context_id, caller_id, job_id, patch,
@@ -549,15 +578,23 @@ async fn deliver_one(
                                         secret_for_poller.clone(),
                                     )
                                     .await;
-                                match sup.job_status_wait(job_id.to_string()).await {
+                                match sup
+                                    .job_status_wait(job_id.to_string())
+                                    .await
+                                {
                                     Ok((_out_id, reply_status)) => {
                                         // Interpret status reply synchronously
-                                        let result_opt = reply_status.get("result");
-                                        let error_opt = reply_status.get("error");
+                                        let result_opt =
+                                            reply_status.get("result");
+                                        let error_opt =
+                                            reply_status.get("error");
                                         if let Some(err_obj) = error_opt {
                                             let _ = service_poll
                                                 .update_job_status_unchecked(
-                                                    context_id, caller_id, job_id, JobStatus::Error,
+                                                    context_id,
+                                                    caller_id,
+                                                    job_id,
+                                                    JobStatus::Error,
                                                 )
                                                 .await;
                                             let _ = service_poll
@@ -574,9 +611,16 @@ async fn deliver_one(
                                                 .get("status")
                                                 .and_then(|v| v.as_str())
                                                 .or_else(|| res.as_str());
-                                            if let Some(remote_status) = status_candidate {
-                                                if let Some((mapped, terminal)) =
-                                                    map_supervisor_job_status(remote_status)
+                                            if let Some(remote_status) =
+                                                status_candidate
+                                            {
+                                                if let Some((
+                                                    mapped,
+                                                    terminal,
+                                                )) =
+                                                    map_supervisor_job_status(
+                                                        remote_status,
+                                                    )
                                                 {
                                                     let _ = service_poll
                                                         .update_job_status_unchecked(
@@ -596,8 +640,19 @@ async fn deliver_one(
                                                     // If terminal, request job.result now (handled above for local terminal case)
                                                     if terminal {
                                                         // trigger job.result only if result empty to avoid spam
-                                                        if let Ok(j_after) = service_poll.load_job(context_id, caller_id, job_id).await {
-                                                            if j_after.result.is_empty() {
+                                                        if let Ok(j_after) =
+                                                            service_poll
+                                                                .load_job(
+                                                                    context_id,
+                                                                    caller_id,
+                                                                    job_id,
+                                                                )
+                                                                .await
+                                                        {
+                                                            if j_after
+                                                                .result
+                                                                .is_empty()
+                                                            {
                                                                 let sup2 = cache
                                                                     .get_or_create(
                                                                         sup_hub.clone(),
@@ -675,7 +730,10 @@ async fn deliver_one(
                                        context_id,
                                        caller_id,
                                        id,
-                                        vec![format!("job.status request error: {}", e)],
+                                        vec![format!(
+                                            "job.status request error: {}",
+                                            e
+                                        )],
                                     )
                                     .await;
                             }
@@ -802,4 +860,3 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task:
         }
     })
 }
-
diff --git a/src/service.rs b/src/service.rs
index 84c9754..d586830 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1182,10 +1182,7 @@ impl AppService {
         &self,
         inner_id: u64,
     ) -> Result, BoxError> {
-        self.redis
-            .supcorr_get(inner_id)
-            .await
-            .map_err(Into::into)
+        self.redis.supcorr_get(inner_id).await.map_err(Into::into)
     }
 
     /// Correlation map: delete mapping by inner supervisor JSON-RPC id.
diff --git a/src/storage/redis.rs b/src/storage/redis.rs
index 41f5196..2a6f323 100644
--- a/src/storage/redis.rs
+++ b/src/storage/redis.rs
@@ -789,10 +789,7 @@ impl RedisDriver {
         Ok(())
     }
 
-    pub async fn supcorr_get(
-        &self,
-        inner_id: u64,
-    ) -> Result> {
+    pub async fn supcorr_get(&self, inner_id: u64) -> Result> {
         let mut cm = self.manager_for_db(0).await?;
         let key = format!("supcorr:{}", inner_id);
         let res: Option<String> = redis::cmd("GET")
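
Reviewer note: the SupervisorClientCache hunk above only rewraps the call, the double-checked lookup it sits in is unchanged. A minimal, self-contained sketch of that pattern follows; the Cache/Client names and string key are placeholders for illustration, not the crate's real types or API.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

struct Client {
    dest: String,
}

#[derive(Default)]
struct Cache {
    clients: Mutex<HashMap<String, Arc<Client>>>,
}

impl Cache {
    async fn get_or_create(&self, dest: &str) -> Arc<Client> {
        // Fast path: look up under the lock and return a clone of the Arc on a hit.
        {
            let guard = self.clients.lock().await;
            if let Some(existing) = guard.get(dest) {
                return existing.clone();
            }
        }
        // Slow path: re-acquire and re-check before inserting, so two tasks racing
        // on the same key still end up sharing a single client instance.
        let mut guard = self.clients.lock().await;
        if let Some(existing) = guard.get(dest) {
            return existing.clone();
        }
        let client = Arc::new(Client {
            dest: dest.to_string(),
        });
        guard.insert(dest.to_string(), client.clone());
        client
    }
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(Cache::default());
    let a = cache.get_or_create("10.0.0.1").await;
    let b = cache.get_or_create("10.0.0.1").await;
    assert!(Arc::ptr_eq(&a, &b)); // both callers get the same cached client
    println!("cached client for {}", a.dest);
}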