diff --git a/specs/openrpc.json b/specs/openrpc.json index ea12271..a566b15 100644 --- a/specs/openrpc.json +++ b/specs/openrpc.json @@ -20,7 +20,7 @@ "methods": [ { "name": "actor.create", - "summary": "Create/Upsert Actor in a context", + "summary": "Create/Upsert Actor", "params": [ { "name": "params", @@ -49,7 +49,7 @@ }, { "name": "actor.load", - "summary": "Load an Actor by id from a context", + "summary": "Load an Actor by id", "params": [ { "name": "params", @@ -1181,14 +1181,9 @@ "ActorCreateParams": { "type": "object", "required": [ - "context_id", "actor" ], "properties": { - "context_id": { - "type": "integer", - "format": "uint32" - }, "actor": { "$ref": "#/components/schemas/ActorCreate" } @@ -1197,14 +1192,9 @@ "ActorLoadParams": { "type": "object", "required": [ - "context_id", "id" ], "properties": { - "context_id": { - "type": "integer", - "format": "uint32" - }, "id": { "type": "integer", "format": "uint32" diff --git a/specs/specs.md b/specs/specs.md index 9e58f7d..9de6942 100644 --- a/specs/specs.md +++ b/specs/specs.md @@ -10,7 +10,7 @@ | **Runner** | Public key, Mycelium address, topic name, type (`v\|python\|osis\|rust`), local flag, timestamps | `runner:` (hash) | The *worker* that actually executes **RunnerJob** scripts. It subscribes to a Mycelium topic (normally `runner`). If `local == true` the runner also consumes jobs directly from a Redis queue that is named after the script‑type suffix (`v`, `python`, …). | | **RunnerJob**| Script source, type (`osis\|sal\|v\|python`), env‑vars, prerequisites, dependencies, status, timestamps, result map | `job::` (hash) | A single executable unit. It lives inside a **Context**, belongs to a **Runner**, and is queued according to its `script_type` (e.g. `queue:python`). Its status moves through the lifecycle `dispatched → waiting_for_prerequisites → started → finished|error`. | -> **Key idea:** All objects are persisted as *hashes* in a **Redis** database that is dedicated to a *Context*. The system is completely **decentralised** – each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a message‑bus that uses Redis lists as queues. +> **Key idea:** All objects are persisted as *hashes*. Context‑scoped objects (**Context**, **Flow**, **Message**, **Runner**, **RunnerJob**) live in a **Redis** database dedicated to that context. **Actors are global** and are stored in Redis DB 0 under `actor:`. The system is completely **decentralised** – each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a message‑bus that uses Redis lists as queues. --- diff --git a/src/rpc.rs b/src/rpc.rs index 9893d33..a9703c2 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -315,12 +315,10 @@ impl MessageCreate { #[derive(Debug, Deserialize)] pub struct ActorCreateParams { - pub context_id: u32, pub actor: ActorCreate, } #[derive(Debug, Deserialize)] pub struct ActorLoadParams { - pub context_id: u32, pub id: u32, } @@ -397,7 +395,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let actor = p.actor.into_domain().map_err(invalid_params_err)?; let actor = state .service - .create_actor(p.context_id, actor) + .create_actor(actor) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) @@ -414,7 +412,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; let actor = state .service - .load_actor(p.context_id, p.id) + .load_actor(p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) diff --git a/src/service.rs b/src/service.rs index 0d2c6dd..5b99eed 100644 --- a/src/service.rs +++ b/src/service.rs @@ -157,7 +157,7 @@ fn validate_context(ctx: &Context) -> Result<(), BoxError> { Ok(()) } -fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> { +fn validate_actor(actor: &Actor) -> Result<(), BoxError> { let v = as_json(actor)?; let id = json_get_u32(&v, "id")?; if id == 0 { @@ -344,17 +344,17 @@ impl AppService { // ----------------------------- // Actor // ----------------------------- - pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result { - validate_actor(context_id, &actor)?; + pub async fn create_actor(&self, actor: Actor) -> Result { + validate_actor(&actor)?; let v = as_json(&actor)?; let id = json_get_u32(&v, "id")?; - self.ensure_actor_not_exists(context_id, id).await?; - self.redis.save_actor(context_id, &actor).await?; + self.ensure_actor_not_exists_global(id).await?; + self.redis.save_actor_global(&actor).await?; Ok(actor) } - pub async fn load_actor(&self, context_id: u32, id: u32) -> Result { - let actor = self.redis.load_actor(context_id, id).await?; + pub async fn load_actor(&self, id: u32) -> Result { + let actor = self.redis.load_actor_global(id).await?; Ok(actor) } @@ -1023,8 +1023,8 @@ impl AppService { } } - async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> { - match self.redis.load_actor(db, id).await { + async fn ensure_actor_not_exists_global(&self, id: u32) -> Result<(), BoxError> { + match self.redis.load_actor_global(id).await { Ok(_) => Err(Box::new(AlreadyExistsError { key: format!("actor:{}", id), })), diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 845f7a6..0e99dbb 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -196,6 +196,22 @@ impl RedisDriver { let key = Self::actor_key(id); self.hget_model(db, &key).await } +/// Save an Actor globally in DB 0 (Actor is context-independent) + pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> { + let json = serde_json::to_value(actor)?; + let id = json + .get("id") + .and_then(|v| v.as_u64()) + .ok_or("Actor.id missing or not a number")? as u32; + let key = Self::actor_key(id); + self.hset_model(0, &key, actor).await + } + + /// Load an Actor globally from DB 0 by id + pub async fn load_actor_global(&self, id: u32) -> Result { + let key = Self::actor_key(id); + self.hget_model(0, &key).await + } // ----------------------------- // Runner