Compare commits: 97bcb55aaa...fb34b4e2f3

4 commits:
- fb34b4e2f3
- 2c88114d45
- 8de2597f19
- 3220f52956
@@ -86,13 +86,13 @@ impl MyceliumClient {
         &self,
         id_hex: &str,
     ) -> Result<TransportStatus, MyceliumClientError> {
-        let params = json!({ "id": id_hex });
-        let body = self.jsonrpc("messageStatus", params).await?;
+        let params = json!(id_hex);
+        let body = self.jsonrpc("getMessageInfo", params).await?;
         let result = body.get("result").ok_or_else(|| {
             MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
         })?;
-        // Accept both { status: "..."} and bare "..."
-        let status_str = if let Some(s) = result.get("status").and_then(|v| v.as_str()) {
+        // Accept both { state: "..."} and bare "..."
+        let status_str = if let Some(s) = result.get("state").and_then(|v| v.as_str()) {
             s.to_string()
         } else if let Some(s) = result.as_str() {
             s.to_string()
@@ -101,18 +101,19 @@ impl MyceliumClient {
                 "unexpected result shape: {result}"
             )));
         };
-        Self::map_status(&status_str).ok_or_else(|| {
+        let status = Self::map_status(&status_str).ok_or_else(|| {
             MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}"))
-        })
+        });
+        tracing::info!(%id_hex, status = %status.as_ref().unwrap(), "queried messages status");
+        status
     }

     fn map_status(s: &str) -> Option<TransportStatus> {
         match s {
-            "queued" => Some(TransportStatus::Queued),
-            "sent" => Some(TransportStatus::Sent),
-            "delivered" => Some(TransportStatus::Delivered),
+            "pending" => Some(TransportStatus::Queued),
+            "received" => Some(TransportStatus::Delivered),
             "read" => Some(TransportStatus::Read),
-            "failed" => Some(TransportStatus::Failed),
+            "aborted" => Some(TransportStatus::Failed),
             _ => None,
         }
     }
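The mapping now tracks Mycelium's getMessageInfo state vocabulary ("pending", "received", "read", "aborted") instead of the old messageStatus one. A minimal test sketch to pin that down (the test module is illustrative, not part of this change):

    #[cfg(test)]
    mod status_mapping_tests {
        use super::*;

        // Sketch: the "state" strings accepted after this change.
        #[test]
        fn maps_new_mycelium_states() {
            assert!(matches!(MyceliumClient::map_status("pending"), Some(TransportStatus::Queued)));
            assert!(matches!(MyceliumClient::map_status("received"), Some(TransportStatus::Delivered)));
            assert!(matches!(MyceliumClient::map_status("read"), Some(TransportStatus::Read)));
            assert!(matches!(MyceliumClient::map_status("aborted"), Some(TransportStatus::Failed)));
            // Old vocabulary no longer maps; unknown strings surface as InvalidResponse upstream.
            assert!(MyceliumClient::map_status("sent").is_none());
        }
    }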
@@ -193,66 +193,6 @@ impl SupervisorClient {
         Ok((out_id, inner_id))
     }

-    /// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result".
-    /// If the supervisor returns an error object, map to RpcError.
-    pub async fn call_sync(
-        &self,
-        method: &str,
-        params: Value,
-        reply_timeout_secs: u64,
-    ) -> Result<Value, SupervisorClientError> {
-        let inner = self.build_supervisor_payload(method, params);
-        let payload_b64 = Self::encode_payload(&inner)?;
-
-        let result = self
-            .mycelium
-            .push_message(
-                &self.destination,
-                &Self::encode_topic(self.topic.as_bytes()),
-                &payload_b64,
-                Some(reply_timeout_secs),
-            )
-            .await?;
-
-        // Expect an InboundMessage-like with a base64 payload containing the supervisor JSON-RPC response
-        let payload_field = if let Some(p) = result.get("payload").and_then(|v| v.as_str()) {
-            p.to_string()
-        } else if let Some(arr) = result.as_array() {
-            // Defensive: handle single-element array shape
-            if let Some(one) = arr.get(0) {
-                one.get("payload")
-                    .and_then(|v| v.as_str())
-                    .map(|s| s.to_string())
-                    .ok_or_else(|| {
-                        SupervisorClientError::InvalidResponse(format!(
-                            "missing payload in result: {result}"
-                        ))
-                    })?
-            } else {
-                return Err(SupervisorClientError::TransportTimeout);
-            }
-        } else {
-            // No payload => no reply received within timeout (Mycelium would have returned just an id)
-            return Err(SupervisorClientError::TransportTimeout);
-        };
-
-        let raw = BASE64_STANDARD
-            .decode(payload_field.as_bytes())
-            .map_err(|e| {
-                SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}"))
-            })?;
-        let rpc_resp: Value = serde_json::from_slice(&raw)?;
-
-        if let Some(err) = rpc_resp.get("error") {
-            return Err(SupervisorClientError::RpcError(err.to_string()));
-        }
-        let res = rpc_resp.get("result").ok_or_else(|| {
-            SupervisorClientError::InvalidResponse(format!(
-                "missing result in supervisor reply: {rpc_resp}"
-            ))
-        })?;
-        Ok(res.clone())
-    }
-
     fn need_secret(&self) -> Result<&str, SupervisorClientError> {
         self.secret
@@ -386,28 +326,15 @@ impl SupervisorClient {
         self.call("job.status", json!([job_id.into()])).await
     }

-    /// Synchronous job.status: waits for the supervisor to reply and returns the status string.
-    /// The supervisor result may be an object with { status: "..." } or a bare string.
-    pub async fn job_status_sync(
+    /// Asynchronous job.status returning outbound and inner IDs for correlation
+    pub async fn job_status_with_ids(
         &self,
         job_id: impl Into<String>,
-        reply_timeout_secs: u64,
-    ) -> Result<String, SupervisorClientError> {
-        let res = self
-            .call_sync("job.status", json!([job_id.into()]), reply_timeout_secs)
-            .await?;
-        let status = if let Some(s) = res.get("status").and_then(|v| v.as_str()) {
-            s.to_string()
-        } else if let Some(s) = res.as_str() {
-            s.to_string()
-        } else {
-            return Err(SupervisorClientError::InvalidResponse(format!(
-                "unexpected job.status result shape: {res}"
-            )));
-        };
-        Ok(status)
+    ) -> Result<(String, u64), SupervisorClientError> {
+        self.call_with_ids("job.status", json!([job_id.into()])).await
     }

     pub async fn job_result(
         &self,
         job_id: impl Into<String>,
@@ -415,45 +342,15 @@ impl SupervisorClient {
         self.call("job.result", json!([job_id.into()])).await
     }

-    /// Synchronous job.result: waits for the supervisor to reply and returns a map
-    /// containing exactly one of:
-    /// - {"success": "..."} on success
-    /// - {"error": "..."} on error reported by the runner
-    /// Some servers may return a bare string; we treat that as {"success": "<string>"}.
-    pub async fn job_result_sync(
+    /// Asynchronous job.result returning outbound and inner IDs for correlation
+    pub async fn job_result_with_ids(
         &self,
         job_id: impl Into<String>,
-        reply_timeout_secs: u64,
-    ) -> Result<std::collections::HashMap<String, String>, SupervisorClientError> {
-        let res = self
-            .call_sync("job.result", json!([job_id.into()]), reply_timeout_secs)
-            .await?;
-
-        use std::collections::HashMap;
-        let mut out: HashMap<String, String> = HashMap::new();
-
-        if let Some(obj) = res.as_object() {
-            if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
-                out.insert("success".to_string(), s.to_string());
-                return Ok(out);
-            }
-            if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
-                out.insert("error".to_string(), s.to_string());
-                return Ok(out);
-            }
-            return Err(SupervisorClientError::InvalidResponse(format!(
-                "unexpected job.result result shape: {res}"
-            )));
-        } else if let Some(s) = res.as_str() {
-            out.insert("success".to_string(), s.to_string());
-            return Ok(out);
-        }
-
-        Err(SupervisorClientError::InvalidResponse(format!(
-            "unexpected job.result result shape: {res}"
-        )))
+    ) -> Result<(String, u64), SupervisorClientError> {
+        self.call_with_ids("job.result", json!([job_id.into()])).await
     }

     pub async fn job_stop(
         &self,
         job_id: impl Into<String>,
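Both *_with_ids variants return the Mycelium outbound message id plus the inner JSON-RPC id, so callers correlate the eventual reply instead of blocking on it. A usage sketch, assuming the supcorr_set/supcorr_get correlation store used by the router below (parameter types are illustrative):

    // Sketch: fire-and-correlate with the new API. The inbound listener later
    // resolves inner_id via supcorr_get when the supervisor reply arrives.
    async fn request_status(
        sup: &SupervisorClient,
        service: &AppService,
        context_id: u32,
        caller_id: u32,
        job_id: u32,
        message_id: u32,
    ) -> Result<(), SupervisorClientError> {
        let (out_id, inner_id) = sup.job_status_with_ids(job_id.to_string()).await?;
        // Remember which message/job this inner JSON-RPC id belongs to.
        let _ = service
            .supcorr_set(inner_id, context_id, caller_id, job_id, message_id)
            .await;
        tracing::debug!(%out_id, inner_id, "job.status requested; reply handled asynchronously");
        Ok(())
    }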
@@ -59,6 +59,18 @@ pub enum TransportStatus {
     Failed,
 }

+impl std::fmt::Display for TransportStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TransportStatus::Queued => f.write_str("queued"),
+            TransportStatus::Sent => f.write_str("sent"),
+            TransportStatus::Delivered => f.write_str("delivered"),
+            TransportStatus::Read => f.write_str("read"),
+            TransportStatus::Failed => f.write_str("failed"),
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum MessageFormatType {
     Html,
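The Display impl renders the router's canonical lowercase names, so a status can be interpolated directly into logs. A quick sketch:

    // Sketch: Display in use; tracing's % capture renders via the impl above.
    let s = TransportStatus::Delivered;
    assert_eq!(s.to_string(), "delivered");
    tracing::info!(status = %s, "transport status update");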
src/router.rs (580 changed lines)
@@ -1,9 +1,11 @@
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::{HashSet, HashMap}, sync::Arc};

 use base64::Engine;
 use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
 use serde_json::{Value, json};
-use tokio::sync::Semaphore;
+use tokio::sync::{Semaphore, Mutex};
+use std::hash::{Hash, Hasher};
+use std::collections::hash_map::DefaultHasher;

 use crate::{
     clients::{Destination, MyceliumClient, SupervisorClient},
@@ -23,6 +25,88 @@ pub struct RouterConfig {
     pub transport_poll_timeout_secs: u64, // e.g. 300 (5 minutes)
 }

+/*
+ SupervisorClient reuse cache (Router-local):
+
+ Rationale:
+ - SupervisorClient maintains an internal JSON-RPC id_counter per instance.
+ - Rebuilding a client for each message resets this counter, causing inner JSON-RPC ids to restart at 1.
+ - We reuse one SupervisorClient per (destination, topic, secret) to preserve monotonically increasing ids.
+
+ Scope:
+ - Cache is per Router loop (and a separate one for the inbound listener).
+ - If cross-loop/process reuse becomes necessary later, promote to a process-global cache.
+
+ Keying:
+ - Key: destination + topic + secret-presence (secret content hashed; not stored in plaintext).
+
+ Concurrency:
+ - tokio::Mutex protects a HashMap<String, Arc<SupervisorClient>>.
+ - Values are Arc so call sites clone cheaply and share the same id_counter.
+*/
+#[derive(Clone)]
+struct SupervisorClientCache {
+    map: Arc<Mutex<HashMap<String, Arc<SupervisorClient>>>>,
+}
+
+impl SupervisorClientCache {
+    fn new() -> Self {
+        Self {
+            map: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    fn make_key(dest: &Destination, topic: &str, secret: &Option<String>) -> String {
+        let dst = match dest {
+            Destination::Ip(ip) => format!("ip:{ip}"),
+            Destination::Pk(pk) => format!("pk:{pk}"),
+        };
+        // Hash the secret to avoid storing plaintext in keys while still differentiating values
+        let sec_hash = match secret {
+            Some(s) if !s.is_empty() => {
+                let mut hasher = DefaultHasher::new();
+                s.hash(&mut hasher);
+                format!("s:{}", hasher.finish())
+            }
+            _ => "s:none".to_string(),
+        };
+        format!("{dst}|t:{topic}|{sec_hash}")
+    }
+
+    async fn get_or_create(
+        &self,
+        mycelium: Arc<MyceliumClient>,
+        dest: Destination,
+        topic: String,
+        secret: Option<String>,
+    ) -> Arc<SupervisorClient> {
+        let key = Self::make_key(&dest, &topic, &secret);
+
+        {
+            let guard = self.map.lock().await;
+            if let Some(existing) = guard.get(&key) {
+                tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup");
+                return existing.clone();
+            }
+        }
+
+        let mut guard = self.map.lock().await;
+        if let Some(existing) = guard.get(&key) {
+            tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
+            return existing.clone();
+        }
+        let client = Arc::new(SupervisorClient::new_with_client(
+            mycelium,
+            dest,
+            topic.clone(),
+            secret.clone(),
+        ));
+        guard.insert(key, client.clone());
+        tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
+        client
+    }
+}
+
 /// Start background router loops, one per context.
 /// Each loop:
 /// - BRPOP msg_out with 1s timeout
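Since get_or_create keys on (destination, topic, hashed secret), two deliveries to the same runner share one client and therefore one id_counter. A sketch (the pubkey and topic values are made up for illustration):

    // Sketch: identical keys yield the same Arc'd client, so inner JSON-RPC
    // ids keep increasing across messages instead of restarting at 1.
    let cache = SupervisorClientCache::new();
    let a = cache
        .get_or_create(mycelium.clone(), Destination::Pk("02abc".into()), "supervisor.rpc".into(), None)
        .await;
    let b = cache
        .get_or_create(mycelium.clone(), Destination::Pk("02abc".into()), "supervisor.rpc".into(), None)
        .await;
    assert!(Arc::ptr_eq(&a, &b)); // cache hit: same instance, same id_counter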
@@ -49,6 +133,8 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
             }
         };

+        let cache = Arc::new(SupervisorClientCache::new());
+
         loop {
             // Pop next message key (blocking with timeout)
             match service_cloned.brpop_msg_out(ctx_id, 1).await {
@@ -69,11 +155,12 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
                 let cfg_task = cfg_cloned.clone();
                 tokio::spawn({
                     let mycelium = mycelium.clone();
+                    let cache = cache.clone();
                     async move {
                         // Ensure permit is dropped at end of task
                         let _permit = permit;
                         if let Err(e) =
-                            deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium)
+                            deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium, cache.clone())
                                 .await
                         {
                             error!(context_id=ctx_id, key=%key, error=%e, "Delivery error");
@@ -104,6 +191,7 @@ async fn deliver_one(
     context_id: u32,
     msg_key: &str,
     mycelium: Arc<MyceliumClient>,
+    cache: Arc<SupervisorClientCache>,
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     // Parse "message:{caller_id}:{id}"
     let (caller_id, id) = parse_message_key(msg_key)
@@ -143,12 +231,14 @@ async fn deliver_one(
     let dest_for_poller = dest.clone();
     let topic_for_poller = cfg.topic.clone();
     let secret_for_poller = runner.secret.clone();
-    let client = SupervisorClient::new_with_client(
-        mycelium.clone(),
-        dest.clone(),
-        cfg.topic.clone(),
-        runner.secret.clone(),
-    );
+    let client = cache
+        .get_or_create(
+            mycelium.clone(),
+            dest.clone(),
+            cfg.topic.clone(),
+            runner.secret.clone(),
+        )
+        .await;

     // Build supervisor method and params from Message
     let method = msg.message.clone();
@@ -204,12 +294,6 @@ async fn deliver_one(
     let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
     let out_id_cloned = out_id.clone();
     let mycelium = mycelium.clone();
-    // Determine reply timeout for supervisor job.result: prefer message.timeout_result, fallback to router config timeout
-    let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
-        msg.timeout_result as u64
-    } else {
-        cfg.transport_poll_timeout_secs
-    };

     tokio::spawn(async move {
         let start = std::time::Instant::now();
@@ -253,124 +337,37 @@ async fn deliver_one(

                     // Stop on terminal states
                     if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
-                        // On Read, fetch supervisor job.status and update local job/message if terminal
-                        if matches!(s, TransportStatus::Read)
-                            && let Some(job_id) = job_id_opt
-                        {
-                            let sup = SupervisorClient::new_with_client(
-                                client.clone(),
-                                sup_dest.clone(),
-                                sup_topic.clone(),
-                                secret_for_poller.clone(),
-                            );
-                            match sup.job_status_sync(job_id.to_string(), 10).await {
-                                Ok(remote_status) => {
-                                    if let Some((mapped, terminal)) =
-                                        map_supervisor_job_status(&remote_status)
-                                    {
-                                        if terminal {
-                                            let _ = service_poll
-                                                .update_job_status_unchecked(
-                                                    context_id,
-                                                    caller_id,
-                                                    job_id,
-                                                    mapped.clone(),
-                                                )
-                                                .await;
-
-                                            // After terminal status, fetch supervisor job.result and store into Job.result
-                                            let sup = SupervisorClient::new_with_client(
-                                                client.clone(),
-                                                sup_dest.clone(),
-                                                sup_topic.clone(),
-                                                secret_for_poller.clone(),
-                                            );
-                                            match sup
-                                                .job_result_sync(
-                                                    job_id.to_string(),
-                                                    job_result_reply_timeout,
-                                                )
-                                                .await
-                                            {
-                                                Ok(result_map) => {
-                                                    // Persist the result into the Job.result map (merge)
-                                                    let _ = service_poll
-                                                        .update_job_result_merge_unchecked(
-                                                            context_id,
-                                                            caller_id,
-                                                            job_id,
-                                                            result_map.clone(),
-                                                        )
-                                                        .await;
-                                                    // Log which key was stored (success or error)
-                                                    let key = result_map
-                                                        .keys()
-                                                        .next()
-                                                        .cloned()
-                                                        .unwrap_or_else(|| {
-                                                            "unknown".to_string()
-                                                        });
-                                                    let _ = service_poll
-                                                        .append_message_logs(
-                                                            context_id,
-                                                            caller_id,
-                                                            id,
-                                                            vec![format!(
-                                                                "Stored supervisor job.result for job {} ({})",
-                                                                job_id, key
-                                                            )],
-                                                        )
-                                                        .await;
-                                                }
-                                                Err(e) => {
-                                                    let _ = service_poll
-                                                        .append_message_logs(
-                                                            context_id,
-                                                            caller_id,
-                                                            id,
-                                                            vec![format!(
-                                                                "job.result fetch error for job {}: {}",
-                                                                job_id, e
-                                                            )],
-                                                        )
-                                                        .await;
-                                                }
-                                            }
-
-                                            // Mark message as processed
-                                            let _ = service_poll
-                                                .update_message_status(
-                                                    context_id,
-                                                    caller_id,
-                                                    id,
-                                                    MessageStatus::Processed,
-                                                )
-                                                .await;
-                                            let _ = service_poll
-                                                .append_message_logs(
-                                                    context_id,
-                                                    caller_id,
-                                                    id,
-                                                    vec![format!(
-                                                        "Supervisor job.status for job {} -> {} (mapped to {:?})",
-                                                        job_id, remote_status, mapped
-                                                    )],
-                                                )
-                                                .await;
-                                        }
-                                    } else {
-                                        let _ = service_poll
-                                            .append_message_logs(
-                                                context_id,
-                                                caller_id,
-                                                id,
-                                                vec![format!(
-                                                    "Unknown supervisor status '{}' for job {}",
-                                                    remote_status, job_id
-                                                )],
-                                            )
-                                            .await;
-                                    }
-                                }
+                        // On Read, request supervisor job.status asynchronously; inbound listener will handle replies
+                        // if matches!(s, TransportStatus::Read)
+                        //     && let Some(job_id) = job_id_opt
+                        if let Some(job_id) = job_id_opt {
+                            let sup = cache
+                                .get_or_create(
+                                    client.clone(),
+                                    sup_dest.clone(),
+                                    sup_topic.clone(),
+                                    secret_for_poller.clone(),
+                                )
+                                .await;
+                            match sup.job_status_with_ids(job_id.to_string()).await {
+                                Ok((_out_id, inner_id)) => {
+                                    // Correlate this status request to the message/job
+                                    let _ = service_poll
+                                        .supcorr_set(
+                                            inner_id, context_id, caller_id, job_id, id,
+                                        )
+                                        .await;
+                                    let _ = service_poll
+                                        .append_message_logs(
+                                            context_id,
+                                            caller_id,
+                                            id,
+                                            vec![format!(
+                                                "Requested supervisor job.status for job {}",
+                                                job_id
+                                            )],
+                                        )
+                                        .await;
+                                }
                                 Err(e) => {
                                     let _ = service_poll
@@ -378,13 +375,13 @@ async fn deliver_one(
                                             context_id,
                                             caller_id,
                                             id,
-                                            vec![format!("job.status sync error: {}", e)],
+                                            vec![format!("job.status request error: {}", e)],
                                         )
                                         .await;
                                 }
                             }
                         }
-                        break;
+                        // break;
                     }
                     if matches!(s, TransportStatus::Failed) {
                         let _ = service_poll
@@ -512,7 +509,7 @@ pub fn start_inbound_listener(
         // Initialize Mycelium client (retry loop)
         let mycelium = loop {
            match MyceliumClient::new(cfg.base_url.clone()) {
-                Ok(c) => break c,
+                Ok(c) => break Arc::new(c),
                 Err(e) => {
                     error!(error=%e, "MyceliumClient init error (inbound listener)");
                     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
@@ -520,6 +517,8 @@ pub fn start_inbound_listener(
             }
         };

+        let cache = Arc::new(SupervisorClientCache::new());
+
         loop {
             // Poll for inbound supervisor messages on the configured topic
             match mycelium.pop_message(Some(false), Some(20), None).await {
@@ -566,16 +565,25 @@
                     match service.supcorr_get(inner_id).await {
                         Ok(Some((context_id, caller_id, job_id, message_id))) => {
                             // Determine success/error from supervisor JSON-RPC envelope
-                            let is_success = rpc
-                                .get("result")
-                                .map(|res| {
-                                    res.get("job_queued").is_some()
-                                        || res.as_str().map(|s| s == "job_queued").unwrap_or(false)
+                            // Inspect result/error to route job.run/job.status/job.result replies
+                            let result_opt = rpc.get("result");
+                            let error_opt = rpc.get("error");
+
+                            // Handle job.run success (job_queued)
+                            let is_job_queued = result_opt
+                                .and_then(|res| {
+                                    if res.get("job_queued").is_some() {
+                                        Some(true)
+                                    } else if let Some(s) = res.as_str() {
+                                        Some(s == "job_queued")
+                                    } else {
+                                        None
+                                    }
                                 })
                                 .unwrap_or(false);

-                            if is_success {
-                                // Set to Dispatched (idempotent) per spec choice, and append log
+                            if is_job_queued {
+                                // Set to Dispatched (idempotent) per spec, and append log
                                 let _ = service
                                     .update_job_status_unchecked(
                                         context_id,
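For reference, the two envelope shapes is_job_queued accepts; a sketch with serde_json literals (values illustrative):

    // Sketch: both supervisor reply shapes classify as job_queued.
    let object_shape = json!({ "result": { "job_queued": true } });
    let string_shape = json!({ "result": "job_queued" });
    for rpc in [object_shape, string_shape] {
        let queued = rpc
            .get("result")
            .and_then(|res| {
                if res.get("job_queued").is_some() {
                    Some(true)
                } else if let Some(s) = res.as_str() {
                    Some(s == "job_queued")
                } else {
                    None
                }
            })
            .unwrap_or(false);
        assert!(queued);
    }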
@@ -596,8 +604,11 @@
                                     )
                                     .await;
                                 let _ = service.supcorr_del(inner_id).await;
-                            } else if let Some(err_obj) = rpc.get("error") {
-                                // Error path: set job Error and log details
+                                continue;
+                            }
+
+                            // Error envelope: set job Error and log
+                            if let Some(err_obj) = error_opt {
                                 let _ = service
                                     .update_job_status_unchecked(
                                         context_id,
@@ -618,20 +629,273 @@
                                     )
                                     .await;
                                 let _ = service.supcorr_del(inner_id).await;
-                            } else {
-                                // Unknown result; keep correlation for a later, clearer reply
-                                let _ = service
-                                    .append_message_logs(
-                                        context_id,
-                                        caller_id,
-                                        message_id,
-                                        vec![
-                                            "Supervisor reply did not contain job_queued or error"
-                                                .to_string(),
-                                        ],
-                                    )
-                                    .await;
+                                continue;
                             }
+
+                            // If we have a result, try to interpret it as job.status or job.result
+                            if let Some(res) = result_opt {
+                                // Try job.status: object {status: "..."} or bare string
+                                let status_candidate = res
+                                    .get("status")
+                                    .and_then(|v| v.as_str())
+                                    .or_else(|| res.as_str());
+
+                                if let Some(remote_status) = status_candidate {
+                                    if let Some((mapped, terminal)) =
+                                        map_supervisor_job_status(remote_status)
+                                    {
+                                        // Update job status and log
+                                        let _ = service
+                                            .update_job_status_unchecked(
+                                                context_id,
+                                                caller_id,
+                                                job_id,
+                                                mapped.clone(),
+                                            )
+                                            .await;
+                                        let _ = service
+                                            .append_message_logs(
+                                                context_id,
+                                                caller_id,
+                                                message_id,
+                                                vec![format!(
+                                                    "Supervisor job.status for job {} -> {} (mapped to {:?})",
+                                                    job_id, remote_status, mapped
+                                                )],
+                                            )
+                                            .await;
+                                        // Done with this correlation id
+                                        let _ = service.supcorr_del(inner_id).await;
+
+                                        // If terminal, request job.result asynchronously now
+                                        if terminal {
+                                            // Load job to determine script_type for runner selection
+                                            match service
+                                                .load_job(context_id, caller_id, job_id)
+                                                .await
+                                            {
+                                                Ok(job) => {
+                                                    match service.scan_runners(context_id).await {
+                                                        Ok(runners) => {
+                                                            if let Some(runner) =
+                                                                runners.into_iter().find(|r| {
+                                                                    r.script_type == job.script_type
+                                                                })
+                                                            {
+                                                                let dest = if !runner
+                                                                    .pubkey
+                                                                    .trim()
+                                                                    .is_empty()
+                                                                {
+                                                                    Destination::Pk(
+                                                                        runner.pubkey.clone(),
+                                                                    )
+                                                                } else {
+                                                                    Destination::Ip(runner.address)
+                                                                };
+                                                                let sup = cache
+                                                                    .get_or_create(
+                                                                        mycelium.clone(),
+                                                                        dest,
+                                                                        cfg.topic.clone(),
+                                                                        runner.secret.clone(),
+                                                                    )
+                                                                    .await;
+                                                                match sup
+                                                                    .job_result_with_ids(
+                                                                        job_id.to_string(),
+                                                                    )
+                                                                    .await
+                                                                {
+                                                                    Ok((_out2, inner2)) => {
+                                                                        let _ = service
+                                                                            .supcorr_set(
+                                                                                inner2, context_id,
+                                                                                caller_id, job_id,
+                                                                                message_id,
+                                                                            )
+                                                                            .await;
+                                                                        let _ = service
+                                                                            .append_message_logs(
+                                                                                context_id,
+                                                                                caller_id,
+                                                                                message_id,
+                                                                                vec![format!(
+                                                                                    "Requested supervisor job.result for job {}",
+                                                                                    job_id
+                                                                                )],
+                                                                            )
+                                                                            .await;
+                                                                    }
+                                                                    Err(e) => {
+                                                                        let _ = service
+                                                                            .append_message_logs(
+                                                                                context_id,
+                                                                                caller_id,
+                                                                                message_id,
+                                                                                vec![format!(
+                                                                                    "job.result request error for job {}: {}",
+                                                                                    job_id, e
+                                                                                )],
+                                                                            )
+                                                                            .await;
+                                                                    }
+                                                                }
+                                                            } else {
+                                                                let _ = service
+                                                                    .append_message_logs(
+                                                                        context_id,
+                                                                        caller_id,
+                                                                        message_id,
+                                                                        vec![format!(
+                                                                            "No runner with matching script_type found to request job.result for job {}",
+                                                                            job_id
+                                                                        )],
+                                                                    )
+                                                                    .await;
+                                                            }
+                                                        }
+                                                        Err(e) => {
+                                                            let _ = service
+                                                                .append_message_logs(
+                                                                    context_id,
+                                                                    caller_id,
+                                                                    message_id,
+                                                                    vec![format!(
+                                                                        "scan_runners error while requesting job.result for job {}: {}",
+                                                                        job_id, e
+                                                                    )],
+                                                                )
+                                                                .await;
+                                                        }
+                                                    }
+                                                }
+                                                Err(e) => {
+                                                    let _ = service
+                                                        .append_message_logs(
+                                                            context_id,
+                                                            caller_id,
+                                                            message_id,
+                                                            vec![format!(
+                                                                "load_job error while requesting job.result for job {}: {}",
+                                                                job_id, e
+                                                            )],
+                                                        )
+                                                        .await;
+                                                }
+                                            }
+                                        }
+                                        continue;
+                                    }
+                                }
+
+                                // Try job.result: object with success/error or bare string treated as success
+                                if let Some(obj) = res.as_object() {
+                                    if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
+                                        let mut patch = std::collections::HashMap::new();
+                                        patch.insert("success".to_string(), s.to_string());
+                                        let _ = service
+                                            .update_job_result_merge_unchecked(
+                                                context_id, caller_id, job_id, patch,
+                                            )
+                                            .await;
+                                        let _ = service
+                                            .update_message_status(
+                                                context_id,
+                                                caller_id,
+                                                message_id,
+                                                MessageStatus::Processed,
+                                            )
+                                            .await;
+                                        let _ = service
+                                            .append_message_logs(
+                                                context_id,
+                                                caller_id,
+                                                message_id,
+                                                vec![format!(
+                                                    "Stored supervisor job.result for job {} (success)",
+                                                    job_id
+                                                )],
+                                            )
+                                            .await;
+                                        let _ = service.supcorr_del(inner_id).await;
+                                        continue;
+                                    }
+                                    if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
+                                        let mut patch = std::collections::HashMap::new();
+                                        patch.insert("error".to_string(), s.to_string());
+                                        let _ = service
+                                            .update_job_result_merge_unchecked(
+                                                context_id, caller_id, job_id, patch,
+                                            )
+                                            .await;
+                                        let _ = service
+                                            .update_message_status(
+                                                context_id,
+                                                caller_id,
+                                                message_id,
+                                                MessageStatus::Processed,
+                                            )
+                                            .await;
+                                        let _ = service
+                                            .append_message_logs(
+                                                context_id,
+                                                caller_id,
+                                                message_id,
+                                                vec![format!(
+                                                    "Stored supervisor job.result for job {} (error)",
+                                                    job_id
+                                                )],
+                                            )
+                                            .await;
+                                        let _ = service.supcorr_del(inner_id).await;
+                                        continue;
+                                    }
+                                } else if let Some(s) = res.as_str() {
+                                    // Bare string => treat as success
+                                    let mut patch = std::collections::HashMap::new();
+                                    patch.insert("success".to_string(), s.to_string());
+                                    let _ = service
+                                        .update_job_result_merge_unchecked(
+                                            context_id, caller_id, job_id, patch,
+                                        )
+                                        .await;
+                                    let _ = service
+                                        .update_message_status(
+                                            context_id,
+                                            caller_id,
+                                            message_id,
+                                            MessageStatus::Processed,
+                                        )
+                                        .await;
+                                    let _ = service
+                                        .append_message_logs(
+                                            context_id,
+                                            caller_id,
+                                            message_id,
+                                            vec![format!(
+                                                "Stored supervisor job.result for job {} (success)",
+                                                job_id
+                                            )],
+                                        )
+                                        .await;
+                                    let _ = service.supcorr_del(inner_id).await;
+                                    continue;
+                                }
+                            }
+
+                            // Unknown/unsupported supervisor reply; keep correlation for later
+                            let _ = service
+                                .append_message_logs(
+                                    context_id,
+                                    caller_id,
+                                    message_id,
+                                    vec![
+                                        "Supervisor reply did not contain recognizable job.run/status/result fields"
+                                            .to_string(),
+                                    ],
+                                )
+                                .await;
                         }
                         Ok(None) => {
                             // No correlation found; ignore or log once
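Taken together, the listener now routes three reply families off a single correlation hit: job.run acks (job_queued), job.status strings, and job.result success/error maps, falling back to a log entry when nothing matches. A condensed sketch of that dispatch order (the enum and function are invented for illustration):

    // Sketch: decision order the inbound listener applies to a correlated reply.
    enum SupervisorReply {
        JobQueued,
        Status(String),
        ResultSuccess(String),
        ResultError(String),
        Unknown,
    }

    fn classify(rpc: &Value) -> SupervisorReply {
        let result = rpc.get("result");
        // 1) job.run ack: {"job_queued": ...} or bare "job_queued"
        if result
            .map(|r| r.get("job_queued").is_some() || r.as_str() == Some("job_queued"))
            .unwrap_or(false)
        {
            return SupervisorReply::JobQueued;
        }
        if let Some(res) = result {
            // 2) job.status: {status: "..."} or a bare string that maps to a known status
            if let Some(s) = res.get("status").and_then(|v| v.as_str()).or_else(|| res.as_str()) {
                if map_supervisor_job_status(s).is_some() {
                    return SupervisorReply::Status(s.to_string());
                }
            }
            // 3) job.result: {"success": ...} / {"error": ...}; a bare string counts as success
            if let Some(s) = res.get("success").and_then(|v| v.as_str()) {
                return SupervisorReply::ResultSuccess(s.to_string());
            }
            if let Some(s) = res.get("error").and_then(|v| v.as_str()) {
                return SupervisorReply::ResultError(s.to_string());
            }
            if let Some(s) = res.as_str() {
                return SupervisorReply::ResultSuccess(s.to_string());
            }
        }
        SupervisorReply::Unknown
    }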
|