Actors are global

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-29 10:29:32 +02:00
parent 9c47eaaf93
commit 2aa6277385
5 changed files with 30 additions and 26 deletions

View File

@@ -20,7 +20,7 @@
"methods": [ "methods": [
{ {
"name": "actor.create", "name": "actor.create",
"summary": "Create/Upsert Actor in a context", "summary": "Create/Upsert Actor",
"params": [ "params": [
{ {
"name": "params", "name": "params",
@@ -49,7 +49,7 @@
}, },
{ {
"name": "actor.load", "name": "actor.load",
"summary": "Load an Actor by id from a context", "summary": "Load an Actor by id",
"params": [ "params": [
{ {
"name": "params", "name": "params",
@@ -1181,14 +1181,9 @@
"ActorCreateParams": { "ActorCreateParams": {
"type": "object", "type": "object",
"required": [ "required": [
"context_id",
"actor" "actor"
], ],
"properties": { "properties": {
"context_id": {
"type": "integer",
"format": "uint32"
},
"actor": { "actor": {
"$ref": "#/components/schemas/ActorCreate" "$ref": "#/components/schemas/ActorCreate"
} }
@@ -1197,14 +1192,9 @@
"ActorLoadParams": { "ActorLoadParams": {
"type": "object", "type": "object",
"required": [ "required": [
"context_id",
"id" "id"
], ],
"properties": { "properties": {
"context_id": {
"type": "integer",
"format": "uint32"
},
"id": { "id": {
"type": "integer", "type": "integer",
"format": "uint32" "format": "uint32"

View File

@@ -10,7 +10,7 @@
| **Runner** | Public key, Mycelium address, topic name, type (`v\|python\|osis\|rust`), local flag, timestamps | `runner:<id>` (hash) | The *worker* that actually executes **RunnerJob** scripts. It subscribes to a Mycelium topic (normally `runner<id>`). If `local == true` the runner also consumes jobs directly from a Redis queue that is named after the scripttype suffix (`v`, `python`, …). | | **Runner** | Public key, Mycelium address, topic name, type (`v\|python\|osis\|rust`), local flag, timestamps | `runner:<id>` (hash) | The *worker* that actually executes **RunnerJob** scripts. It subscribes to a Mycelium topic (normally `runner<id>`). If `local == true` the runner also consumes jobs directly from a Redis queue that is named after the scripttype suffix (`v`, `python`, …). |
| **RunnerJob**| Script source, type (`osis\|sal\|v\|python`), envvars, prerequisites, dependencies, status, timestamps, result map | `job:<caller_id>:<id>` (hash) | A single executable unit. It lives inside a **Context**, belongs to a **Runner**, and is queued according to its `script_type` (e.g. `queue:python`). Its status moves through the lifecycle `dispatched → waiting_for_prerequisites → started → finished|error`. | | **RunnerJob**| Script source, type (`osis\|sal\|v\|python`), envvars, prerequisites, dependencies, status, timestamps, result map | `job:<caller_id>:<id>` (hash) | A single executable unit. It lives inside a **Context**, belongs to a **Runner**, and is queued according to its `script_type` (e.g. `queue:python`). Its status moves through the lifecycle `dispatched → waiting_for_prerequisites → started → finished|error`. |
> **Key idea:** All objects are persisted as *hashes* in a **Redis** database that is dedicated to a *Context*. The system is completely **decentralised** each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a messagebus that uses Redis lists as queues. > **Key idea:** All objects are persisted as *hashes*. Contextscoped objects (**Context**, **Flow**, **Message**, **Runner**, **RunnerJob**) live in a **Redis** database dedicated to that context. **Actors are global** and are stored in Redis DB 0 under `actor:<id>`. The system is completely **decentralised** each actor owns its own context and can spin up as many runners as needed. Communication between actors, runners and the rest of the system happens over **Mycelium**, a messagebus that uses Redis lists as queues.
--- ---

View File

@@ -315,12 +315,10 @@ impl MessageCreate {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct ActorCreateParams { pub struct ActorCreateParams {
pub context_id: u32,
pub actor: ActorCreate, pub actor: ActorCreate,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct ActorLoadParams { pub struct ActorLoadParams {
pub context_id: u32,
pub id: u32, pub id: u32,
} }
@@ -397,7 +395,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
let actor = p.actor.into_domain().map_err(invalid_params_err)?; let actor = p.actor.into_domain().map_err(invalid_params_err)?;
let actor = state let actor = state
.service .service
.create_actor(p.context_id, actor) .create_actor(actor)
.await .await
.map_err(storage_err)?; .map_err(storage_err)?;
Ok::<_, ErrorObjectOwned>(actor) Ok::<_, ErrorObjectOwned>(actor)
@@ -414,7 +412,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?;
let actor = state let actor = state
.service .service
.load_actor(p.context_id, p.id) .load_actor(p.id)
.await .await
.map_err(storage_err)?; .map_err(storage_err)?;
Ok::<_, ErrorObjectOwned>(actor) Ok::<_, ErrorObjectOwned>(actor)

View File

@@ -157,7 +157,7 @@ fn validate_context(ctx: &Context) -> Result<(), BoxError> {
Ok(()) Ok(())
} }
fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> { fn validate_actor(actor: &Actor) -> Result<(), BoxError> {
let v = as_json(actor)?; let v = as_json(actor)?;
let id = json_get_u32(&v, "id")?; let id = json_get_u32(&v, "id")?;
if id == 0 { if id == 0 {
@@ -344,17 +344,17 @@ impl AppService {
// ----------------------------- // -----------------------------
// Actor // Actor
// ----------------------------- // -----------------------------
pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result<Actor, BoxError> { pub async fn create_actor(&self, actor: Actor) -> Result<Actor, BoxError> {
validate_actor(context_id, &actor)?; validate_actor(&actor)?;
let v = as_json(&actor)?; let v = as_json(&actor)?;
let id = json_get_u32(&v, "id")?; let id = json_get_u32(&v, "id")?;
self.ensure_actor_not_exists(context_id, id).await?; self.ensure_actor_not_exists_global(id).await?;
self.redis.save_actor(context_id, &actor).await?; self.redis.save_actor_global(&actor).await?;
Ok(actor) Ok(actor)
} }
pub async fn load_actor(&self, context_id: u32, id: u32) -> Result<Actor, BoxError> { pub async fn load_actor(&self, id: u32) -> Result<Actor, BoxError> {
let actor = self.redis.load_actor(context_id, id).await?; let actor = self.redis.load_actor_global(id).await?;
Ok(actor) Ok(actor)
} }
@@ -1023,8 +1023,8 @@ impl AppService {
} }
} }
async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> { async fn ensure_actor_not_exists_global(&self, id: u32) -> Result<(), BoxError> {
match self.redis.load_actor(db, id).await { match self.redis.load_actor_global(id).await {
Ok(_) => Err(Box::new(AlreadyExistsError { Ok(_) => Err(Box::new(AlreadyExistsError {
key: format!("actor:{}", id), key: format!("actor:{}", id),
})), })),

View File

@@ -196,6 +196,22 @@ impl RedisDriver {
let key = Self::actor_key(id); let key = Self::actor_key(id);
self.hget_model(db, &key).await self.hget_model(db, &key).await
} }
/// Save an Actor globally in DB 0 (Actor is context-independent)
pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> {
let json = serde_json::to_value(actor)?;
let id = json
.get("id")
.and_then(|v| v.as_u64())
.ok_or("Actor.id missing or not a number")? as u32;
let key = Self::actor_key(id);
self.hset_model(0, &key, actor).await
}
/// Load an Actor globally from DB 0 by id
pub async fn load_actor_global(&self, id: u32) -> Result<Actor> {
let key = Self::actor_key(id);
self.hget_model(0, &key).await
}
// ----------------------------- // -----------------------------
// Runner // Runner