// Source listing: 1078 lines, 36 KiB, Rust.
use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
|
|
use crate::models::{
|
|
Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
|
|
Runner,
|
|
};
|
|
use crate::storage::RedisDriver;
|
|
|
|
use serde::Serialize;
|
|
use serde_json::Value;
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
use tokio::time::{sleep, Duration};
|
|
|
|
/// Boxed, thread-safe (`Send + Sync`) error type used as the error half of the
/// `Result`s returned by the helpers and service methods in this file.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
|
|
|
|
#[derive(Debug)]
|
|
struct InvalidJobStatusTransition {
|
|
from: JobStatus,
|
|
to: JobStatus,
|
|
}
|
|
|
|
impl std::fmt::Display for InvalidJobStatusTransition {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(
|
|
f,
|
|
"Invalid job status transition: {:?} -> {:?}",
|
|
self.from, self.to
|
|
)
|
|
}
|
|
}
|
|
impl std::error::Error for InvalidJobStatusTransition {}
|
|
|
|
/// Error describing a model (or request) that violates one of the validation
/// rules enforced in this file.
#[derive(Debug)]
struct ValidationError {
    /// Human-readable description of the violated rule.
    msg: String,
}

impl ValidationError {
    /// Creates a validation error from anything convertible into a `String`.
    fn new(msg: impl Into<String>) -> Self {
        let msg = msg.into();
        Self { msg }
    }
}

impl std::fmt::Display for ValidationError {
    /// Prefixes the stored message with `Validation error: `.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("Validation error: {}", self.msg))
    }
}

impl std::error::Error for ValidationError {}
|
|
|
|
/// Error raised when an actor lacks the role required for an action in a context.
#[derive(Debug)]
struct PermissionDeniedError {
    /// Actor that attempted the action.
    actor_id: u32,
    /// Context in which the action was attempted.
    context_id: u32,
    /// Short description of the attempted action (e.g. "create flow").
    action: String,
}

impl std::fmt::Display for PermissionDeniedError {
    /// Renders as `Permission denied: actor <id> cannot <action> in context <id>`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            actor_id,
            context_id,
            action,
        } = self;
        out.write_fmt(format_args!(
            "Permission denied: actor {} cannot {} in context {}",
            actor_id, action, context_id
        ))
    }
}

impl std::error::Error for PermissionDeniedError {}
|
|
|
|
/// Error raised by the strict-create checks when the target key is already stored.
#[derive(Debug)]
struct AlreadyExistsError {
    /// Key of the conflicting entry, e.g. `flow:1` or `job:2:5`.
    key: String,
}

impl std::fmt::Display for AlreadyExistsError {
    /// Renders as `Already exists: <key>`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("Already exists: {}", self.key))
    }
}

impl std::error::Error for AlreadyExistsError {}
|
|
|
|
// -----------------------------
|
|
// Internal helpers
|
|
// -----------------------------
|
|
|
|
fn as_json(model: &impl Serialize) -> Result<Value, BoxError> {
|
|
Ok(serde_json::to_value(model)?)
|
|
}
|
|
|
|
fn json_get_u32(v: &Value, key: &str) -> Result<u32, BoxError> {
|
|
v.get(key)
|
|
.and_then(|v| v.as_u64())
|
|
.map(|x| x as u32)
|
|
.ok_or_else(|| {
|
|
ValidationError::new(format!("missing or invalid u32 field '{}'", key)).into()
|
|
})
|
|
}
|
|
|
|
fn json_get_str(v: &Value, key: &str) -> Result<String, BoxError> {
|
|
v.get(key)
|
|
.and_then(|v| v.as_str())
|
|
.map(|s| s.to_string())
|
|
.ok_or_else(|| {
|
|
ValidationError::new(format!("missing or invalid string field '{}'", key)).into()
|
|
})
|
|
}
|
|
|
|
fn json_get_array(v: &Value, key: &str) -> Result<Vec<Value>, BoxError> {
|
|
let arr = v
|
|
.get(key)
|
|
.and_then(|v| v.as_array())
|
|
.ok_or_else(|| ValidationError::new(format!("missing or invalid array field '{}'", key)))?;
|
|
Ok(arr.clone())
|
|
}
|
|
|
|
fn contains_key_not_found(e: &BoxError) -> bool {
|
|
e.to_string().contains("Key not found")
|
|
}
|
|
|
|
fn has_duplicate_u32s(list: &Vec<Value>) -> bool {
|
|
let mut seen = std::collections::HashSet::new();
|
|
for it in list {
|
|
if let Some(x) = it.as_u64() {
|
|
if !seen.insert(x) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
fn vec_u32_contains(list: &[Value], val: u32) -> bool {
|
|
list.iter().any(|v| v.as_u64() == Some(val as u64))
|
|
}
|
|
|
|
// role = "admins" | "executors" | "readers"
|
|
fn context_has_role(ctx: &Context, role: &str, actor_id: u32) -> Result<bool, BoxError> {
|
|
let v = as_json(ctx)?;
|
|
let arr = v
|
|
.get(role)
|
|
.and_then(|r| r.as_array())
|
|
.ok_or_else(|| ValidationError::new(format!("Context.{} missing or invalid", role)))?;
|
|
Ok(arr.iter().any(|x| x.as_u64() == Some(actor_id as u64)))
|
|
}
|
|
|
|
// -----------------------------
|
|
// Validation helpers (minimal, spec-aligned)
|
|
// -----------------------------
|
|
|
|
fn validate_context(ctx: &Context) -> Result<(), BoxError> {
|
|
let v = as_json(ctx)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Context.id must be > 0").into());
|
|
}
|
|
// admins required
|
|
let admins = json_get_array(&v, "admins")?;
|
|
if admins.is_empty() {
|
|
return Err(ValidationError::new("Context.admins must not be empty").into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> {
|
|
let v = as_json(actor)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Actor.id must be > 0").into());
|
|
}
|
|
let pubkey = json_get_str(&v, "pubkey")?;
|
|
if pubkey.trim().is_empty() {
|
|
return Err(ValidationError::new("Actor.pubkey must not be empty").into());
|
|
}
|
|
let addr = json_get_array(&v, "address")?;
|
|
if addr.is_empty() {
|
|
return Err(ValidationError::new("Actor.address must not be empty").into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_runner(_context_id: u32, runner: &Runner) -> Result<(), BoxError> {
|
|
let v = as_json(runner)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Runner.id must be > 0").into());
|
|
}
|
|
let pubkey = json_get_str(&v, "pubkey")?;
|
|
if pubkey.trim().is_empty() {
|
|
return Err(ValidationError::new("Runner.pubkey must not be empty").into());
|
|
}
|
|
let topic = json_get_str(&v, "topic")?;
|
|
if topic.trim().is_empty() {
|
|
return Err(ValidationError::new("Runner.topic must not be empty").into());
|
|
}
|
|
// address presence is ensured by serde typing; no additional validation here
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_flow(context_id: u32, flow: &Flow) -> Result<(), BoxError> {
|
|
let v = as_json(flow)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Flow.id must be > 0").into());
|
|
}
|
|
let ctx = json_get_u32(&v, "context_id")?;
|
|
if ctx != context_id {
|
|
return Err(ValidationError::new(format!(
|
|
"Flow.context_id ({}) does not match path context_id ({})",
|
|
ctx, context_id
|
|
))
|
|
.into());
|
|
}
|
|
let jobs = json_get_array(&v, "jobs")?;
|
|
if has_duplicate_u32s(&jobs) {
|
|
return Err(ValidationError::new("Flow.jobs must not contain duplicates").into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_job(context_id: u32, job: &Job) -> Result<(), BoxError> {
|
|
let v = as_json(job)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Job.id must be > 0").into());
|
|
}
|
|
let ctx = json_get_u32(&v, "context_id")?;
|
|
if ctx != context_id {
|
|
return Err(ValidationError::new(format!(
|
|
"Job.context_id ({}) does not match path context_id ({})",
|
|
ctx, context_id
|
|
))
|
|
.into());
|
|
}
|
|
let script = json_get_str(&v, "script")?;
|
|
if script.trim().is_empty() {
|
|
return Err(ValidationError::new("Job.script must not be empty").into());
|
|
}
|
|
let timeout = json_get_u32(&v, "timeout")?;
|
|
if timeout == 0 {
|
|
return Err(ValidationError::new("Job.timeout must be > 0").into());
|
|
}
|
|
let depends = json_get_array(&v, "depends")?;
|
|
if has_duplicate_u32s(&depends) {
|
|
return Err(ValidationError::new("Job.depends must not contain duplicates").into());
|
|
}
|
|
if vec_u32_contains(&depends, id) {
|
|
return Err(ValidationError::new("Job.depends must not include the job's own id").into());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
|
|
let v = as_json(msg)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
if id == 0 {
|
|
return Err(ValidationError::new("Message.id must be > 0").into());
|
|
}
|
|
let ctx = json_get_u32(&v, "context_id")?;
|
|
if ctx != context_id {
|
|
return Err(ValidationError::new(format!(
|
|
"Message.context_id ({}) does not match path context_id ({})",
|
|
ctx, context_id
|
|
))
|
|
.into());
|
|
}
|
|
let body = json_get_str(&v, "message")?;
|
|
if body.trim().is_empty() {
|
|
return Err(ValidationError::new("Message.message must not be empty").into());
|
|
}
|
|
let t = json_get_u32(&v, "timeout")?;
|
|
let ta = json_get_u32(&v, "timeout_ack")?;
|
|
let tr = json_get_u32(&v, "timeout_result")?;
|
|
if t == 0 || ta == 0 || tr == 0 {
|
|
return Err(ValidationError::new(
|
|
"Message timeouts (timeout|timeout_ack|timeout_result) must be > 0",
|
|
)
|
|
.into());
|
|
}
|
|
// Validate embedded jobs minimal consistency (caller/context match)
|
|
let jobs = json_get_array(&v, "job")?;
|
|
let msg_caller = json_get_u32(&v, "caller_id")?;
|
|
for jv in jobs {
|
|
if let Some(obj) = jv.as_object() {
|
|
let mut jid = 0u32;
|
|
if let Some(u) = obj.get("id").and_then(|x| x.as_u64()) {
|
|
jid = u as u32;
|
|
}
|
|
if let (Some(jctx), Some(jcaller)) = (
|
|
obj.get("context_id").and_then(|x| x.as_u64()),
|
|
obj.get("caller_id").and_then(|x| x.as_u64()),
|
|
) {
|
|
if jctx as u32 != context_id {
|
|
return Err(ValidationError::new(format!(
|
|
"Embedded Job {} context_id mismatch ({} != {})",
|
|
jid, jctx as u32, context_id
|
|
))
|
|
.into());
|
|
}
|
|
if jcaller as u32 != msg_caller {
|
|
return Err(ValidationError::new(format!(
|
|
"Embedded Job {} caller_id mismatch ({} != {})",
|
|
jid, jcaller as u32, msg_caller
|
|
))
|
|
.into());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// -----------------------------
|
|
// Service API
|
|
// -----------------------------
|
|
|
|
pub struct AppService {
|
|
redis: Arc<RedisDriver>,
|
|
schedulers: Arc<Mutex<HashSet<(u32, u32)>>>,
|
|
}
|
|
|
|
impl AppService {
|
|
pub fn new(redis: RedisDriver) -> Self {
|
|
Self {
|
|
redis: Arc::new(redis),
|
|
schedulers: Arc::new(Mutex::new(HashSet::new())),
|
|
}
|
|
}
|
|
|
|
// -----------------------------
|
|
// Context
|
|
// -----------------------------
|
|
pub async fn create_context(&self, ctx: Context) -> Result<Context, BoxError> {
|
|
validate_context(&ctx)?;
|
|
// id
|
|
let v = as_json(&ctx)?;
|
|
let context_id = json_get_u32(&v, "id")?;
|
|
self.ensure_context_not_exists(context_id).await?;
|
|
self.redis.save_context(&ctx).await?;
|
|
Ok(ctx)
|
|
}
|
|
|
|
/// Loads the context with the given `id` from storage.
///
/// # Errors
/// Propagates the storage driver's error unchanged.
pub async fn load_context(&self, id: u32) -> Result<Context, BoxError> {
    self.redis.load_context(id).await
}
|
|
|
|
// -----------------------------
|
|
// Actor
|
|
// -----------------------------
|
|
pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result<Actor, BoxError> {
|
|
validate_actor(context_id, &actor)?;
|
|
let v = as_json(&actor)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
self.ensure_actor_not_exists(context_id, id).await?;
|
|
self.redis.save_actor(context_id, &actor).await?;
|
|
Ok(actor)
|
|
}
|
|
|
|
/// Loads actor `id` of context `context_id` from storage.
///
/// # Errors
/// Propagates the storage driver's error unchanged.
pub async fn load_actor(&self, context_id: u32, id: u32) -> Result<Actor, BoxError> {
    self.redis.load_actor(context_id, id).await
}
|
|
|
|
// -----------------------------
|
|
// Runner
|
|
// -----------------------------
|
|
pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result<Runner, BoxError> {
|
|
validate_runner(context_id, &runner)?;
|
|
let v = as_json(&runner)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
self.ensure_runner_not_exists(context_id, id).await?;
|
|
self.redis.save_runner(context_id, &runner).await?;
|
|
Ok(runner)
|
|
}
|
|
|
|
/// Loads runner `id` of context `context_id` from storage.
///
/// # Errors
/// Propagates the storage driver's error unchanged.
pub async fn load_runner(&self, context_id: u32, id: u32) -> Result<Runner, BoxError> {
    self.redis.load_runner(context_id, id).await
}
|
|
|
|
// -----------------------------
|
|
// Flow
|
|
// -----------------------------
|
|
pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result<Flow, BoxError> {
|
|
validate_flow(context_id, &flow)?;
|
|
|
|
// Permission: require that flow.caller_id is admin in the context
|
|
let v = as_json(&flow)?;
|
|
let fid = json_get_u32(&v, "id")?;
|
|
let caller_id = json_get_u32(&v, "caller_id")?;
|
|
self.require_admin(context_id, caller_id, "create flow")
|
|
.await?;
|
|
|
|
self.ensure_flow_not_exists(context_id, fid).await?;
|
|
// Require that the context exists
|
|
let _ = self.redis.load_context(context_id).await?;
|
|
self.redis.save_flow(context_id, &flow).await?;
|
|
Ok(flow)
|
|
}
|
|
|
|
/// Loads flow `id` of context `context_id` from storage.
///
/// # Errors
/// Propagates the storage driver's error unchanged.
pub async fn load_flow(&self, context_id: u32, id: u32) -> Result<Flow, BoxError> {
    self.redis.load_flow(context_id, id).await
}
|
|
|
|
pub async fn flow_dag(&self, context_id: u32, flow_id: u32) -> DagResult<FlowDag> {
|
|
build_flow_dag(&self.redis, context_id, flow_id).await
|
|
}
|
|
|
|
/// Start a background scheduler for a flow.
|
|
/// - Ticks every 1 second.
|
|
/// - Sets Flow status to Started immediately.
|
|
/// - Dispatches jobs whose dependencies are Finished: creates a Message and LPUSHes its key into msg_out,
|
|
/// and marks the job status to Dispatched.
|
|
/// - When all jobs are Finished sets Flow to Finished; if any job is Error sets Flow to Error.
|
|
/// Returns:
|
|
/// - Ok(true) if a scheduler was started
|
|
/// - Ok(false) if a scheduler was already running for this (context_id, flow_id)
|
|
pub async fn flow_start(&self, context_id: u32, flow_id: u32) -> Result<bool, BoxError> {
|
|
// Ensure flow exists (and load caller_id)
|
|
let flow = self.redis.load_flow(context_id, flow_id).await?;
|
|
let caller_id = flow.caller_id();
|
|
|
|
// Try to register this flow in the active scheduler set
|
|
{
|
|
let mut guard = self.schedulers.lock().await;
|
|
if !guard.insert((context_id, flow_id)) {
|
|
// Already running
|
|
return Ok(false);
|
|
}
|
|
}
|
|
|
|
// Clone resources for background task
|
|
let redis = self.redis.clone();
|
|
let schedulers = self.schedulers.clone();
|
|
|
|
// Set Flow status to Started
|
|
let _ = redis
|
|
.update_flow_status(context_id, flow_id, FlowStatus::Started)
|
|
.await;
|
|
|
|
tokio::spawn(async move {
|
|
// Background loop
|
|
loop {
|
|
// Load current flow; stop if missing
|
|
let flow = match redis.load_flow(context_id, flow_id).await {
|
|
Ok(f) => f,
|
|
Err(_) => break,
|
|
};
|
|
|
|
// Track aggregate state
|
|
let mut all_finished = true;
|
|
let mut any_error = false;
|
|
|
|
// Iterate jobs declared in the flow
|
|
for jid in flow.jobs() {
|
|
// Load job
|
|
let job = match redis.load_job(context_id, caller_id, *jid).await {
|
|
Ok(j) => j,
|
|
Err(_) => {
|
|
// If job is missing treat as error state for the flow and stop
|
|
any_error = true;
|
|
all_finished = false;
|
|
break;
|
|
}
|
|
};
|
|
|
|
match job.status() {
|
|
JobStatus::Finished => {
|
|
// done
|
|
}
|
|
JobStatus::Error => {
|
|
any_error = true;
|
|
all_finished = false;
|
|
}
|
|
JobStatus::Dispatched | JobStatus::Started => {
|
|
all_finished = false;
|
|
}
|
|
JobStatus::WaitingForPrerequisites => {
|
|
all_finished = false;
|
|
|
|
// Check dependencies complete
|
|
let mut deps_ok = true;
|
|
for dep in job.depends() {
|
|
match redis.load_job(context_id, caller_id, *dep).await {
|
|
Ok(dj) => {
|
|
if dj.status() != JobStatus::Finished {
|
|
deps_ok = false;
|
|
break;
|
|
}
|
|
}
|
|
Err(_) => {
|
|
deps_ok = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if deps_ok {
|
|
// Build Message embedding this job
|
|
let ts = crate::time::current_timestamp();
|
|
let msg_id: u32 = job.id(); // deterministic message id per job for now
|
|
|
|
let message = Message {
|
|
id: msg_id,
|
|
caller_id: job.caller_id(),
|
|
context_id,
|
|
message: "job.run".to_string(),
|
|
message_type: job.script_type(),
|
|
message_format_type: MessageFormatType::Text,
|
|
timeout: job.timeout,
|
|
timeout_ack: 10,
|
|
timeout_result: job.timeout,
|
|
job: vec![job.clone()],
|
|
logs: Vec::new(),
|
|
created_at: ts,
|
|
updated_at: ts,
|
|
status: MessageStatus::Dispatched,
|
|
};
|
|
|
|
// Persist the message and enqueue it
|
|
if redis.save_message(context_id, &message).await.is_ok() {
|
|
let _ = redis
|
|
.enqueue_msg_out(context_id, job.caller_id(), msg_id)
|
|
.await;
|
|
// Mark job as Dispatched
|
|
let _ = redis
|
|
.update_job_status(
|
|
context_id,
|
|
job.caller_id(),
|
|
job.id(),
|
|
JobStatus::Dispatched,
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if any_error {
|
|
let _ = redis
|
|
.update_flow_status(context_id, flow_id, FlowStatus::Error)
|
|
.await;
|
|
break;
|
|
}
|
|
if all_finished {
|
|
let _ = redis
|
|
.update_flow_status(context_id, flow_id, FlowStatus::Finished)
|
|
.await;
|
|
break;
|
|
}
|
|
|
|
sleep(Duration::from_secs(1)).await;
|
|
}
|
|
|
|
// Remove from active schedulers set
|
|
let mut guard = schedulers.lock().await;
|
|
guard.remove(&(context_id, flow_id));
|
|
});
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
/// Start a background scheduler for a flow.
|
|
/// - Ticks every 1 second.
|
|
/// - Sets Flow status to Started immediately.
|
|
/// - Dispatches jobs whose dependencies are Finished: creates a Message and LPUSHes its key into msg_out,
|
|
/// and marks the job status to Dispatched.
|
|
/// - When all jobs are Finished sets Flow to Finished; if any job is Error sets Flow to Error.
|
|
/// Returns:
|
|
/// - Ok(true) if a scheduler was started
|
|
|
|
/// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
|
|
/// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
|
|
pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
|
|
let dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
|
|
let mut ready = dag.ready_jobs()?;
|
|
ready.sort_unstable();
|
|
|
|
let mut queued: Vec<String> = Vec::with_capacity(ready.len());
|
|
for jid in ready {
|
|
// Load the concrete Job
|
|
let job = self
|
|
.redis
|
|
.load_job(context_id, dag.caller_id, jid)
|
|
.await
|
|
.map_err(DagError::from)?;
|
|
|
|
// Build a Message that embeds this job
|
|
let ts = crate::time::current_timestamp();
|
|
let msg_id: u32 = job.id(); // deterministic; adjust strategy later if needed
|
|
|
|
let message = Message {
|
|
id: msg_id,
|
|
caller_id: job.caller_id(),
|
|
context_id,
|
|
message: "job.run".to_string(),
|
|
message_type: job.script_type(), // uses ScriptType (matches model)
|
|
message_format_type: MessageFormatType::Text,
|
|
timeout: job.timeout,
|
|
timeout_ack: 10,
|
|
timeout_result: job.timeout,
|
|
job: vec![job.clone()],
|
|
logs: Vec::new(),
|
|
created_at: ts,
|
|
updated_at: ts,
|
|
status: MessageStatus::Dispatched,
|
|
};
|
|
|
|
// Persist the Message and enqueue its key to the outbound queue
|
|
let _ = self
|
|
.create_message(context_id, message)
|
|
.await
|
|
.map_err(DagError::from)?;
|
|
|
|
self.redis
|
|
.enqueue_msg_out(context_id, job.caller_id(), msg_id)
|
|
.await
|
|
.map_err(DagError::from)?;
|
|
|
|
let key = format!("message:{}:{}", job.caller_id(), msg_id);
|
|
queued.push(key);
|
|
}
|
|
|
|
Ok(queued)
|
|
}
|
|
|
|
// -----------------------------
|
|
// Job
|
|
// -----------------------------
|
|
pub async fn create_job(&self, context_id: u32, job: Job) -> Result<Job, BoxError> {
|
|
validate_job(context_id, &job)?;
|
|
let v = as_json(&job)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
let caller_id = json_get_u32(&v, "caller_id")?;
|
|
self.ensure_job_not_exists(context_id, caller_id, id)
|
|
.await?;
|
|
self.redis.save_job(context_id, &job).await?;
|
|
Ok(job)
|
|
}
|
|
|
|
pub async fn load_job(
|
|
&self,
|
|
context_id: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
) -> Result<Job, BoxError> {
|
|
let job = self.redis.load_job(context_id, caller_id, id).await?;
|
|
Ok(job)
|
|
}
|
|
|
|
/// Update a job status with transition validation.
|
|
///
|
|
/// Allowed transitions:
|
|
/// - Dispatched -> WaitingForPrerequisites | Started | Error
|
|
/// - WaitingForPrerequisites -> Started | Error
|
|
/// - Started -> Finished | Error
|
|
/// - Finished, Error -> terminal (no transitions)
|
|
///
|
|
/// If the new status equals the current status, this is a no-op.
|
|
pub async fn update_job_status(
|
|
&self,
|
|
context_id: u32,
|
|
executor_id: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
new_status: JobStatus,
|
|
) -> Result<(), BoxError> {
|
|
self.require_executor(context_id, executor_id, "update job status")
|
|
.await?;
|
|
let job = self.redis.load_job(context_id, caller_id, id).await?;
|
|
let current = job.status();
|
|
|
|
if new_status == current {
|
|
// Idempotent: don't touch storage if no change
|
|
return Ok(());
|
|
}
|
|
|
|
let allowed = match current {
|
|
JobStatus::Dispatched => matches!(
|
|
new_status,
|
|
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
|
|
),
|
|
JobStatus::WaitingForPrerequisites => {
|
|
matches!(new_status, JobStatus::Started | JobStatus::Error)
|
|
}
|
|
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
|
JobStatus::Finished | JobStatus::Error => false,
|
|
};
|
|
|
|
if !allowed {
|
|
return Err(Box::new(InvalidJobStatusTransition {
|
|
from: current,
|
|
to: new_status,
|
|
}));
|
|
}
|
|
|
|
self.redis
|
|
.update_job_status(context_id, caller_id, id, new_status)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// -----------------------------
|
|
// Message
|
|
// -----------------------------
|
|
pub async fn create_message(
|
|
&self,
|
|
context_id: u32,
|
|
message: Message,
|
|
) -> Result<Message, BoxError> {
|
|
validate_message(context_id, &message)?;
|
|
let v = as_json(&message)?;
|
|
let id = json_get_u32(&v, "id")?;
|
|
let caller_id = json_get_u32(&v, "caller_id")?;
|
|
self.ensure_message_not_exists(context_id, caller_id, id)
|
|
.await?;
|
|
self.redis.save_message(context_id, &message).await?;
|
|
Ok(message)
|
|
}
|
|
|
|
pub async fn load_message(
|
|
&self,
|
|
context_id: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
) -> Result<Message, BoxError> {
|
|
let msg = self.redis.load_message(context_id, caller_id, id).await?;
|
|
Ok(msg)
|
|
}
|
|
|
|
pub async fn update_flow_status(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
id: u32,
|
|
new_status: FlowStatus,
|
|
) -> Result<(), BoxError> {
|
|
self.require_admin(context_id, requestor_id, "update flow status")
|
|
.await?;
|
|
let flow = self.redis.load_flow(context_id, id).await?;
|
|
let v = as_json(&flow)?;
|
|
let cur_raw = v
|
|
.get("status")
|
|
.cloned()
|
|
.unwrap_or(Value::String("Dispatched".to_string()));
|
|
|
|
let cur = match cur_raw {
|
|
Value::String(s) if s == "Dispatched" => FlowStatus::Dispatched,
|
|
Value::String(s) if s == "Started" => FlowStatus::Started,
|
|
Value::String(s) if s == "Error" => FlowStatus::Error,
|
|
Value::String(s) if s == "Finished" => FlowStatus::Finished,
|
|
_ => FlowStatus::Dispatched,
|
|
};
|
|
|
|
if cur == new_status {
|
|
return Ok(());
|
|
}
|
|
|
|
let allowed = match cur {
|
|
FlowStatus::Created => matches!(new_status, FlowStatus::Dispatched | FlowStatus::Error),
|
|
FlowStatus::Dispatched => matches!(new_status, FlowStatus::Started | FlowStatus::Error),
|
|
FlowStatus::Started => matches!(new_status, FlowStatus::Finished | FlowStatus::Error),
|
|
FlowStatus::Finished | FlowStatus::Error => false,
|
|
};
|
|
if !allowed {
|
|
return Err(ValidationError::new(format!(
|
|
"Invalid flow status transition: {:?} -> {:?}",
|
|
cur, new_status
|
|
))
|
|
.into());
|
|
}
|
|
|
|
self.redis
|
|
.update_flow_status(context_id, id, new_status)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_message_status(
|
|
&self,
|
|
context_id: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
new_status: MessageStatus,
|
|
) -> Result<(), BoxError> {
|
|
let msg = self.redis.load_message(context_id, caller_id, id).await?;
|
|
let v = as_json(&msg)?;
|
|
let cur_raw = v
|
|
.get("status")
|
|
.cloned()
|
|
.unwrap_or(Value::String("Dispatched".to_string()));
|
|
|
|
let cur = match cur_raw {
|
|
Value::String(s) if s == "Dispatched" => MessageStatus::Dispatched,
|
|
Value::String(s) if s == "Acknowledged" => MessageStatus::Acknowledged,
|
|
Value::String(s) if s == "Error" => MessageStatus::Error,
|
|
Value::String(s) if s == "Processed" => MessageStatus::Processed,
|
|
_ => MessageStatus::Dispatched,
|
|
};
|
|
|
|
if cur == new_status {
|
|
return Ok(());
|
|
}
|
|
|
|
let allowed = match cur {
|
|
MessageStatus::Dispatched => {
|
|
matches!(
|
|
new_status,
|
|
MessageStatus::Acknowledged | MessageStatus::Error
|
|
)
|
|
}
|
|
MessageStatus::Acknowledged => {
|
|
matches!(new_status, MessageStatus::Processed | MessageStatus::Error)
|
|
}
|
|
MessageStatus::Processed | MessageStatus::Error => false,
|
|
};
|
|
if !allowed {
|
|
return Err(ValidationError::new(format!(
|
|
"Invalid message status transition: {:?} -> {:?}",
|
|
cur, new_status
|
|
))
|
|
.into());
|
|
}
|
|
|
|
self.redis
|
|
.update_message_status(context_id, caller_id, id, new_status)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_flow_env_vars_merge(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
flow_id: u32,
|
|
patch: HashMap<String, String>,
|
|
) -> Result<(), BoxError> {
|
|
self.require_admin(context_id, requestor_id, "update flow env_vars")
|
|
.await?;
|
|
// Ensure flow exists
|
|
let _ = self.redis.load_flow(context_id, flow_id).await?;
|
|
self.redis
|
|
.update_flow_env_vars_merge(context_id, flow_id, patch)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_flow_result_merge(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
flow_id: u32,
|
|
patch: HashMap<String, String>,
|
|
) -> Result<(), BoxError> {
|
|
self.require_admin(context_id, requestor_id, "update flow result")
|
|
.await?;
|
|
let _ = self.redis.load_flow(context_id, flow_id).await?;
|
|
self.redis
|
|
.update_flow_result_merge(context_id, flow_id, patch)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_flow_jobs_add_remove(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
flow_id: u32,
|
|
add: Vec<u32>,
|
|
remove: Vec<u32>,
|
|
) -> Result<(), BoxError> {
|
|
self.require_admin(context_id, requestor_id, "update flow jobs")
|
|
.await?;
|
|
let flow = self.redis.load_flow(context_id, flow_id).await?;
|
|
let mut set: std::collections::BTreeSet<u32> = flow.jobs().iter().copied().collect();
|
|
for a in add {
|
|
set.insert(a);
|
|
}
|
|
for r in remove {
|
|
set.remove(&r);
|
|
}
|
|
let new_jobs: Vec<u32> = set.into_iter().collect();
|
|
self.redis
|
|
.update_flow_jobs_set(context_id, flow_id, new_jobs)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_job_env_vars_merge(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
caller_id: u32,
|
|
job_id: u32,
|
|
patch: HashMap<String, String>,
|
|
) -> Result<(), BoxError> {
|
|
self.require_admin(context_id, requestor_id, "update job env_vars")
|
|
.await?;
|
|
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
|
|
self.redis
|
|
.update_job_env_vars_merge(context_id, caller_id, job_id, patch)
|
|
.await
|
|
}
|
|
|
|
pub async fn update_job_result_merge(
|
|
&self,
|
|
context_id: u32,
|
|
requestor_id: u32,
|
|
caller_id: u32,
|
|
job_id: u32,
|
|
patch: HashMap<String, String>,
|
|
) -> Result<(), BoxError> {
|
|
// Allow if admin OR executor
|
|
let ctx = self.redis.load_context(context_id).await?;
|
|
let is_admin = context_has_role(&ctx, "admins", requestor_id)?;
|
|
let is_exec = context_has_role(&ctx, "executors", requestor_id)?;
|
|
if !(is_admin || is_exec) {
|
|
return Err(Box::new(PermissionDeniedError {
|
|
actor_id: requestor_id,
|
|
context_id,
|
|
action: "update job result".to_string(),
|
|
}));
|
|
}
|
|
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
|
|
self.redis
|
|
.update_job_result_merge(context_id, caller_id, job_id, patch)
|
|
.await
|
|
}
|
|
|
|
pub async fn append_message_logs(
|
|
&self,
|
|
context_id: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
new_logs: Vec<String>,
|
|
) -> Result<(), BoxError> {
|
|
let _ = self.redis.load_message(context_id, caller_id, id).await?;
|
|
self.redis
|
|
.append_message_logs(context_id, caller_id, id, new_logs)
|
|
.await
|
|
}
|
|
}
|
|
|
|
// -----------------------------
|
|
// Existence checks (strict create) and permissions
|
|
// -----------------------------
|
|
impl AppService {
|
|
async fn ensure_context_not_exists(&self, id: u32) -> Result<(), BoxError> {
|
|
match self.redis.load_context(id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("context:{}", id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
|
|
match self.redis.load_actor(db, id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("actor:{}", id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ensure_runner_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
|
|
match self.redis.load_runner(db, id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("runner:{}", id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ensure_flow_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
|
|
match self.redis.load_flow(db, id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("flow:{}", id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ensure_job_not_exists(
|
|
&self,
|
|
db: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
) -> Result<(), BoxError> {
|
|
match self.redis.load_job(db, caller_id, id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("job:{}:{}", caller_id, id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ensure_message_not_exists(
|
|
&self,
|
|
db: u32,
|
|
caller_id: u32,
|
|
id: u32,
|
|
) -> Result<(), BoxError> {
|
|
match self.redis.load_message(db, caller_id, id).await {
|
|
Ok(_) => Err(Box::new(AlreadyExistsError {
|
|
key: format!("message:{}:{}", caller_id, id),
|
|
})),
|
|
Err(e) => {
|
|
if contains_key_not_found(&e) {
|
|
Ok(())
|
|
} else {
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn require_admin(
|
|
&self,
|
|
context_id: u32,
|
|
actor_id: u32,
|
|
action: &str,
|
|
) -> Result<(), BoxError> {
|
|
let ctx = self.redis.load_context(context_id).await?;
|
|
let ok = context_has_role(&ctx, "admins", actor_id)?;
|
|
if !ok {
|
|
return Err(Box::new(PermissionDeniedError {
|
|
actor_id,
|
|
context_id,
|
|
action: action.to_string(),
|
|
}));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn require_executor(
|
|
&self,
|
|
context_id: u32,
|
|
actor_id: u32,
|
|
action: &str,
|
|
) -> Result<(), BoxError> {
|
|
let ctx = self.redis.load_context(context_id).await?;
|
|
let ok = context_has_role(&ctx, "executors", actor_id)?;
|
|
if !ok {
|
|
return Err(Box::new(PermissionDeniedError {
|
|
actor_id,
|
|
context_id,
|
|
action: action.to_string(),
|
|
}));
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|