921 lines
45 KiB
Rust
921 lines
45 KiB
Rust
use std::{collections::{HashSet, HashMap}, sync::Arc};
|
|
|
|
use base64::Engine;
|
|
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
|
use serde_json::{Value, json};
|
|
use tokio::sync::{Semaphore, Mutex};
|
|
use std::hash::{Hash, Hasher};
|
|
use std::collections::hash_map::DefaultHasher;
|
|
|
|
use crate::{
|
|
clients::{Destination, MyceliumClient, SupervisorClient},
|
|
models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
|
|
service::AppService,
|
|
};
|
|
use tracing::{error, info};
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct RouterConfig {
|
|
pub context_ids: Vec<u32>,
|
|
pub concurrency: usize,
|
|
pub base_url: String, // e.g. http://127.0.0.1:8990
|
|
pub topic: String, // e.g. "supervisor.rpc"
|
|
// Transport status polling configuration
|
|
pub transport_poll_interval_secs: u64, // e.g. 2
|
|
pub transport_poll_timeout_secs: u64, // e.g. 300 (5 minutes)
|
|
}
|
|
|
|
/*
|
|
SupervisorClient reuse cache (Router-local):
|
|
|
|
Rationale:
|
|
- SupervisorClient maintains an internal JSON-RPC id_counter per instance.
|
|
- Rebuilding a client for each message resets this counter, causing inner JSON-RPC ids to restart at 1.
|
|
- We reuse one SupervisorClient per (destination, topic, secret) to preserve monotonically increasing ids.
|
|
|
|
Scope:
|
|
- Cache is per Router loop (and a separate one for the inbound listener).
|
|
- If cross-loop/process reuse becomes necessary later, promote to a process-global cache.
|
|
|
|
Keying:
|
|
- Key: destination + topic + secret-presence (secret content hashed; not stored in plaintext).
|
|
|
|
Concurrency:
|
|
- tokio::Mutex protects a HashMap<String, Arc<SupervisorClient>>.
|
|
- Values are Arc so call sites clone cheaply and share the same id_counter.
|
|
*/
|
|
#[derive(Clone)]
|
|
struct SupervisorClientCache {
|
|
map: Arc<Mutex<HashMap<String, Arc<SupervisorClient>>>>,
|
|
}
|
|
|
|
impl SupervisorClientCache {
|
|
fn new() -> Self {
|
|
Self {
|
|
map: Arc::new(Mutex::new(HashMap::new())),
|
|
}
|
|
}
|
|
|
|
fn make_key(dest: &Destination, topic: &str, secret: &Option<String>) -> String {
|
|
let dst = match dest {
|
|
Destination::Ip(ip) => format!("ip:{ip}"),
|
|
Destination::Pk(pk) => format!("pk:{pk}"),
|
|
};
|
|
// Hash the secret to avoid storing plaintext in keys while still differentiating values
|
|
let sec_hash = match secret {
|
|
Some(s) if !s.is_empty() => {
|
|
let mut hasher = DefaultHasher::new();
|
|
s.hash(&mut hasher);
|
|
format!("s:{}", hasher.finish())
|
|
}
|
|
_ => "s:none".to_string(),
|
|
};
|
|
format!("{dst}|t:{topic}|{sec_hash}")
|
|
}
|
|
|
|
async fn get_or_create(
|
|
&self,
|
|
mycelium: Arc<MyceliumClient>,
|
|
dest: Destination,
|
|
topic: String,
|
|
secret: Option<String>,
|
|
) -> Arc<SupervisorClient> {
|
|
let key = Self::make_key(&dest, &topic, &secret);
|
|
|
|
{
|
|
let guard = self.map.lock().await;
|
|
if let Some(existing) = guard.get(&key) {
|
|
tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup");
|
|
return existing.clone();
|
|
}
|
|
}
|
|
|
|
let mut guard = self.map.lock().await;
|
|
if let Some(existing) = guard.get(&key) {
|
|
tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
|
|
return existing.clone();
|
|
}
|
|
let client = Arc::new(SupervisorClient::new_with_client(
|
|
mycelium,
|
|
dest,
|
|
topic.clone(),
|
|
secret.clone(),
|
|
));
|
|
guard.insert(key, client.clone());
|
|
tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
|
|
client
|
|
}
|
|
}
|
|
|
|
/// Start background router loops, one per context.
|
|
/// Each loop:
|
|
/// - BRPOP msg_out with 1s timeout
|
|
/// - Loads the Message by key, selects a Runner by script_type
|
|
/// - Sends supervisor JSON-RPC via Mycelium
|
|
/// - On success: Message.status = Acknowledged
|
|
/// - On error: Message.status = Error and append a log
|
|
pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::JoinHandle<()>> {
|
|
let mut handles = Vec::new();
|
|
for ctx_id in cfg.context_ids.clone() {
|
|
let service_cloned = service.clone();
|
|
let cfg_cloned = cfg.clone();
|
|
let handle = tokio::spawn(async move {
|
|
let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
|
|
|
|
// Create a shared Mycelium client for this context loop (retry until available)
|
|
let mycelium = loop {
|
|
match MyceliumClient::new(cfg_cloned.base_url.clone()) {
|
|
Ok(c) => break Arc::new(c),
|
|
Err(e) => {
|
|
error!(context_id=ctx_id, error=%e, "MyceliumClient init error");
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
};
|
|
|
|
let cache = Arc::new(SupervisorClientCache::new());
|
|
|
|
loop {
|
|
// Pop next message key (blocking with timeout)
|
|
match service_cloned.brpop_msg_out(ctx_id, 1).await {
|
|
Ok(Some(key)) => {
|
|
let permit = {
|
|
// acquire a concurrency permit (non-fair is fine)
|
|
let sem = sem.clone();
|
|
// if semaphore is exhausted, await until a slot becomes available
|
|
match sem.acquire_owned().await {
|
|
Ok(p) => p,
|
|
Err(_) => {
|
|
// Semaphore closed; exit loop
|
|
break;
|
|
}
|
|
}
|
|
};
|
|
let service_task = service_cloned.clone();
|
|
let cfg_task = cfg_cloned.clone();
|
|
tokio::spawn({
|
|
let mycelium = mycelium.clone();
|
|
let cache = cache.clone();
|
|
async move {
|
|
// Ensure permit is dropped at end of task
|
|
let _permit = permit;
|
|
if let Err(e) =
|
|
deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium, cache.clone())
|
|
.await
|
|
{
|
|
error!(context_id=ctx_id, key=%key, error=%e, "Delivery error");
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Ok(None) => {
|
|
// timeout: just tick
|
|
continue;
|
|
}
|
|
Err(e) => {
|
|
error!(context_id=ctx_id, error=%e, "BRPOP error");
|
|
// small backoff to avoid busy-loop on persistent errors
|
|
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
handles
|
|
}
|
|
|
|
async fn deliver_one(
|
|
service: &AppService,
|
|
cfg: &RouterConfig,
|
|
context_id: u32,
|
|
msg_key: &str,
|
|
mycelium: Arc<MyceliumClient>,
|
|
cache: Arc<SupervisorClientCache>,
|
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
// Parse "message:{caller_id}:{id}"
|
|
let (caller_id, id) = parse_message_key(msg_key)
|
|
.ok_or_else(|| format!("invalid message key format: {}", msg_key))?;
|
|
|
|
// Load message
|
|
let msg: Message = service.load_message(context_id, caller_id, id).await?;
|
|
// Embedded job id (if any)
|
|
let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);
|
|
|
|
// Determine routing script_type
|
|
let desired: ScriptType = determine_script_type(&msg);
|
|
|
|
// Discover runners and select a matching one
|
|
let runners = service.scan_runners(context_id).await?;
|
|
let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
|
|
let log = format!(
|
|
"No runner with script_type {:?} available in context {} for message {}",
|
|
desired, context_id, msg_key
|
|
);
|
|
let _ = service
|
|
.append_message_logs(context_id, caller_id, id, vec![log.clone()])
|
|
.await;
|
|
let _ = service
|
|
.update_message_status(context_id, caller_id, id, MessageStatus::Error)
|
|
.await;
|
|
return Err(log.into());
|
|
};
|
|
|
|
// Build SupervisorClient
|
|
let dest = if !runner.pubkey.trim().is_empty() {
|
|
Destination::Pk(runner.pubkey.clone())
|
|
} else {
|
|
Destination::Ip(runner.address)
|
|
};
|
|
// Keep clones for poller usage
|
|
let dest_for_poller = dest.clone();
|
|
let topic_for_poller = cfg.topic.clone();
|
|
let secret_for_poller = runner.secret.clone();
|
|
let client = cache
|
|
.get_or_create(
|
|
mycelium.clone(),
|
|
dest.clone(),
|
|
cfg.topic.clone(),
|
|
runner.secret.clone(),
|
|
)
|
|
.await;
|
|
|
|
// Build supervisor method and params from Message
|
|
let method = msg.message.clone();
|
|
let params = build_params(&msg)?;
|
|
|
|
// Send
|
|
// If this is a job.run and we have a secret configured on the client,
|
|
// prefer the typed wrapper that injects the secret into inner supervisor params,
|
|
// and also capture the inner supervisor JSON-RPC id for correlation.
|
|
let (out_id, inner_id_opt) = if method == "job.run" {
|
|
if let Some(j) = msg.job.first() {
|
|
let jv = job_to_json(j)?;
|
|
// Returns (outbound message id, inner supervisor JSON-RPC id)
|
|
let (out, inner) = client.job_run_with_ids(jv).await?;
|
|
(out, Some(inner))
|
|
} else {
|
|
// Fallback: no embedded job, use the generic call
|
|
let out = client.call(&method, params).await?;
|
|
(out, None)
|
|
}
|
|
} else {
|
|
let out = client.call(&method, params).await?;
|
|
(out, None)
|
|
};
|
|
|
|
// Store transport id and initial Sent status
|
|
let _ = service
|
|
.update_message_transport(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
Some(out_id.clone()),
|
|
Some(TransportStatus::Sent),
|
|
)
|
|
.await;
|
|
|
|
// Mark as acknowledged on success
|
|
service
|
|
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
|
|
.await?;
|
|
|
|
// Record correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling
|
|
if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) {
|
|
let _ = service
|
|
.supcorr_set(inner_id, context_id, caller_id, job_id, id)
|
|
.await;
|
|
}
|
|
|
|
// Spawn transport-status poller
|
|
{
|
|
let service_poll = service.clone();
|
|
let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
|
|
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
|
|
let out_id_cloned = out_id.clone();
|
|
let mycelium = mycelium.clone();
|
|
|
|
tokio::spawn(async move {
|
|
let start = std::time::Instant::now();
|
|
let client = mycelium;
|
|
|
|
// Supervisor call context captured for sync status checks
|
|
let sup_dest = dest_for_poller;
|
|
let sup_topic = topic_for_poller;
|
|
let job_id_opt = job_id_opt;
|
|
|
|
let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
|
|
|
|
loop {
|
|
if start.elapsed() >= poll_timeout {
|
|
let _ = service_poll
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
vec!["Transport-status polling timed out".to_string()],
|
|
)
|
|
.await;
|
|
// leave last known status; do not override
|
|
break;
|
|
}
|
|
|
|
match client.message_status(&out_id_cloned).await {
|
|
Ok(s) => {
|
|
if last_status.as_ref() != Some(&s) {
|
|
let _ = service_poll
|
|
.update_message_transport(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
None,
|
|
Some(s.clone()),
|
|
)
|
|
.await;
|
|
last_status = Some(s.clone());
|
|
}
|
|
|
|
// Stop on terminal states
|
|
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
|
|
// On Read, request supervisor job.status asynchronously; inbound listener will handle replies
|
|
// if matches!(s, TransportStatus::Read)
|
|
// && let Some(job_id) = job_id_opt
|
|
if let Some(job_id) = job_id_opt {
|
|
let sup = cache
|
|
.get_or_create(
|
|
client.clone(),
|
|
sup_dest.clone(),
|
|
sup_topic.clone(),
|
|
secret_for_poller.clone(),
|
|
)
|
|
.await;
|
|
match sup.job_status_with_ids(job_id.to_string()).await {
|
|
Ok((_out_id, inner_id)) => {
|
|
// Correlate this status request to the message/job
|
|
let _ = service_poll
|
|
.supcorr_set(
|
|
inner_id, context_id, caller_id, job_id, id,
|
|
)
|
|
.await;
|
|
let _ = service_poll
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
vec![format!(
|
|
"Requested supervisor job.status for job {}",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
Err(e) => {
|
|
let _ = service_poll
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
vec![format!("job.status request error: {}", e)],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
// break;
|
|
}
|
|
if matches!(s, TransportStatus::Failed) {
|
|
let _ = service_poll
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
vec![format!(
|
|
"Transport failed for outbound id {out_id_cloned}"
|
|
)],
|
|
)
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
// Log and continue polling
|
|
let _ = service_poll
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
id,
|
|
vec![format!("messageStatus query error: {e}")],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
|
|
tokio::time::sleep(poll_interval).await;
|
|
}
|
|
});
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn determine_script_type(msg: &Message) -> ScriptType {
|
|
// Prefer embedded job's script_type if available, else fallback to message.message_type
|
|
match msg.job.first() {
|
|
Some(j) => j.script_type.clone(),
|
|
None => msg.message_type.clone(),
|
|
}
|
|
}
|
|
|
|
fn build_params(msg: &Message) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
|
|
// Minimal mapping:
|
|
// - "job.run" with exactly one embedded job: [{ "job": <job> }]
|
|
// - otherwise: []
|
|
if msg.message == "job.run"
|
|
&& let Some(j) = msg.job.first()
|
|
{
|
|
let jv = job_to_json(j)?;
|
|
return Ok(json!([ { "job": jv } ]));
|
|
}
|
|
|
|
Ok(json!([]))
|
|
}
|
|
|
|
fn job_to_json(job: &Job) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
|
|
Ok(serde_json::to_value(job)?)
|
|
}
|
|
|
|
fn parse_message_key(s: &str) -> Option<(u32, u32)> {
|
|
// Expect "message:{caller_id}:{id}"
|
|
let mut it = s.split(':');
|
|
match (it.next(), it.next(), it.next(), it.next()) {
|
|
(Some("message"), Some(caller), Some(id), None) => {
|
|
let caller_id = caller.parse::<u32>().ok()?;
|
|
let msg_id = id.parse::<u32>().ok()?;
|
|
Some((caller_id, msg_id))
|
|
}
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
/// Map supervisor job.status -> (local JobStatus, terminal)
|
|
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
|
|
match s {
|
|
"created" | "queued" => Some((JobStatus::Dispatched, false)),
|
|
"running" => Some((JobStatus::Started, false)),
|
|
"completed" => Some((JobStatus::Finished, true)),
|
|
"failed" | "timeout" => Some((JobStatus::Error, true)),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
/// Auto-discover contexts periodically and ensure a router loop exists for each.
|
|
/// Returns a JoinHandle of the discovery task (router loops are detached).
|
|
pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let mut active: HashSet<u32> = HashSet::new();
|
|
loop {
|
|
match service.list_context_ids().await {
|
|
Ok(ids) => {
|
|
for ctx_id in ids {
|
|
if !active.contains(&ctx_id) {
|
|
// Spawn a loop for this new context
|
|
let cfg_ctx = RouterConfig {
|
|
context_ids: vec![ctx_id],
|
|
..cfg.clone()
|
|
};
|
|
let _ = start_router(service.clone(), cfg_ctx);
|
|
active.insert(ctx_id);
|
|
info!(context_id = ctx_id, "Started loop for context");
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!(error=%e, "list_context_ids error");
|
|
}
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Start a single global inbound listener that reads Mycelium popMessage with topic filter,
|
|
/// decodes supervisor JSON-RPC replies, and updates correlated jobs/messages.
|
|
/// This listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id.
|
|
pub fn start_inbound_listener(
|
|
service: AppService,
|
|
cfg: RouterConfig,
|
|
) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
// Initialize Mycelium client (retry loop)
|
|
let mycelium = loop {
|
|
match MyceliumClient::new(cfg.base_url.clone()) {
|
|
Ok(c) => break Arc::new(c),
|
|
Err(e) => {
|
|
error!(error=%e, "MyceliumClient init error (inbound listener)");
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
};
|
|
|
|
let cache = Arc::new(SupervisorClientCache::new());
|
|
|
|
loop {
|
|
// Poll for inbound supervisor messages on the configured topic
|
|
match mycelium.pop_message(Some(false), Some(20), None).await {
|
|
Ok(Some(inb)) => {
|
|
// Expect InboundMessage with base64 "payload"
|
|
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
|
// Not a payload-bearing message; ignore
|
|
continue;
|
|
};
|
|
let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
|
|
let _ = service
|
|
.append_message_logs(
|
|
0, // unknown context yet
|
|
0,
|
|
0,
|
|
vec![
|
|
"Inbound payload base64 decode error (supervisor reply)".into(),
|
|
],
|
|
)
|
|
.await;
|
|
continue;
|
|
};
|
|
tracing::info!(
|
|
raw = %String::from_utf8_lossy(&raw),
|
|
"Read raw messge from mycelium"
|
|
);
|
|
let Ok(rpc): Result<Value, _> = serde_json::from_slice(&raw) else {
|
|
// Invalid JSON payload
|
|
continue;
|
|
};
|
|
|
|
// Extract inner supervisor JSON-RPC id (number preferred; string fallback)
|
|
let inner_id_u64 = match rpc.get("id") {
|
|
Some(Value::Number(n)) => n.as_u64(),
|
|
Some(Value::String(s)) => s.parse::<u64>().ok(),
|
|
_ => None,
|
|
};
|
|
let Some(inner_id) = inner_id_u64 else {
|
|
// Cannot correlate without id
|
|
continue;
|
|
};
|
|
|
|
// Lookup correlation mapping
|
|
match service.supcorr_get(inner_id).await {
|
|
Ok(Some((context_id, caller_id, job_id, message_id))) => {
|
|
// Determine success/error from supervisor JSON-RPC envelope
|
|
// Inspect result/error to route job.run/job.status/job.result replies
|
|
let result_opt = rpc.get("result");
|
|
let error_opt = rpc.get("error");
|
|
|
|
// Handle job.run success (job_queued)
|
|
let is_job_queued = result_opt
|
|
.and_then(|res| {
|
|
if res.get("job_queued").is_some() {
|
|
Some(true)
|
|
} else if let Some(s) = res.as_str() {
|
|
Some(s == "job_queued")
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.unwrap_or(false);
|
|
|
|
if is_job_queued {
|
|
// Set to Dispatched (idempotent) per spec, and append log
|
|
let _ = service
|
|
.update_job_status_unchecked(
|
|
context_id,
|
|
caller_id,
|
|
job_id,
|
|
JobStatus::Dispatched,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Supervisor reply for job {}: job_queued",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
continue;
|
|
}
|
|
|
|
// Error envelope: set job Error and log
|
|
if let Some(err_obj) = error_opt {
|
|
let _ = service
|
|
.update_job_status_unchecked(
|
|
context_id,
|
|
caller_id,
|
|
job_id,
|
|
JobStatus::Error,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Supervisor error for job {}: {}",
|
|
job_id, err_obj
|
|
)],
|
|
)
|
|
.await;
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
continue;
|
|
}
|
|
|
|
// If we have a result, try to interpret it as job.status or job.result
|
|
if let Some(res) = result_opt {
|
|
// Try job.status: object {status: "..."} or bare string
|
|
let status_candidate = res
|
|
.get("status")
|
|
.and_then(|v| v.as_str())
|
|
.or_else(|| res.as_str());
|
|
|
|
if let Some(remote_status) = status_candidate {
|
|
if let Some((mapped, terminal)) =
|
|
map_supervisor_job_status(remote_status)
|
|
{
|
|
// Update job status and log
|
|
let _ = service
|
|
.update_job_status_unchecked(
|
|
context_id,
|
|
caller_id,
|
|
job_id,
|
|
mapped.clone(),
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Supervisor job.status for job {} -> {} (mapped to {:?})",
|
|
job_id, remote_status, mapped
|
|
)],
|
|
)
|
|
.await;
|
|
// Done with this correlation id
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
|
|
// If terminal, request job.result asynchronously now
|
|
if terminal {
|
|
// Load job to determine script_type for runner selection
|
|
match service
|
|
.load_job(context_id, caller_id, job_id)
|
|
.await
|
|
{
|
|
Ok(job) => {
|
|
match service.scan_runners(context_id).await {
|
|
Ok(runners) => {
|
|
if let Some(runner) =
|
|
runners.into_iter().find(|r| {
|
|
r.script_type == job.script_type
|
|
})
|
|
{
|
|
let dest = if !runner
|
|
.pubkey
|
|
.trim()
|
|
.is_empty()
|
|
{
|
|
Destination::Pk(
|
|
runner.pubkey.clone(),
|
|
)
|
|
} else {
|
|
Destination::Ip(runner.address)
|
|
};
|
|
let sup = cache
|
|
.get_or_create(
|
|
mycelium.clone(),
|
|
dest,
|
|
cfg.topic.clone(),
|
|
runner.secret.clone(),
|
|
)
|
|
.await;
|
|
match sup
|
|
.job_result_with_ids(
|
|
job_id.to_string(),
|
|
)
|
|
.await
|
|
{
|
|
Ok((_out2, inner2)) => {
|
|
let _ = service
|
|
.supcorr_set(
|
|
inner2, context_id,
|
|
caller_id, job_id,
|
|
message_id,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Requested supervisor job.result for job {}",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
Err(e) => {
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"job.result request error for job {}: {}",
|
|
job_id, e
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
} else {
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"No runner with matching script_type found to request job.result for job {}",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"scan_runners error while requesting job.result for job {}: {}",
|
|
job_id, e
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"load_job error while requesting job.result for job {}: {}",
|
|
job_id, e
|
|
)],
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Try job.result: object with success/error or bare string treated as success
|
|
if let Some(obj) = res.as_object() {
|
|
if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
|
|
let mut patch = std::collections::HashMap::new();
|
|
patch.insert("success".to_string(), s.to_string());
|
|
let _ = service
|
|
.update_job_result_merge_unchecked(
|
|
context_id, caller_id, job_id, patch,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.update_message_status(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
MessageStatus::Processed,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Stored supervisor job.result for job {} (success)",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
continue;
|
|
}
|
|
if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
|
|
let mut patch = std::collections::HashMap::new();
|
|
patch.insert("error".to_string(), s.to_string());
|
|
let _ = service
|
|
.update_job_result_merge_unchecked(
|
|
context_id, caller_id, job_id, patch,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.update_message_status(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
MessageStatus::Processed,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Stored supervisor job.result for job {} (error)",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
continue;
|
|
}
|
|
} else if let Some(s) = res.as_str() {
|
|
// Bare string => treat as success
|
|
let mut patch = std::collections::HashMap::new();
|
|
patch.insert("success".to_string(), s.to_string());
|
|
let _ = service
|
|
.update_job_result_merge_unchecked(
|
|
context_id, caller_id, job_id, patch,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.update_message_status(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
MessageStatus::Processed,
|
|
)
|
|
.await;
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![format!(
|
|
"Stored supervisor job.result for job {} (success)",
|
|
job_id
|
|
)],
|
|
)
|
|
.await;
|
|
let _ = service.supcorr_del(inner_id).await;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Unknown/unsupported supervisor reply; keep correlation for later
|
|
let _ = service
|
|
.append_message_logs(
|
|
context_id,
|
|
caller_id,
|
|
message_id,
|
|
vec![
|
|
"Supervisor reply did not contain recognizable job.run/status/result fields"
|
|
.to_string(),
|
|
],
|
|
)
|
|
.await;
|
|
}
|
|
Ok(None) => {
|
|
// No correlation found; ignore or log once
|
|
}
|
|
Err(e) => {
|
|
error!(error=%e, "supcorr_get error");
|
|
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
|
}
|
|
}
|
|
}
|
|
Ok(None) => {
|
|
// No message; continue polling
|
|
continue;
|
|
}
|
|
Err(e) => {
|
|
error!(error=%e, "popMessage error");
|
|
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|