Separate mycelium client more from supervisor client
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
@@ -34,6 +34,18 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
|
||||
let cfg_cloned = cfg.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
|
||||
|
||||
// Create a shared Mycelium client for this context loop (retry until available)
|
||||
let mycelium = loop {
|
||||
match MyceliumClient::new(cfg_cloned.base_url.clone()) {
|
||||
Ok(c) => break Arc::new(c),
|
||||
Err(e) => {
|
||||
eprintln!("[router ctx={}] MyceliumClient init error: {}", ctx_id, e);
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
// Pop next message key (blocking with timeout)
|
||||
match service_cloned.brpop_msg_out(ctx_id, 1).await {
|
||||
@@ -52,16 +64,19 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
|
||||
};
|
||||
let service_task = service_cloned.clone();
|
||||
let cfg_task = cfg_cloned.clone();
|
||||
tokio::spawn(async move {
|
||||
// Ensure permit is dropped at end of task
|
||||
let _permit = permit;
|
||||
if let Err(e) =
|
||||
deliver_one(&service_task, &cfg_task, ctx_id, &key).await
|
||||
{
|
||||
eprintln!(
|
||||
"[router ctx={}] delivery error for {}: {}",
|
||||
ctx_id, key, e
|
||||
);
|
||||
tokio::spawn({
|
||||
let mycelium = mycelium.clone();
|
||||
async move {
|
||||
// Ensure permit is dropped at end of task
|
||||
let _permit = permit;
|
||||
if let Err(e) =
|
||||
deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium).await
|
||||
{
|
||||
eprintln!(
|
||||
"[router ctx={}] delivery error for {}: {}",
|
||||
ctx_id, key, e
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -87,6 +102,7 @@ async fn deliver_one(
|
||||
cfg: &RouterConfig,
|
||||
context_id: u32,
|
||||
msg_key: &str,
|
||||
mycelium: Arc<MyceliumClient>,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Parse "message:{caller_id}:{id}"
|
||||
let (caller_id, id) = parse_message_key(msg_key)
|
||||
@@ -120,12 +136,12 @@ async fn deliver_one(
|
||||
} else {
|
||||
Destination::Ip(runner.address)
|
||||
};
|
||||
let client = SupervisorClient::new(
|
||||
cfg.base_url.clone(),
|
||||
let client = SupervisorClient::new_with_client(
|
||||
mycelium.clone(),
|
||||
dest,
|
||||
cfg.topic.clone(),
|
||||
None, // secret
|
||||
)?;
|
||||
);
|
||||
|
||||
// Build supervisor method and params from Message
|
||||
let method = msg.message.clone();
|
||||
@@ -153,27 +169,14 @@ async fn deliver_one(
|
||||
// Spawn transport-status poller
|
||||
{
|
||||
let service_poll = service.clone();
|
||||
let base_url = cfg.base_url.clone();
|
||||
let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
|
||||
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
|
||||
let out_id_cloned = out_id.clone();
|
||||
let mycelium = mycelium.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let start = std::time::Instant::now();
|
||||
let client = match MyceliumClient::new(base_url) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!("MyceliumClient init error: {e}")],
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let client = mycelium;
|
||||
|
||||
let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
|
||||
|
||||
|
Reference in New Issue
Block a user