@@ -12,9 +12,9 @@ use crate::clients::{Destination, MyceliumClient, MyceliumClientError, Superviso
 #[derive(Clone)]
 pub struct SupervisorClient {
     hub: Arc<SupervisorHub>, // Global hub with background pop loop and shared id generator
     destination: Destination, // ip or pk
     secret: Option<String>, // optional, required by several supervisor methods
 }

 #[derive(Debug, Error)]

@@ -102,8 +102,11 @@ async fn main() {
     // Start router workers (auto-discovered contexts) using a single global SupervisorHub (no separate inbound listener)
     {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
-        let hub = herocoordinator::clients::SupervisorHub::new(base_url.clone(), "supervisor.rpc".to_string())
-            .expect("Failed to initialize SupervisorHub");
+        let hub = herocoordinator::clients::SupervisorHub::new(
+            base_url.clone(),
+            "supervisor.rpc".to_string(),
+        )
+        .expect("Failed to initialize SupervisorHub");
         let cfg = herocoordinator::router::RouterConfig {
             context_ids: Vec::new(), // ignored by start_router_auto
             concurrency: 32,

src/router.rs
@@ -1,11 +1,14 @@
-use std::{collections::{HashSet, HashMap}, sync::Arc};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};

 use base64::Engine;
 use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
 use serde_json::{Value, json};
-use tokio::sync::{Semaphore, Mutex};
-use std::hash::{Hash, Hasher};
 use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use tokio::sync::{Mutex, Semaphore};

 use crate::{
     clients::{Destination, MyceliumClient, SupervisorClient, SupervisorHub},

@@ -18,8 +21,8 @@ use tracing::{error, info};
 pub struct RouterConfig {
     pub context_ids: Vec<u32>,
     pub concurrency: usize,
     pub base_url: String, // e.g. http://127.0.0.1:8990
     pub topic: String, // e.g. "supervisor.rpc"
     pub sup_hub: Arc<SupervisorHub>, // global supervisor hub for replies
     // Transport status polling configuration
     pub transport_poll_interval_secs: u64, // e.g. 2

@@ -96,11 +99,7 @@ impl SupervisorClientCache {
             tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
             return existing.clone();
         }
-        let client = Arc::new(SupervisorClient::new_with_hub(
-            hub,
-            dest,
-            secret.clone(),
-        ));
+        let client = Arc::new(SupervisorClient::new_with_hub(hub, dest, secret.clone()));
         guard.insert(key, client.clone());
         tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
         client
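
The collapsed `new_with_hub` call above sits inside `get_or_create`, whose lookup is logged as "double-checked". The shape of that cache is worth seeing in isolation; below is a minimal sketch with a hypothetical `Client` type standing in for `SupervisorClient`, and a single `tokio::sync::Mutex` in place of whatever fast-path read the real cache performs before taking the write lock:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

// Hypothetical stand-in for SupervisorClient.
struct Client {
    dest: String,
}

#[derive(Default)]
struct ClientCache {
    inner: Mutex<HashMap<String, Arc<Client>>>,
}

impl ClientCache {
    // Returns a cached client or builds and caches one. Holding the lock
    // across the whole check-then-insert makes it atomic, so two tasks
    // racing on the same key cannot both insert.
    async fn get_or_create(&self, key: String) -> Arc<Client> {
        let mut guard = self.inner.lock().await;
        if let Some(existing) = guard.get(&key) {
            return existing.clone();
        }
        let client = Arc::new(Client { dest: key.clone() });
        guard.insert(key, client.clone());
        client
    }
}

#[tokio::main]
async fn main() {
    let cache = ClientCache::default();
    let a = cache.get_or_create("pk1".to_string()).await;
    let b = cache.get_or_create("pk1".to_string()).await;
    assert!(Arc::ptr_eq(&a, &b)); // second lookup hits the cache
    println!("cached client for {}", a.dest);
}
```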

@@ -153,9 +152,16 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
                 async move {
                     // Ensure permit is dropped at end of task
                     let _permit = permit;
-                    if let Err(e) =
-                        deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium, sup_hub, cache.clone())
-                            .await
+                    if let Err(e) = deliver_one(
+                        &service_task,
+                        &cfg_task,
+                        ctx_id,
+                        &key,
+                        mycelium,
+                        sup_hub,
+                        cache.clone(),
+                    )
+                    .await
                     {
                         error!(context_id=ctx_id, key=%key, error=%e, "Delivery error");
                     }
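
The `let _permit = permit;` line above is the whole concurrency-limiting mechanism: each spawned delivery task owns a semaphore permit for its lifetime, so at most `concurrency` deliveries run at once. A self-contained sketch of that pattern with `Semaphore::acquire_owned` (the loop body and counts are placeholders, not the router's actual work):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 32 deliveries in flight, mirroring `concurrency: 32` above.
    let sem = Arc::new(Semaphore::new(32));
    let mut handles = Vec::new();

    for key in 0..100u32 {
        // acquire_owned takes an Arc<Semaphore> and moves the permit
        // into the spawned task.
        let permit = sem.clone().acquire_owned().await.expect("semaphore closed");
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task ends
            // ... deliver message `key` here ...
            let _ = key;
        }));
    }
    for h in handles {
        let _ = h.await;
    }
}
```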

@@ -384,7 +390,8 @@ async fn deliver_one(
     if !requested_job_check {
         if let Some(job_id) = job_id_opt {
             // First consult Redis for the latest job state in case we already have a terminal update
-            match service_poll.load_job(context_id, caller_id, job_id).await {
+            match service_poll.load_job(context_id, caller_id, job_id).await
+            {
                 Ok(job) => {
                     match job.status() {
                         JobStatus::Finished | JobStatus::Error => {

@@ -412,14 +419,25 @@ async fn deliver_one(
                                 secret_for_poller.clone(),
                             )
                             .await;
-                        match sup.job_result_wait(job_id.to_string()).await {
+                        match sup
+                            .job_result_wait(job_id.to_string())
+                            .await
+                        {
                             Ok((_out2, reply2)) => {
                                 // Interpret reply synchronously: success/error/bare string
                                 let res = reply2.get("result");
-                                if let Some(obj) = res.and_then(|v| v.as_object()) {
-                                    if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
+                                if let Some(obj) =
+                                    res.and_then(|v| v.as_object())
+                                {
+                                    if let Some(s) = obj
+                                        .get("success")
+                                        .and_then(|v| v.as_str())
+                                    {
                                         let mut patch = std::collections::HashMap::new();
-                                        patch.insert("success".to_string(), s.to_string());
+                                        patch.insert(
+                                            "success".to_string(),
+                                            s.to_string(),
+                                        );
                                         let _ = service_poll
                                             .update_job_result_merge_unchecked(
                                                 context_id, caller_id, job_id, patch,

@@ -444,9 +462,15 @@ async fn deliver_one(
                                             )],
                                         )
                                         .await;
-                                    } else if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
+                                    } else if let Some(s) = obj
+                                        .get("error")
+                                        .and_then(|v| v.as_str())
+                                    {
                                         let mut patch = std::collections::HashMap::new();
-                                        patch.insert("error".to_string(), s.to_string());
+                                        patch.insert(
+                                            "error".to_string(),
+                                            s.to_string(),
+                                        );
                                         let _ = service_poll
                                             .update_job_result_merge_unchecked(
                                                 context_id, caller_id, job_id, patch,

@@ -472,9 +496,14 @@ async fn deliver_one(
                                         )
                                         .await;
                                     }
-                                } else if let Some(s) = res.and_then(|v| v.as_str()) {
+                                } else if let Some(s) =
+                                    res.and_then(|v| v.as_str())
+                                {
                                     let mut patch = std::collections::HashMap::new();
-                                    patch.insert("success".to_string(), s.to_string());
+                                    patch.insert(
+                                        "success".to_string(),
+                                        s.to_string(),
+                                    );
                                     let _ = service_poll
                                         .update_job_result_merge_unchecked(
                                             context_id, caller_id, job_id, patch,
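
These three rewrapped branches together encode the reply contract: `result` is either an object carrying a `success` or `error` string, or a bare string recorded as a success. Pulled out of the router for clarity, the same interpretation might look like this (`interpret_reply` is a hypothetical helper, not part of the crate):

```rust
use serde_json::{Value, json};

// Hypothetical helper mirroring the branching in deliver_one: returns
// ("success" | "error", payload), or None when the reply has no usable result.
fn interpret_reply(reply: &Value) -> Option<(&'static str, String)> {
    let res = reply.get("result")?;
    if let Some(obj) = res.as_object() {
        if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
            return Some(("success", s.to_string()));
        }
        if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
            return Some(("error", s.to_string()));
        }
        return None;
    }
    // A bare string result is treated as a success payload.
    res.as_str().map(|s| ("success", s.to_string()))
}

fn main() {
    assert_eq!(
        interpret_reply(&json!({"result": {"success": "ok"}})),
        Some(("success", "ok".to_string()))
    );
    assert_eq!(
        interpret_reply(&json!({"result": "done"})),
        Some(("success", "done".to_string()))
    );
    assert_eq!(interpret_reply(&json!({"result": {"other": 1}})), None);
}
```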

@@ -549,15 +578,23 @@ async fn deliver_one(
                                 secret_for_poller.clone(),
                             )
                             .await;
-                        match sup.job_status_wait(job_id.to_string()).await {
+                        match sup
+                            .job_status_wait(job_id.to_string())
+                            .await
+                        {
                             Ok((_out_id, reply_status)) => {
                                 // Interpret status reply synchronously
-                                let result_opt = reply_status.get("result");
-                                let error_opt = reply_status.get("error");
+                                let result_opt =
+                                    reply_status.get("result");
+                                let error_opt =
+                                    reply_status.get("error");
                                 if let Some(err_obj) = error_opt {
                                     let _ = service_poll
                                         .update_job_status_unchecked(
-                                            context_id, caller_id, job_id, JobStatus::Error,
+                                            context_id,
+                                            caller_id,
+                                            job_id,
+                                            JobStatus::Error,
                                         )
                                         .await;
                                     let _ = service_poll

@@ -574,9 +611,16 @@ async fn deliver_one(
                                         .get("status")
                                         .and_then(|v| v.as_str())
                                         .or_else(|| res.as_str());
-                                    if let Some(remote_status) = status_candidate {
-                                        if let Some((mapped, terminal)) =
-                                            map_supervisor_job_status(remote_status)
+                                    if let Some(remote_status) =
+                                        status_candidate
+                                    {
+                                        if let Some((
+                                            mapped,
+                                            terminal,
+                                        )) =
+                                            map_supervisor_job_status(
+                                                remote_status,
+                                            )
                                         {
                                             let _ = service_poll
                                                 .update_job_status_unchecked(
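
From the rewrapped call it is evident that `map_supervisor_job_status` returns `Option<(JobStatus, bool)>`: the mapped local status plus a terminal flag. The diff never shows the supervisor's actual status strings, so the arms in this sketch are illustrative guesses only:

```rust
#[derive(Debug, PartialEq)]
enum JobStatus {
    Dispatched,
    Started,
    Finished,
    Error,
}

// Sketch only: the real supervisor status strings are not visible in
// this diff, so the match arms below are assumptions.
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
    match s {
        "queued" | "dispatched" => Some((JobStatus::Dispatched, false)),
        "running" | "started" => Some((JobStatus::Started, false)),
        "finished" => Some((JobStatus::Finished, true)), // terminal
        "error" | "failed" => Some((JobStatus::Error, true)), // terminal
        _ => None, // unknown statuses are ignored by the caller
    }
}

fn main() {
    assert_eq!(map_supervisor_job_status("running"), Some((JobStatus::Started, false)));
    assert_eq!(map_supervisor_job_status("nonsense"), None);
}
```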

@@ -596,8 +640,19 @@ async fn deliver_one(
                                             // If terminal, request job.result now (handled above for local terminal case)
                                             if terminal {
                                                 // trigger job.result only if result empty to avoid spam
-                                                if let Ok(j_after) = service_poll.load_job(context_id, caller_id, job_id).await {
-                                                    if j_after.result.is_empty() {
+                                                if let Ok(j_after) =
+                                                    service_poll
+                                                        .load_job(
+                                                            context_id,
+                                                            caller_id,
+                                                            job_id,
+                                                        )
+                                                        .await
+                                                {
+                                                    if j_after
+                                                        .result
+                                                        .is_empty()
+                                                    {
                                                         let sup2 = cache
                                                             .get_or_create(
                                                                 sup_hub.clone(),

@@ -675,7 +730,10 @@ async fn deliver_one(
                                     context_id,
                                     caller_id,
                                     id,
-                                    vec![format!("job.status request error: {}", e)],
+                                    vec![format!(
+                                        "job.status request error: {}",
+                                        e
+                                    )],
                                 )
                                 .await;
                             }

@@ -802,4 +860,3 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task:
         }
     })
 }
-

@@ -1182,10 +1182,7 @@ impl AppService {
         &self,
         inner_id: u64,
     ) -> Result<Option<(u32, u32, u32, u32)>, BoxError> {
-        self.redis
-            .supcorr_get(inner_id)
-            .await
-            .map_err(Into::into)
+        self.redis.supcorr_get(inner_id).await.map_err(Into::into)
     }

     /// Correlation map: delete mapping by inner supervisor JSON-RPC id.

@@ -789,10 +789,7 @@ impl RedisDriver {
         Ok(())
     }

-    pub async fn supcorr_get(
-        &self,
-        inner_id: u64,
-    ) -> Result<Option<(u32, u32, u32, u32)>> {
+    pub async fn supcorr_get(&self, inner_id: u64) -> Result<Option<(u32, u32, u32, u32)>> {
         let mut cm = self.manager_for_db(0).await?;
         let key = format!("supcorr:{}", inner_id);
         let res: Option<String> = redis::cmd("GET")
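
The hunk ends before the `GET` is executed, and the encoding of the stored `(u32, u32, u32, u32)` tuple behind a `supcorr:{inner_id}` key never appears in this diff. Purely as a sketch, a parser for one plausible encoding (the dash-separated format is an assumption, not the crate's actual layout):

```rust
// Sketch only: assumes the four ids were stored as
// "context-caller-job-message"; the real encoding is not visible here.
fn parse_supcorr(raw: &str) -> Option<(u32, u32, u32, u32)> {
    let mut it = raw.split('-');
    let a = it.next()?.parse().ok()?;
    let b = it.next()?.parse().ok()?;
    let c = it.next()?.parse().ok()?;
    let d = it.next()?.parse().ok()?;
    Some((a, b, c, d))
}

fn main() {
    assert_eq!(parse_supcorr("1-2-3-4"), Some((1, 2, 3, 4)));
    assert_eq!(parse_supcorr("1-2-x-4"), None); // malformed value -> None
}
```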