// horus/bin/coordinator/src/service.rs
use crate::dag::{DagError, DagResult, FlowDag, NodeStatus, build_flow_dag};
use crate::models::{
Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
Runner, TransportStatus,
};
use crate::storage::RedisDriver;
use serde::Serialize;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, sleep};
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug)]
struct InvalidJobStatusTransition {
from: JobStatus,
to: JobStatus,
}
impl std::fmt::Display for InvalidJobStatusTransition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Invalid job status transition: {:?} -> {:?}",
self.from, self.to
)
}
}
impl std::error::Error for InvalidJobStatusTransition {}
#[derive(Debug)]
struct ValidationError {
msg: String,
}
impl ValidationError {
fn new(msg: impl Into<String>) -> Self {
Self { msg: msg.into() }
}
}
impl std::fmt::Display for ValidationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Validation error: {}", self.msg)
}
}
impl std::error::Error for ValidationError {}
#[derive(Debug)]
struct PermissionDeniedError {
actor_id: u32,
context_id: u32,
action: String,
}
impl std::fmt::Display for PermissionDeniedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Permission denied: actor {} cannot {} in context {}",
self.actor_id, self.action, self.context_id
)
}
}
impl std::error::Error for PermissionDeniedError {}
#[derive(Debug)]
struct AlreadyExistsError {
key: String,
}
impl std::fmt::Display for AlreadyExistsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Already exists: {}", self.key)
}
}
impl std::error::Error for AlreadyExistsError {}
// -----------------------------
// Internal helpers
// -----------------------------
fn as_json(model: &impl Serialize) -> Result<Value, BoxError> {
Ok(serde_json::to_value(model)?)
}
fn json_get_u32(v: &Value, key: &str) -> Result<u32, BoxError> {
v.get(key)
.and_then(|v| v.as_u64())
.map(|x| x as u32)
.ok_or_else(|| {
ValidationError::new(format!("missing or invalid u32 field '{}'", key)).into()
})
}
fn json_get_str(v: &Value, key: &str) -> Result<String, BoxError> {
v.get(key)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
ValidationError::new(format!("missing or invalid string field '{}'", key)).into()
})
}
fn json_get_array(v: &Value, key: &str) -> Result<Vec<Value>, BoxError> {
let arr = v
.get(key)
.and_then(|v| v.as_array())
.ok_or_else(|| ValidationError::new(format!("missing or invalid array field '{}'", key)))?;
Ok(arr.clone())
}
fn contains_key_not_found(e: &BoxError) -> bool {
e.to_string().contains("Key not found")
}
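// Note: this check matches on the error's Display text ("Key not found"), so it is
// coupled to RedisDriver's error formatting; a typed "not found" error would be more
// robust, but this string match is what the ensure_*_not_exists helpers below rely on.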
fn has_duplicate_u32s(list: &[Value]) -> bool {
let mut seen = std::collections::HashSet::new();
for it in list {
if let Some(x) = it.as_u64() {
if !seen.insert(x) {
return true;
}
}
}
false
}
fn vec_u32_contains(list: &[Value], val: u32) -> bool {
list.iter().any(|v| v.as_u64() == Some(val as u64))
}
// role = "admins" | "executors" | "readers"
fn context_has_role(ctx: &Context, role: &str, actor_id: u32) -> Result<bool, BoxError> {
let v = as_json(ctx)?;
let arr = v
.get(role)
.and_then(|r| r.as_array())
.ok_or_else(|| ValidationError::new(format!("Context.{} missing or invalid", role)))?;
Ok(arr.iter().any(|x| x.as_u64() == Some(actor_id as u64)))
}
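// The role check assumes a Context that serializes with numeric-id role arrays,
// e.g. (illustrative shape only):
//   { "id": 1, "admins": [10], "executors": [20, 21], "readers": [] }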
// -----------------------------
// Validation helpers (minimal, spec-aligned)
// -----------------------------
fn validate_context(ctx: &Context) -> Result<(), BoxError> {
let v = as_json(ctx)?;
let id = json_get_u32(&v, "id")?;
if id == 0 {
return Err(ValidationError::new("Context.id must be > 0").into());
}
// admins required
let admins = json_get_array(&v, "admins")?;
if admins.is_empty() {
return Err(ValidationError::new("Context.admins must not be empty").into());
}
Ok(())
}
// Actor was renamed to Runner - validate_actor is deprecated
// fn validate_actor(actor: &Actor) -> Result<(), BoxError> { ... }
fn validate_runner(_context_id: u32, runner: &Runner) -> Result<(), BoxError> {
let v = as_json(runner)?;
let id = json_get_u32(&v, "id")?;
if id == 0 {
return Err(ValidationError::new("Runner.id must be > 0").into());
}
let pubkey = json_get_str(&v, "pubkey")?;
if pubkey.trim().is_empty() {
return Err(ValidationError::new("Runner.pubkey must not be empty").into());
}
let topic = json_get_str(&v, "topic")?;
if topic.trim().is_empty() {
return Err(ValidationError::new("Runner.topic must not be empty").into());
}
// address presence is ensured by serde typing; no additional validation here
Ok(())
}
fn validate_flow(context_id: u32, flow: &Flow) -> Result<(), BoxError> {
let v = as_json(flow)?;
let id = json_get_u32(&v, "id")?;
if id == 0 {
return Err(ValidationError::new("Flow.id must be > 0").into());
}
let ctx = json_get_u32(&v, "context_id")?;
if ctx != context_id {
return Err(ValidationError::new(format!(
"Flow.context_id ({}) does not match path context_id ({})",
ctx, context_id
))
.into());
}
let jobs = json_get_array(&v, "jobs")?;
if has_duplicate_u32s(&jobs) {
return Err(ValidationError::new("Flow.jobs must not contain duplicates").into());
}
Ok(())
}
// Validation moved to Job model - use job.validate_required_fields() and job.validate_context()
fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
let v = as_json(msg)?;
let id = json_get_u32(&v, "id")?;
if id == 0 {
return Err(ValidationError::new("Message.id must be > 0").into());
}
let ctx = json_get_u32(&v, "context_id")?;
if ctx != context_id {
return Err(ValidationError::new(format!(
"Message.context_id ({}) does not match path context_id ({})",
ctx, context_id
))
.into());
}
let body = json_get_str(&v, "message")?;
if body.trim().is_empty() {
return Err(ValidationError::new("Message.message must not be empty").into());
}
let t = json_get_u32(&v, "timeout")?;
let ta = json_get_u32(&v, "timeout_ack")?;
let tr = json_get_u32(&v, "timeout_result")?;
if t == 0 || ta == 0 || tr == 0 {
return Err(ValidationError::new(
"Message timeouts (timeout|timeout_ack|timeout_result) must be > 0",
)
.into());
}
// Validate embedded jobs minimal consistency (caller/context match)
let jobs = json_get_array(&v, "job")?;
let msg_caller = json_get_u32(&v, "caller_id")?;
for jv in jobs {
if let Some(obj) = jv.as_object() {
let mut jid = 0u32;
if let Some(u) = obj.get("id").and_then(|x| x.as_u64()) {
jid = u as u32;
}
if let (Some(jctx), Some(jcaller)) = (
obj.get("context_id").and_then(|x| x.as_u64()),
obj.get("caller_id").and_then(|x| x.as_u64()),
) {
if jctx as u32 != context_id {
return Err(ValidationError::new(format!(
"Embedded Job {} context_id mismatch ({} != {})",
jid, jctx as u32, context_id
))
.into());
}
if jcaller as u32 != msg_caller {
return Err(ValidationError::new(format!(
"Embedded Job {} caller_id mismatch ({} != {})",
jid, jcaller as u32, msg_caller
))
.into());
}
}
}
}
Ok(())
}
// -----------------------------
// Service API
// -----------------------------
#[derive(Clone)]
pub struct AppService {
redis: Arc<RedisDriver>,
schedulers: Arc<Mutex<HashSet<(u32, u32)>>>,
}
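// `schedulers` tracks the (context_id, flow_id) pairs that currently have a background
// scheduler task running; flow_start registers a pair before spawning and removes it
// when the task exits, so at most one scheduler runs per flow.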
impl AppService {
pub fn new(redis: RedisDriver) -> Self {
Self {
redis: Arc::new(redis),
schedulers: Arc::new(Mutex::new(HashSet::new())),
}
}
// -----------------------------
// Context
// -----------------------------
pub async fn create_context(&self, ctx: Context) -> Result<Context, BoxError> {
validate_context(&ctx)?;
// id
let v = as_json(&ctx)?;
let context_id = json_get_u32(&v, "id")?;
self.ensure_context_not_exists(context_id).await?;
self.redis.save_context(&ctx).await?;
Ok(ctx)
}
pub async fn load_context(&self, id: u32) -> Result<Context, BoxError> {
let ctx = self.redis.load_context(id).await?;
Ok(ctx)
}
// -----------------------------
// Actor (deprecated - renamed to Runner)
// -----------------------------
// pub async fn create_actor(&self, actor: Actor) -> Result<Actor, BoxError> { ... }
// pub async fn load_actor(&self, id: u32) -> Result<Actor, BoxError> { ... }
// -----------------------------
// Runner
// -----------------------------
pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result<Runner, BoxError> {
validate_runner(context_id, &runner)?;
let v = as_json(&runner)?;
let id = json_get_u32(&v, "id")?;
self.ensure_runner_not_exists(context_id, id).await?;
self.redis.save_runner(context_id, &runner).await?;
Ok(runner)
}
pub async fn load_runner(&self, context_id: u32, id: u32) -> Result<Runner, BoxError> {
let runner = self.redis.load_runner(context_id, id).await?;
Ok(runner)
}
// -----------------------------
// Flow
// -----------------------------
pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result<Flow, BoxError> {
validate_flow(context_id, &flow)?;
// Permission: require that flow.caller_id is admin in the context
let v = as_json(&flow)?;
let fid = json_get_u32(&v, "id")?;
let caller_id = json_get_u32(&v, "caller_id")?;
self.require_admin(context_id, caller_id, "create flow")
.await?;
self.ensure_flow_not_exists(context_id, fid).await?;
// Require that the context exists
let _ = self.redis.load_context(context_id).await?;
self.redis.save_flow(context_id, &flow).await?;
Ok(flow)
}
pub async fn load_flow(&self, context_id: u32, id: u32) -> Result<Flow, BoxError> {
let flow = self.redis.load_flow(context_id, id).await?;
Ok(flow)
}
pub async fn flow_dag(&self, context_id: u32, flow_id: u32) -> DagResult<FlowDag> {
build_flow_dag(&self.redis, context_id, flow_id).await
}
/// Start a background scheduler for a flow.
/// - Ticks every 1 second.
/// - Sets Flow status to Started immediately.
/// - Dispatches jobs whose dependencies are Finished: creates a Message and LPUSHes its key into msg_out,
/// and marks the job status as Dispatched.
/// - When all jobs are Finished, sets the Flow to Finished; if any job is in Error, sets the Flow to Error.
/// Returns:
/// - Ok(true) if a scheduler was started
/// - Ok(false) if a scheduler was already running for this (context_id, flow_id)
pub async fn flow_start(&self, context_id: u32, flow_id: u32) -> Result<bool, BoxError> {
// Ensure flow exists (and load caller_id)
let flow = self.redis.load_flow(context_id, flow_id).await?;
let caller_id = flow.caller_id();
// Try to register this flow in the active scheduler set
{
let mut guard = self.schedulers.lock().await;
if !guard.insert((context_id, flow_id)) {
// Already running
return Ok(false);
}
}
// Clone resources for background task
let redis = self.redis.clone();
let schedulers = self.schedulers.clone();
// Set Flow status to Started
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Started)
.await;
tokio::spawn(async move {
// Background loop
loop {
// Build DAG from flow
let dag = match build_flow_dag(&redis, context_id, flow_id).await {
Ok(d) => d,
Err(_) => break, // Flow missing or error
};
// Get ready nodes (dependencies satisfied, not yet dispatched)
let ready_node_ids = match dag.ready_jobs() {
Ok(ids) => ids,
Err(_) => {
// DAG error (e.g., failed job), mark flow as error and exit
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Error)
.await;
break;
}
};
// Dispatch ready nodes
for node_id in ready_node_ids {
let node = match dag.nodes.get(&node_id) {
Some(n) => n,
None => continue,
};
// Load the job data
let job = match redis.load_job(context_id, caller_id, node_id).await {
Ok(j) => j,
Err(_) => continue,
};
// Build Message with FlowNode for routing
let ts = crate::time::current_timestamp();
let msg_id: u32 = node_id; // Use node_id as message_id
let message = Message {
id: msg_id,
caller_id: job.caller_id.parse().unwrap_or(0),
context_id,
flow_id,
message: "job.run".to_string(),
message_type: job.executor.clone(),
message_format_type: MessageFormatType::Text,
timeout: job.timeout as u32,
timeout_ack: 10,
timeout_result: job.timeout as u32,
transport_id: None,
transport_status: None,
nodes: vec![node.clone()], // Include FlowNode for routing
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
updated_at: ts,
status: MessageStatus::Dispatched,
};
// Persist the message and enqueue it
if redis.save_message(context_id, &message).await.is_ok() {
let caller_id_u32 = job.caller_id.parse::<u32>().unwrap_or(0);
let _ = redis.enqueue_msg_out(context_id, caller_id_u32, msg_id).await;
// TODO: Mark node as Dispatched in DAG and persist
// For now, the node status is tracked in memory only
}
}
// Check if flow is complete
let all_finished = dag.completed.len() == dag.nodes.len();
let any_error = dag.failed_job.is_some();
if any_error {
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Error)
.await;
break;
}
if all_finished {
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Finished)
.await;
break;
}
sleep(Duration::from_secs(1)).await;
}
// Remove from active schedulers set
let mut guard = schedulers.lock().await;
guard.remove(&(context_id, flow_id));
});
Ok(true)
}
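// Usage sketch (illustrative): start the scheduler once and poll the flow afterwards.
// A second call for the same (context_id, flow_id) returns Ok(false) while the first
// scheduler is still registered.
//
//     let started = svc.flow_start(1, 7).await?;   // true: scheduler spawned
//     let again = svc.flow_start(1, 7).await?;     // false: already running
//     let flow = svc.load_flow(1, 7).await?;       // status: Started -> Finished | Error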
/// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
/// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
let dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
let mut ready = dag.ready_jobs()?;
ready.sort_unstable();
let mut queued: Vec<String> = Vec::with_capacity(ready.len());
for jid in ready {
// Load the concrete Job
let job = self
.redis
.load_job(context_id, dag.caller_id, jid)
.await
.map_err(DagError::from)?;
// Build a Message that embeds this job
let ts = crate::time::current_timestamp();
let msg_id: u32 = job.id.parse().unwrap_or(0); // deterministic; adjust strategy later if needed
let message = Message {
id: msg_id,
caller_id: job.caller_id.parse().unwrap_or(0),
context_id,
flow_id, // Add flow_id for DAG tracking
message: "job.run".to_string(),
message_type: job.executor.clone(),
message_format_type: MessageFormatType::Text,
timeout: job.timeout as u32,
timeout_ack: 10,
timeout_result: job.timeout as u32,
transport_id: None,
transport_status: None,
nodes: Vec::new(), // TODO: Add FlowNode from DAG
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
updated_at: ts,
status: MessageStatus::Dispatched,
};
// Persist the Message and enqueue its key to the outbound queue
let _ = self
.create_message(context_id, message)
.await
.map_err(DagError::from)?;
let caller_id_u32 = job.caller_id.parse::<u32>().unwrap_or(0);
self.redis
.enqueue_msg_out(context_id, caller_id_u32, msg_id)
.await
.map_err(DagError::from)?;
let key = format!("message:{}:{}", caller_id_u32, msg_id);
queued.push(key);
}
Ok(queued)
}
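// Example (illustrative): the returned keys follow "message:{caller_id}:{id}", so ready
// jobs 3 and 5 owned by caller 10 yield ["message:10:3", "message:10:5"], sorted by job id.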
// -----------------------------
// Job
// -----------------------------
pub async fn create_job(&self, context_id: u32, job: Job) -> Result<Job, BoxError> {
// Validation removed - Job validation now handled at creation time
let v = as_json(&job)?;
let id = json_get_u32(&v, "id")?;
let caller_id = json_get_u32(&v, "caller_id")?;
self.ensure_job_not_exists(context_id, caller_id, id)
.await?;
self.redis.save_job(context_id, &job).await?;
Ok(job)
}
pub async fn load_job(
&self,
context_id: u32,
caller_id: u32,
id: u32,
) -> Result<Job, BoxError> {
let job = self.redis.load_job(context_id, caller_id, id).await?;
Ok(job)
}
// Job status transition rules (see InvalidJobStatusTransition above):
// - Dispatched -> WaitingForPrerequisites | Started | Error
// - WaitingForPrerequisites -> Started | Error
// - Started -> Finished | Error
// - Finished, Error -> terminal (no transitions)
// If the new status equals the current status, the update is a no-op.
/// Update node status in the DAG with transition validation.
///
/// Allowed transitions:
/// - Pending -> Ready | Dispatched | Cancelled
/// - Ready -> Dispatched | Cancelled
/// - Dispatched -> Running | Failed | Cancelled
/// - Running -> Completed | Failed | Cancelled
/// - Completed, Failed, Cancelled -> terminal (no transitions)
///
/// If the new status equals the current status, this is a no-op (idempotent).
pub async fn update_node_status(
&self,
context_id: u32,
executor_id: u32,
flow_id: u32,
node_id: u32,
new_status: NodeStatus,
) -> Result<(), BoxError> {
self.require_executor(context_id, executor_id, "update node status")
.await?;
// Load the DAG
let mut dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
// Get current node status
let node = dag.nodes.get(&node_id)
.ok_or_else(|| format!("Node {} not found in flow {}", node_id, flow_id))?;
let current = node.node_status.clone();
if new_status == current {
// Idempotent: don't touch storage if no change
return Ok(());
}
// Validate state transition
let allowed = match current {
NodeStatus::Pending => matches!(
new_status,
NodeStatus::Ready | NodeStatus::Dispatched | NodeStatus::Cancelled
),
NodeStatus::Ready => matches!(
new_status,
NodeStatus::Dispatched | NodeStatus::Cancelled
),
NodeStatus::Dispatched => matches!(
new_status,
NodeStatus::Running | NodeStatus::Failed | NodeStatus::Cancelled
),
NodeStatus::Running => matches!(
new_status,
NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled
),
NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled => false,
};
if !allowed {
return Err(format!(
"Invalid node status transition from {:?} to {:?}",
current, new_status
).into());
}
// Update the node status
if let Some(node) = dag.nodes.get_mut(&node_id) {
node.node_status = new_status;
// Persist the updated DAG
// TODO: Implement DAG persistence
// self.redis.save_flow_dag(context_id, flow_id, &dag).await?;
}
Ok(())
}
/// Bypass-permission variant to update node status with transition validation.
/// This skips the executor permission check but enforces the same state transition rules.
pub async fn update_node_status_unchecked(
&self,
context_id: u32,
flow_id: u32,
node_id: u32,
new_status: NodeStatus,
) -> Result<(), BoxError> {
// Load the DAG
let mut dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
// Get current node status
let node = dag.nodes.get(&node_id)
.ok_or_else(|| format!("Node {} not found in flow {}", node_id, flow_id))?;
let current = node.node_status.clone();
if new_status == current {
// Idempotent: don't touch storage if no change
return Ok(());
}
// Validate state transition
let allowed = match current {
NodeStatus::Pending => matches!(
new_status,
NodeStatus::Ready | NodeStatus::Dispatched | NodeStatus::Cancelled
),
NodeStatus::Ready => matches!(
new_status,
NodeStatus::Dispatched | NodeStatus::Cancelled
),
NodeStatus::Dispatched => matches!(
new_status,
NodeStatus::Running | NodeStatus::Failed | NodeStatus::Cancelled
),
NodeStatus::Running => matches!(
new_status,
NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled
),
NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled => false,
};
if !allowed {
return Err(format!(
"Invalid node status transition from {:?} to {:?}",
current, new_status
).into());
}
// Update the node status
if let Some(node) = dag.nodes.get_mut(&node_id) {
node.node_status = new_status.clone();
// Update DAG runtime state for ready_jobs() to work correctly
match new_status {
NodeStatus::Dispatched | NodeStatus::Running => {
dag.started.insert(node_id);
}
NodeStatus::Completed => {
dag.started.insert(node_id);
dag.completed.insert(node_id);
}
NodeStatus::Failed => {
dag.started.insert(node_id);
dag.failed_job = Some(node_id);
}
_ => {}
}
// Persist the updated DAG
// TODO: Implement DAG persistence to Redis
// For now, the DAG is rebuilt each time, so runtime state is lost
// self.redis.save_flow_dag(context_id, flow_id, &dag).await?;
}
Ok(())
}
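// Unlike update_node_status, the unchecked variant also mirrors the change into the
// DAG's runtime sets (started/completed/failed_job). Because the DAG is currently
// rebuilt from storage on every call and not persisted (see the TODO above), that
// runtime state only lives for the duration of this call.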
// -----------------------------
// Message
// -----------------------------
pub async fn create_message(
&self,
context_id: u32,
message: Message,
) -> Result<Message, BoxError> {
validate_message(context_id, &message)?;
let v = as_json(&message)?;
let id = json_get_u32(&v, "id")?;
let caller_id = json_get_u32(&v, "caller_id")?;
self.ensure_message_not_exists(context_id, caller_id, id)
.await?;
self.redis.save_message(context_id, &message).await?;
Ok(message)
}
pub async fn load_message(
&self,
context_id: u32,
caller_id: u32,
id: u32,
) -> Result<Message, BoxError> {
let msg = self.redis.load_message(context_id, caller_id, id).await?;
Ok(msg)
}
pub async fn update_flow_status(
&self,
context_id: u32,
requestor_id: u32,
id: u32,
new_status: FlowStatus,
) -> Result<(), BoxError> {
self.require_admin(context_id, requestor_id, "update flow status")
.await?;
let flow = self.redis.load_flow(context_id, id).await?;
let v = as_json(&flow)?;
let cur_raw = v
.get("status")
.cloned()
.unwrap_or(Value::String("Dispatched".to_string()));
// Parse the stored status string; "Created" must be recognized so the Created -> Dispatched rule below applies.
let cur = match cur_raw {
Value::String(s) if s == "Created" => FlowStatus::Created,
Value::String(s) if s == "Dispatched" => FlowStatus::Dispatched,
Value::String(s) if s == "Started" => FlowStatus::Started,
Value::String(s) if s == "Error" => FlowStatus::Error,
Value::String(s) if s == "Finished" => FlowStatus::Finished,
_ => FlowStatus::Dispatched,
};
if cur == new_status {
return Ok(());
}
let allowed = match cur {
FlowStatus::Created => matches!(new_status, FlowStatus::Dispatched | FlowStatus::Error),
FlowStatus::Dispatched => matches!(new_status, FlowStatus::Started | FlowStatus::Error),
FlowStatus::Started => matches!(new_status, FlowStatus::Finished | FlowStatus::Error),
FlowStatus::Finished | FlowStatus::Error => false,
};
if !allowed {
return Err(ValidationError::new(format!(
"Invalid flow status transition: {:?} -> {:?}",
cur, new_status
))
.into());
}
self.redis
.update_flow_status(context_id, id, new_status)
.await
}
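// Flow status state machine enforced above:
//   Created -> Dispatched -> Started -> Finished
//   Created | Dispatched | Started -> Error
//   Finished and Error are terminal; setting the current status again is a no-op.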
pub async fn update_message_status(
&self,
context_id: u32,
caller_id: u32,
id: u32,
new_status: MessageStatus,
) -> Result<(), BoxError> {
let msg = self.redis.load_message(context_id, caller_id, id).await?;
let v = as_json(&msg)?;
let cur_raw = v
.get("status")
.cloned()
.unwrap_or(Value::String("Dispatched".to_string()));
let cur = match cur_raw {
Value::String(s) if s == "Dispatched" => MessageStatus::Dispatched,
Value::String(s) if s == "Acknowledged" => MessageStatus::Acknowledged,
Value::String(s) if s == "Error" => MessageStatus::Error,
Value::String(s) if s == "Processed" => MessageStatus::Processed,
_ => MessageStatus::Dispatched,
};
if cur == new_status {
return Ok(());
}
let allowed = match cur {
MessageStatus::Dispatched => {
matches!(
new_status,
MessageStatus::Acknowledged | MessageStatus::Error
)
}
MessageStatus::Acknowledged => {
matches!(new_status, MessageStatus::Processed | MessageStatus::Error)
}
MessageStatus::Processed | MessageStatus::Error => false,
};
if !allowed {
return Err(ValidationError::new(format!(
"Invalid message status transition: {:?} -> {:?}",
cur, new_status
))
.into());
}
self.redis
.update_message_status(context_id, caller_id, id, new_status)
.await
}
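// Message status state machine enforced above:
//   Dispatched -> Acknowledged -> Processed
//   Dispatched | Acknowledged -> Error
//   Processed and Error are terminal; setting the current status again is a no-op.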
pub async fn update_message_transport(
&self,
context_id: u32,
caller_id: u32,
id: u32,
transport_id: Option<String>,
transport_status: Option<TransportStatus>,
) -> Result<(), BoxError> {
// Ensure message exists (provides clearer error)
let _ = self.redis.load_message(context_id, caller_id, id).await?;
self.redis
.update_message_transport(context_id, caller_id, id, transport_id, transport_status)
.await
}
pub async fn update_flow_env_vars_merge(
&self,
context_id: u32,
requestor_id: u32,
flow_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
self.require_admin(context_id, requestor_id, "update flow env_vars")
.await?;
// Ensure flow exists
let _ = self.redis.load_flow(context_id, flow_id).await?;
self.redis
.update_flow_env_vars_merge(context_id, flow_id, patch)
.await
}
pub async fn update_flow_result_merge(
&self,
context_id: u32,
requestor_id: u32,
flow_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
self.require_admin(context_id, requestor_id, "update flow result")
.await?;
let _ = self.redis.load_flow(context_id, flow_id).await?;
self.redis
.update_flow_result_merge(context_id, flow_id, patch)
.await
}
pub async fn update_flow_jobs_add_remove(
&self,
context_id: u32,
requestor_id: u32,
flow_id: u32,
add: Vec<u32>,
remove: Vec<u32>,
) -> Result<(), BoxError> {
self.require_admin(context_id, requestor_id, "update flow jobs")
.await?;
let flow = self.redis.load_flow(context_id, flow_id).await?;
let mut set: std::collections::BTreeSet<u32> = flow.jobs().iter().copied().collect();
for a in add {
set.insert(a);
}
for r in remove {
set.remove(&r);
}
let new_jobs: Vec<u32> = set.into_iter().collect();
self.redis
.update_flow_jobs_set(context_id, flow_id, new_jobs)
.await
}
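// Note: additions are applied before removals, so an id listed in both `add` and
// `remove` ends up removed; the BTreeSet also deduplicates and stores the job ids
// in ascending order.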
pub async fn update_job_env_vars_merge(
&self,
context_id: u32,
requestor_id: u32,
caller_id: u32,
job_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
self.require_admin(context_id, requestor_id, "update job env_vars")
.await?;
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
self.redis
.update_job_env_vars_merge(context_id, caller_id, job_id, patch)
.await
}
pub async fn update_job_result_merge(
&self,
context_id: u32,
requestor_id: u32,
caller_id: u32,
job_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
// Allow if admin OR executor
let ctx = self.redis.load_context(context_id).await?;
let is_admin = context_has_role(&ctx, "admins", requestor_id)?;
let is_exec = context_has_role(&ctx, "executors", requestor_id)?;
if !(is_admin || is_exec) {
return Err(Box::new(PermissionDeniedError {
actor_id: requestor_id,
context_id,
action: "update job result".to_string(),
}));
}
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
self.redis
.update_job_result_merge(context_id, caller_id, job_id, patch)
.await
}
/// Bypass-permission variant to merge into a job's result field.
/// Intended for internal router/scheduler use where no actor identity is present.
pub async fn update_job_result_merge_unchecked(
&self,
context_id: u32,
caller_id: u32,
job_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
// Ensure job exists, then write directly
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
self.redis
.update_job_result_merge(context_id, caller_id, job_id, patch)
.await
}
pub async fn append_message_logs(
&self,
context_id: u32,
caller_id: u32,
id: u32,
new_logs: Vec<String>,
) -> Result<(), BoxError> {
let _ = self.redis.load_message(context_id, caller_id, id).await?;
self.redis
.append_message_logs(context_id, caller_id, id, new_logs)
.await
}
}
// -----------------------------
// Existence checks (strict create) and permissions
// -----------------------------
impl AppService {
async fn ensure_context_not_exists(&self, id: u32) -> Result<(), BoxError> {
match self.redis.load_context(id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("context:{}", id),
})),
Err(e) => {
if contains_key_not_found(&e) {
Ok(())
} else {
Err(e)
}
}
}
}
async fn ensure_runner_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
match self.redis.load_runner(db, id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("runner:{}", id),
})),
Err(e) => {
if contains_key_not_found(&e) {
Ok(())
} else {
Err(e)
}
}
}
}
async fn ensure_flow_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
match self.redis.load_flow(db, id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("flow:{}", id),
})),
Err(e) => {
if contains_key_not_found(&e) {
Ok(())
} else {
Err(e)
}
}
}
}
async fn ensure_job_not_exists(
&self,
db: u32,
caller_id: u32,
id: u32,
) -> Result<(), BoxError> {
match self.redis.load_job(db, caller_id, id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("job:{}:{}", caller_id, id),
})),
Err(e) => {
if contains_key_not_found(&e) {
Ok(())
} else {
Err(e)
}
}
}
}
async fn ensure_message_not_exists(
&self,
db: u32,
caller_id: u32,
id: u32,
) -> Result<(), BoxError> {
match self.redis.load_message(db, caller_id, id).await {
Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("message:{}:{}", caller_id, id),
})),
Err(e) => {
if contains_key_not_found(&e) {
Ok(())
} else {
Err(e)
}
}
}
}
async fn require_admin(
&self,
context_id: u32,
actor_id: u32,
action: &str,
) -> Result<(), BoxError> {
let ctx = self.redis.load_context(context_id).await?;
let ok = context_has_role(&ctx, "admins", actor_id)?;
if !ok {
return Err(Box::new(PermissionDeniedError {
actor_id,
context_id,
action: action.to_string(),
}));
}
Ok(())
}
async fn require_executor(
&self,
context_id: u32,
actor_id: u32,
action: &str,
) -> Result<(), BoxError> {
let ctx = self.redis.load_context(context_id).await?;
let ok = context_has_role(&ctx, "executors", actor_id)?;
if !ok {
return Err(Box::new(PermissionDeniedError {
actor_id,
context_id,
action: action.to_string(),
}));
}
Ok(())
}
}
/// Router/helper wrappers exposed on AppService so background tasks don't need direct Redis access.
impl AppService {
/// Block-pop from the per-context msg_out queue with a timeout (seconds).
/// Returns Some(message_key) like "message:{caller_id}:{id}" or None on timeout.
pub async fn brpop_msg_out(
&self,
context_id: u32,
timeout_secs: usize,
) -> Result<Option<String>, BoxError> {
self.redis.brpop_msg_out(context_id, timeout_secs).await
}
/// Scan all runner:* in the given context and return deserialized Runner entries.
pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<Runner>, BoxError> {
self.redis.scan_runners(context_id).await
}
/// Correlation map: store mapping from inner supervisor JSON-RPC id to context/caller/job/message.
pub async fn supcorr_set(
&self,
inner_id: u64,
context_id: u32,
caller_id: u32,
job_id: u32,
message_id: u32,
) -> Result<(), BoxError> {
self.redis
.supcorr_set(inner_id, context_id, caller_id, job_id, message_id)
.await
.map_err(Into::into)
}
/// Correlation map: load mapping by inner supervisor JSON-RPC id.
pub async fn supcorr_get(
&self,
inner_id: u64,
) -> Result<Option<(u32, u32, u32, u32)>, BoxError> {
self.redis.supcorr_get(inner_id).await.map_err(Into::into)
}
/// Correlation map: delete mapping by inner supervisor JSON-RPC id.
pub async fn supcorr_del(&self, inner_id: u64) -> Result<(), BoxError> {
self.redis.supcorr_del(inner_id).await.map_err(Into::into)
}
}
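// Typical correlation-map lifecycle, as suggested by the API (not enforced here):
// supcorr_set when a supervisor JSON-RPC call is dispatched, supcorr_get when the
// matching reply arrives, supcorr_del once the reply has been processed.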
/// Auto-discovery helpers for contexts (wrappers over RedisDriver)
impl AppService {
pub async fn list_context_ids(&self) -> Result<Vec<u32>, BoxError> {
self.redis.list_context_ids().await
}
}
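// End-to-end sketch (illustrative only; construction of the context, job, and flow
// values is elided and identifier names are assumptions):
//
//     let svc = AppService::new(redis_driver);
//     svc.create_context(ctx).await?;          // registers admins/executors/readers
//     svc.create_job(1, job).await?;           // jobs the flow will reference
//     svc.create_flow(1, flow).await?;         // caller_id must be a context admin
//     svc.flow_start(1, flow_id).await?;       // background scheduler dispatches ready jobs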