Files
horus/bin/coordinator/src/rpc.rs

531 lines
16 KiB
Rust

use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::Arc,
};
use jsonrpsee::{
RpcModule,
server::{ServerBuilder, ServerHandle},
types::error::ErrorObjectOwned,
};
use serde::Deserialize;
use serde_json::{Value, json};
use crate::{
dag::{DagError, FlowDag},
models::{
Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType,
MessageStatus, Runner,
},
service::AppService,
time::current_timestamp,
};
/// The OpenRPC specification for the HeroCoordinator JSON-RPC API.
///
/// Embedded into the binary at compile time via `include_str!`; the path is resolved
/// relative to this source file. The `rpc.discover` method parses this text as JSON and
/// returns it to callers, so it must stay valid JSON.
const OPENRPC_SPEC: &str = include_str!("../specs/openrpc.json");
/// Shared state handed (behind an `Arc`) to every handler registered by `build_module`.
pub struct AppState {
    /// Service layer that all RPC handlers delegate to.
    pub service: AppService,
}

impl AppState {
    /// Wraps the given service in a fresh `AppState`.
    pub fn new(service: AppService) -> Self {
        AppState { service }
    }
}
// -----------------------------
// Error helpers
// -----------------------------
/// Builds a JSON-RPC `-32602 Invalid params` error whose `data` field carries the
/// `Display` text of `e` (typically a params-parsing failure).
fn invalid_params_err<E>(e: E) -> ErrorObjectOwned
where
    E: std::fmt::Display,
{
    let detail = Value::String(e.to_string());
    ErrorObjectOwned::owned(-32602, "Invalid params", Some(detail))
}
/// Converts a service/storage error into a JSON-RPC error object.
///
/// Errors whose message contains `"Key not found"` map to `-32001 Not Found`; every other
/// error maps to `-32010 Storage Error`. In both cases the original message is attached
/// as the error's `data`.
fn storage_err(e: Box<dyn std::error::Error + Send + Sync>) -> ErrorObjectOwned {
    let detail = e.to_string();
    // NOTE(review): classification relies on the error's text; a typed error would be sturdier.
    let (code, label) = if detail.contains("Key not found") {
        (-32001, "Not Found")
    } else {
        (-32010, "Storage Error")
    };
    ErrorObjectOwned::owned(code, label, Some(Value::String(detail)))
}
/// Converts a [`DagError`] into a JSON-RPC error object.
///
/// Storage failures are delegated to `storage_err`. Every other variant gets its own code
/// in the `-32020..=-32025` range, a fixed title, and the error's `Display` text as `data`.
fn dag_err(e: DagError) -> ErrorObjectOwned {
    let (code, title) = match e {
        // Moves the inner storage error out; this arm leaves the function immediately.
        DagError::Storage(inner) => return storage_err(inner),
        DagError::MissingDependency { .. } => (-32020, "DAG Missing Dependency"),
        DagError::CycleDetected { .. } => (-32021, "DAG Cycle Detected"),
        DagError::UnknownJob { .. } => (-32022, "DAG Unknown Job"),
        DagError::DependenciesIncomplete { .. } => (-32023, "DAG Dependencies Incomplete"),
        DagError::FlowFailed { .. } => (-32024, "DAG Flow Failed"),
        DagError::JobNotStarted { .. } => (-32025, "DAG Job Not Started"),
    };
    ErrorObjectOwned::owned(code, title, Some(Value::String(e.to_string())))
}
// -----------------------------
// Create DTOs and Param wrappers
// -----------------------------
// Actor was renamed to Runner - ActorCreate is deprecated
// #[derive(Debug, Deserialize)]
// pub struct ActorCreate {
// pub id: u32,
// pub pubkey: String,
// pub address: Vec<IpAddr>,
// }
/// Payload for `context.create`, nested under the `context` key of the request params.
#[derive(Debug, Deserialize)]
pub struct ContextCreate {
    pub id: u32,
    pub admins: Vec<u32>,
    pub readers: Vec<u32>,
    pub executors: Vec<u32>,
}

impl ContextCreate {
    /// Converts the request payload into a domain [`Context`].
    ///
    /// All fields are moved across unchanged; `created_at` and `updated_at` are both set
    /// to the same `current_timestamp()` reading.
    pub fn into_domain(self) -> Context {
        let now = current_timestamp();
        Context {
            id: self.id,
            admins: self.admins,
            readers: self.readers,
            executors: self.executors,
            created_at: now,
            updated_at: now,
        }
    }
}
/// Payload for `runner.create`, nested under the `runner` key of the request params.
#[derive(Debug, Deserialize)]
pub struct RunnerCreate {
    pub id: u32,
    pub pubkey: String,
    pub address: IpAddr,
    pub topic: String,
    pub local: bool,
    /// Optional secret used for authenticated supervisor calls (if required)
    pub secret: Option<String>,
}

impl RunnerCreate {
    /// Converts the request payload into a domain [`Runner`].
    ///
    /// All fields are moved across unchanged; `created_at` and `updated_at` are both set
    /// to the same `current_timestamp()` reading.
    pub fn into_domain(self) -> Runner {
        let now = current_timestamp();
        Runner {
            id: self.id,
            pubkey: self.pubkey,
            address: self.address,
            topic: self.topic,
            local: self.local,
            secret: self.secret,
            created_at: now,
            updated_at: now,
        }
    }
}
/// Payload for `flow.create`, nested under the `flow` key of the request params.
#[derive(Debug, Deserialize)]
pub struct FlowCreate {
    pub id: u32,
    pub caller_id: u32,
    pub context_id: u32,
    pub jobs: Vec<u32>,
    pub env_vars: HashMap<String, String>,
}

impl FlowCreate {
    /// Converts the request payload into a domain [`Flow`].
    ///
    /// The new flow starts in [`FlowStatus::Created`] with an empty `result` map, and both
    /// `created_at` and `updated_at` are set to the same `current_timestamp()` reading.
    pub fn into_domain(self) -> Flow {
        let now = current_timestamp();
        Flow {
            id: self.id,
            caller_id: self.caller_id,
            context_id: self.context_id,
            jobs: self.jobs,
            env_vars: self.env_vars,
            result: HashMap::new(),
            status: FlowStatus::Created,
            created_at: now,
            updated_at: now,
        }
    }
}
// JobCreate removed - coordinator only manages flows, not individual jobs
// Jobs should be created by the supervisor or other services
#[derive(Debug, Deserialize)]
pub struct MessageCreate {
pub id: u32,
pub caller_id: u32,
pub context_id: u32,
pub message: String,
pub message_type: String,
pub message_format_type: MessageFormatType,
pub timeout: u32,
pub timeout_ack: u32,
pub timeout_result: u32,
// Jobs removed - use flow nodes instead
}
impl MessageCreate {
pub fn into_domain(self) -> Message {
use crate::time::current_timestamp;
let ts = current_timestamp();
// Convert to Message
// Note: flow_id is set to 0 for now, should be set by the caller
Message {
id: self.id,
caller_id: self.caller_id,
context_id: self.context_id,
flow_id: 0, // TODO: Get from params or context
message: self.message,
message_type: self.message_type,
message_format_type: self.message_format_type,
timeout: self.timeout,
timeout_ack: self.timeout_ack,
timeout_result: self.timeout_result,
transport_id: None,
transport_status: None,
nodes: Vec::new(), // TODO: MessageCreate should include nodes
job: Vec::new(), // Jobs removed - coordinator only manages flows
logs: Vec::new(),
created_at: ts,
updated_at: ts,
status: MessageStatus::Dispatched,
}
}
}
// Actor was renamed to Runner - ActorCreateParams and ActorLoadParams are deprecated
// #[derive(Debug, Deserialize)]
// pub struct ActorCreateParams {
// pub actor: ActorCreate,
// }
// #[derive(Debug, Deserialize)]
// pub struct ActorLoadParams {
// pub id: u32,
// }
/// Params for `context.create`: the context to create, under the `context` key.
// NOTE(review): fields are deserialized by name from object params; if positional (array)
// params are also accepted, field order becomes part of the wire contract — keep it stable.
#[derive(Debug, Deserialize)]
pub struct ContextCreateParams {
pub context: ContextCreate,
}
/// Params for `context.load`: the id of the context to load.
#[derive(Debug, Deserialize)]
pub struct ContextLoadParams {
pub id: u32,
}
/// Params for `runner.create`: the owning context id plus the runner payload.
#[derive(Debug, Deserialize)]
pub struct RunnerCreateParams {
pub context_id: u32,
pub runner: RunnerCreate,
}
/// Params for `runner.load`: the owning context id and the runner id.
#[derive(Debug, Deserialize)]
pub struct RunnerLoadParams {
pub context_id: u32,
pub id: u32,
}
/// Params for `flow.create`: the owning context id plus the flow payload.
#[derive(Debug, Deserialize)]
pub struct FlowCreateParams {
pub context_id: u32,
pub flow: FlowCreate,
}
/// Params shared by `flow.load`, `flow.dag` and `flow.start`: context id and flow id.
#[derive(Debug, Deserialize)]
pub struct FlowLoadParams {
pub context_id: u32,
pub id: u32,
}
// JobCreateParams and JobLoadParams removed - coordinator only manages flows
/// Params for `message.create`: the owning context id plus the message payload.
// NOTE(review): `context_id` here and `message.context_id` are independent fields and are
// not checked against each other in this file — confirm whether they must match.
#[derive(Debug, Deserialize)]
pub struct MessageCreateParams {
pub context_id: u32,
pub message: MessageCreate,
}
/// Params for `message.load`: messages are addressed by context id, caller id and id.
#[derive(Debug, Deserialize)]
pub struct MessageLoadParams {
pub context_id: u32,
pub caller_id: u32,
pub id: u32,
}
// -----------------------------
// Rpc module builder (manual registration)
// -----------------------------
/// Builds the JSON-RPC module exposing the coordinator API.
///
/// Registers `context.create/load`, `runner.create/load`, `flow.create/load/dag/start`,
/// `message.create/load` and `rpc.discover`. Each state-backed handler parses its params
/// into the matching `*Params` struct (parse failures become `-32602 Invalid params` via
/// `invalid_params_err`), delegates to `state.service`, and maps service errors with
/// `storage_err` (or `dag_err` for `flow.dag`).
///
/// # Panics
///
/// Panics if registering any method fails (each registration result is `expect`ed), e.g.
/// if the same method name were registered twice.
pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
    let mut module: RpcModule<()> = RpcModule::new(());

    // Context
    {
        let state = state.clone();
        module
            .register_async_method("context.create", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?;
                    let ctx = p.context.into_domain();
                    let ctx = state
                        .service
                        .create_context(ctx)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(ctx)
                }
            })
            .expect("register context.create");
    }
    {
        let state = state.clone();
        module
            .register_async_method("context.load", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?;
                    let ctx = state
                        .service
                        .load_context(p.id)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(ctx)
                }
            })
            .expect("register context.load");
    }

    // Runner
    {
        let state = state.clone();
        module
            .register_async_method("runner.create", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?;
                    let runner = p.runner.into_domain();
                    let runner = state
                        .service
                        .create_runner(p.context_id, runner)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(runner)
                }
            })
            .expect("register runner.create");
    }
    {
        let state = state.clone();
        module
            .register_async_method("runner.load", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?;
                    let runner = state
                        .service
                        .load_runner(p.context_id, p.id)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(runner)
                }
            })
            .expect("register runner.load");
    }

    // Flow
    {
        let state = state.clone();
        module
            .register_async_method("flow.create", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?;
                    let flow = p.flow.into_domain();
                    let flow = state
                        .service
                        .create_flow(p.context_id, flow)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(flow)
                }
            })
            .expect("register flow.create");
    }
    {
        let state = state.clone();
        module
            .register_async_method("flow.load", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
                    let flow = state
                        .service
                        .load_flow(p.context_id, p.id)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(flow)
                }
            })
            .expect("register flow.load");
    }
    {
        let state = state.clone();
        module
            .register_async_method("flow.dag", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
                    let dag: FlowDag = state
                        .service
                        .flow_dag(p.context_id, p.id)
                        .await
                        .map_err(dag_err)?;
                    Ok::<_, ErrorObjectOwned>(dag)
                }
            })
            .expect("register flow.dag");
    }
    {
        let state = state.clone();
        module
            .register_async_method("flow.start", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
                    let started: bool = state
                        .service
                        .flow_start(p.context_id, p.id)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(started)
                }
            })
            .expect("register flow.start");
    }

    // Job endpoints removed - coordinator only manages flows
    // Jobs should be created and managed by the supervisor

    // Message
    {
        let state = state.clone();
        module
            .register_async_method("message.create", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?;
                    let message = p.message.into_domain();
                    let message = state
                        .service
                        .create_message(p.context_id, message)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(message)
                }
            })
            .expect("register message.create");
    }
    {
        // Last handler: move the original Arc in instead of cloning it once more.
        let state = state;
        module
            .register_async_method("message.load", move |params, _rpc_ctx, _ext| {
                let state = state.clone();
                async move {
                    let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?;
                    let msg = state
                        .service
                        .load_message(p.context_id, p.caller_id, p.id)
                        .await
                        .map_err(storage_err)?;
                    Ok::<_, ErrorObjectOwned>(msg)
                }
            })
            .expect("register message.load");
    }

    // Service discovery
    {
        // Parse the embedded OpenRPC document once, at registration time, instead of on
        // every call. A malformed spec is reported to callers as a JSON-RPC internal error
        // (-32603) rather than panicking inside the request handler.
        let spec: Result<Value, String> =
            serde_json::from_str::<Value>(OPENRPC_SPEC).map_err(|e| e.to_string());
        module
            .register_async_method("rpc.discover", move |_params, _rpc_ctx, _ext| {
                let spec = spec.clone();
                async move {
                    spec.map_err(|msg| {
                        ErrorObjectOwned::owned(
                            -32603,
                            "Internal error",
                            Some(Value::String(format!(
                                "Failed to parse OpenRPC spec: {msg}"
                            ))),
                        )
                    })
                }
            })
            .expect("register rpc.discover");
    }

    module
}
// -----------------------------
// Server runners (HTTP/WS on separate listeners)
// -----------------------------
/// Binds a jsonrpsee server on `addr` and starts serving `module`.
///
/// Returns the running server's handle. Binding failures (e.g. the address is already in
/// use) are returned as a boxed error.
pub async fn start_http<C>(
    addr: SocketAddr,
    module: RpcModule<C>,
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
    Ok(ServerBuilder::default().build(addr).await?.start(module))
}
/// Binds a second jsonrpsee server on `addr`, intended as the dedicated WebSocket port,
/// and starts serving `module`.
///
/// The server is built with the same default builder as `start_http`, so the listener
/// is configured identically; only the bound address differs. Binding failures are
/// returned as a boxed error.
pub async fn start_ws<C>(
    addr: SocketAddr,
    module: RpcModule<C>,
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
    let bound = ServerBuilder::default().build(addr).await?;
    Ok(bound.start(module))
}