implement end to end access control rhai example

This commit is contained in:
Timur Gordon 2025-06-24 19:23:06 +02:00
parent aa4712b8af
commit b980f0d8c1
35 changed files with 1068 additions and 1000 deletions

52
Cargo.lock generated
View File

@ -583,6 +583,14 @@ dependencies = [
"tokio",
]
[[package]]
name = "derive"
version = "0.1.0"
dependencies = [
"quote",
"syn 1.0.109",
]
[[package]]
name = "dirs"
version = "4.0.0"
@ -653,17 +661,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "engine"
version = "0.1.0"
dependencies = [
"chrono",
"heromodels",
"heromodels-derive",
"heromodels_core",
"rhai",
]
[[package]]
name = "env_logger"
version = "0.10.2"
@ -2349,11 +2346,13 @@ dependencies = [
"anyhow",
"chrono",
"criterion",
"derive",
"env_logger",
"log",
"redis",
"rhai",
"rhai_client",
"rhailib_engine",
"rhailib_worker",
"serde",
"serde_json",
@ -2361,24 +2360,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "rhailib-examples"
version = "0.1.0"
dependencies = [
"chrono",
"env_logger",
"log",
"rhai",
"rhai_client",
"serde_json",
"tokio",
]
[[package]]
name = "rhailib_dsl"
version = "0.1.0"
dependencies = [
"chrono",
"derive",
"heromodels",
"heromodels-derive",
"heromodels_core",
@ -2389,19 +2376,31 @@ dependencies = [
"tempfile",
]
[[package]]
name = "rhailib_engine"
version = "0.1.0"
dependencies = [
"chrono",
"heromodels",
"heromodels-derive",
"heromodels_core",
"rhai",
"rhailib_dsl",
]
[[package]]
name = "rhailib_worker"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"engine",
"env_logger",
"heromodels",
"log",
"redis",
"rhai",
"rhai_client",
"rhailib_engine",
"serde",
"serde_json",
"tokio",
@ -2709,6 +2708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
@ -3060,11 +3060,11 @@ name = "ui_repl"
version = "0.1.0"
dependencies = [
"anyhow",
"engine",
"heromodels",
"log",
"rhai",
"rhai_client",
"rhailib_engine",
"rhailib_worker",
"rustyline",
"tempfile",

View File

@ -15,6 +15,8 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"
rhai = "1.21.0"
rhailib_worker = { path = "src/worker" }
rhai_client = { path = "src/client" }
rhailib_engine = { path = "src/engine" }
derive = { path = "src/derive" }
[dev-dependencies]
@ -38,7 +40,6 @@ members = [
"src/worker",
"src/monitor", # Added the new monitor package to workspace
"src/repl", # Added the refactored REPL package
"examples",
"src/rhai_engine_ui", "src/macros", "src/dsl",
"src/rhai_engine_ui", "src/macros", "src/dsl", "src/derive",
]
resolver = "2" # Recommended for new workspaces

BIN
db/alice_pk/data/0.db Normal file

Binary file not shown.

View File

@ -0,0 +1 @@
74

Binary file not shown.

BIN
db/alice_pk/index/0.db Normal file

Binary file not shown.

View File

@ -0,0 +1 @@
276

Binary file not shown.

View File

@ -0,0 +1 @@
1

Binary file not shown.

View File

@ -0,0 +1 @@
2

View File

@ -1,29 +0,0 @@
[package]
name = "rhailib-examples"
version = "0.1.0"
edition = "2021"
publish = false # This is a package of examples, not meant to be published
[dependencies]
# Local Rhailib crates
rhai_client = { path = "../src/client" }
# External dependencies
rhai = "1.18.0"
tokio = { version = "1", features = ["full"] }
log = "0.4"
env_logger = "0.10"
serde_json = "1.0"
chrono = "0.4"
[[bin]]
name = "example_math_worker"
path = "example_math_worker.rs"
[[bin]]
name = "example_string_worker"
path = "example_string_worker.rs"
[[bin]]
name = "dedicated_reply_queue_demo"
path = "dedicated_reply_queue_demo.rs"

View File

@ -0,0 +1,45 @@
let private_object = new_object()
.title("Alice's Private Object")
.description("This object can only be seen and modified by Alice")
.save_object();
let object_shared_with_bob = new_object()
.title("Alice's Shared Object")
.description("This object can be seen by Bob but modified only by Alice")
.save_object();
let new_access = new_access()
.object_id(object_shared_with_bob.id())
.circle_public_key("bob_pk")
.save_access();
let book_private = new_book()
.title("Alice's private book")
.description("This book is prive to Alice")
.save_book();
let slides_shared = new_slides()
.title("Alice's shared slides")
.description("These slides, despite being in a private collection, are shared with Bob")
.save_slides();
let new_access = new_access()
.object_id(slides_shared.id)
.circle_public_key("bob_pk")
.save_access();
let collection_private = new_collection()
.title("Alice's private collection")
.description("This collection is only visible to Alice")
.add_book(book_private.id)
.add_slides(slides_shared.id)
.save_collection();
let collection_shared = new_collection()
.title("Alice's shared collection")
.description("This collection is shared with Bob")
.save_collection();

View File

@ -1,6 +0,0 @@
// auth_script.rhai
// This script calls a custom registered function 'check_permission'
// and passes the CALLER_PUBLIC_KEY to it.
// CALLER_PUBLIC_KEY is injected into the script's scope by the rhailib_worker.
check_permission(CALLER_PUBLIC_KEY)

View File

@ -0,0 +1,16 @@
let private_object = new_object()
.title("Alice's Private Object")
.description("This object can only be seen and modified by Alice")
.save_object();
let object_shared_with_bob = new_object()
.title("Alice's Shared Collection")
.description("This object can be seen by Bob but modified only by Alice")
.save_object();
let new_access = new_access()
.object_id(object_shared_with_bob.id())
.circle_public_key("bob_pk")
.save_access();

View File

@ -0,0 +1,16 @@
let private_object = new_object()
.title("Alice's Private Object")
.description("This object can only be seen and modified by Alice")
.save_object();
let object_shared_with_bob = new_object()
.title("Alice's Shared Collection")
.description("This object can be seen by Bob but modified only by Alice")
.save_object();
let new_access = new_access()
.object_id(object_shared_with_bob.id())
.circle_public_key("bob_pk")
.save_access();

View File

@ -1,137 +1,84 @@
use rhai::{Engine, EvalAltResult};
use rhai_client::RhaiClient;
use rhai_client::RhaiClientBuilder;
use rhailib_engine::create_heromodels_engine;
use rhailib_worker::spawn_rhai_worker;
use std::{fs, path::Path, time::Duration};
use tokio::sync::mpsc;
use uuid::Uuid;
// Custom Rhai function for authorization
// It takes the caller's public key as an argument.
fn check_permission(caller_pk: String) -> Result<String, Box<EvalAltResult>> {
log::info!("check_permission called with PK: {}", caller_pk);
if caller_pk == "admin_pk" {
Ok("Access Granted: Welcome Admin!".to_string())
} else if caller_pk == "user_pk" {
Ok("Limited Access: Welcome User!".to_string())
} else {
Ok(format!("Access Denied: Unknown public key '{}'", caller_pk))
}
}
const ALICE_ID: &str = "alice_pk";
const BOB_ID: &str = "bob_pk";
const CHARLIE_ID: &str = "charlie_pk";
const REDIS_URL: &str = "redis://127.0.0.1/";
const DB_DIRECTORY: &str = "./db";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let redis_url = "redis://127.0.0.1/";
let worker_circle_pk = "auth_worker_circle".to_string();
// 1. Create a Rhai engine and register custom functionality
let mut engine = Engine::new();
engine.register_fn("check_permission", check_permission);
log::info!("Custom 'check_permission' function registered with Rhai engine.");
let mut engine = rhailib_engine::create_heromodels_engine();
// 2. Spawn the Rhai worker
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let worker_handle = tokio::spawn(spawn_rhai_worker(
0, // worker_id
worker_circle_pk.clone(),
ALICE_ID.to_string(),
DB_DIRECTORY.to_string(),
engine,
redis_url.to_string(),
REDIS_URL.to_string(),
shutdown_rx,
false, // use_sentinel
));
log::info!("Rhai worker spawned for circle: {}", worker_circle_pk);
log::info!("Rhai worker spawned for circle: {}", ALICE_ID);
// Give the worker a moment to start up
tokio::time::sleep(Duration::from_secs(1)).await;
// 3. Create a Rhai client
let client = RhaiClient::new(redis_url)?;
log::info!("Rhai client created.");
// Alice populates her rhai worker
let client_alice = RhaiClientBuilder::new()
.redis_url(REDIS_URL)
.caller_id(ALICE_ID)
.build()
.unwrap();
// 4. Load the Rhai script content
let script_path_str = "examples/end_to_end/auth_script.rhai"; // Relative to Cargo.toml / rhailib root
let script_content = match fs::read_to_string(script_path_str) {
Ok(content) => content,
Err(e) => {
log::error!("Failed to read script file '{}': {}", script_path_str, e);
// Attempt to read from an alternative path if run via `cargo run --example`
// where current dir might be the crate root.
let alt_script_path = Path::new(file!())
.parent()
.unwrap()
.join("auth_script.rhai");
log::info!("Attempting alternative script path: {:?}", alt_script_path);
fs::read_to_string(&alt_script_path)?
}
};
log::info!("Loaded script content from '{}'", script_path_str);
client_alice.new_play_request()
.recipient_id(&ALICE_ID)
.script_path("examples/end_to_end/alice.rhai")
.timeout(Duration::from_secs(10))
.await_response().await.unwrap();
log::info!("Alice's database populated.");
// Define different caller public keys
let admin_caller_pk = "admin_pk".to_string();
let user_caller_pk = "user_pk".to_string();
let unknown_caller_pk = "unknown_pk".to_string();
// Bob queries Alice's rhai worker
let client_bob = RhaiClientBuilder::new()
.redis_url(REDIS_URL)
.caller_id(BOB_ID)
.build()
.unwrap();
client_bob.new_play_request()
.recipient_id(&ALICE_ID)
.script_path("examples/end_to_end/bob.rhai")
.timeout(Duration::from_secs(10))
.await_response().await.unwrap();
log::info!("Bob's query to Alice's database completed.");
let callers = vec![
("Admin", admin_caller_pk),
("User", user_caller_pk),
("Unknown", unknown_caller_pk),
];
for (caller_name, caller_pk) in callers {
let task_id = Uuid::new_v4().to_string();
log::info!(
"Submitting script for caller '{}' (PK: {}) with task_id: {}",
caller_name,
caller_pk,
task_id
);
match client
.submit_script_and_await_result(
&worker_circle_pk,
task_id.clone(), // task_id (UUID) first
script_content.clone(), // script_content second
Duration::from_secs(10),
Some(caller_pk.clone()), // This is the CALLER_PUBLIC_KEY
)
.await
{
Ok(details) => {
log::info!(
"Task {} for caller '{}' (PK: {}) completed. Status: {}, Output: {:?}, Error: {:?}",
task_id,
caller_name,
caller_pk,
details.status,
details.output,
details.error
);
// Basic assertion for expected output
if caller_pk == "admin_pk" {
assert_eq!(
details.output,
Some("Access Granted: Welcome Admin!".to_string())
);
} else if caller_pk == "user_pk" {
assert_eq!(
details.output,
Some("Limited Access: Welcome User!".to_string())
);
}
}
Err(e) => {
log::error!(
"Task {} for caller '{}' (PK: {}) failed: {}",
task_id,
caller_name,
caller_pk,
e
);
}
}
tokio::time::sleep(Duration::from_millis(100)).await; // Small delay between submissions
}
// Charlie queries Alice's rhai worker
let client_charlie = RhaiClientBuilder::new()
.redis_url(REDIS_URL)
.caller_id(CHARLIE_ID)
.build()
.unwrap();
client_charlie.new_play_request()
.recipient_id(&ALICE_ID)
.script_path("examples/end_to_end/charlie.rhai")
.timeout(Duration::from_secs(10))
.await_response().await.unwrap();
log::info!("Charlie's query to Alice's database completed.");
// 5. Shutdown the worker (optional, could also let it run until program exits)
log::info!("Signaling worker to shutdown...");

View File

@ -0,0 +1,16 @@
let private_object = new_object()
.title("Alice's Private Object")
.description("This object can only be seen and modified by Alice")
.save_object();
let object_shared_with_bob = new_object()
.title("Alice's Shared Collection")
.description("This object can be seen by Bob but modified only by Alice")
.save_object();
let new_access = new_access()
.object_id(object_shared_with_bob.id())
.circle_public_key("bob_pk")
.save_access();

View File

@ -2,12 +2,11 @@ use chrono::Utc;
use log::{debug, error, info, warn}; // Added error
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use std::time::Duration; // Duration is still used, Instant and sleep were removed
use uuid::Uuid;
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
const REDIS_REPLY_QUEUE_PREFIX: &str = "rhai_reply:";
const NAMESPACE_PREFIX: &str = "rhailib:";
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RhaiTaskDetails {
@ -23,10 +22,8 @@ pub struct RhaiTaskDetails {
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "updatedAt")]
pub updated_at: chrono::DateTime<chrono::Utc>,
// reply_to_queue: Option<String> is removed from the struct.
// It's passed to submit_script_to_worker_queue if needed and stored in Redis directly.
#[serde(rename = "publicKey")]
pub public_key: Option<String>,
#[serde(rename = "callerId")]
pub caller_id: String,
}
#[derive(Debug)]
@ -68,60 +65,172 @@ impl std::error::Error for RhaiClientError {}
pub struct RhaiClient {
redis_client: redis::Client,
caller_id: String,
}
pub struct RhaiClientBuilder {
redis_url: Option<String>,
caller_id: String,
}
impl RhaiClientBuilder {
pub fn new() -> Self {
Self { redis_url: None, caller_id: "".to_string() }
}
pub fn caller_id(mut self, caller_id: &str) -> Self {
self.caller_id = caller_id.to_string();
self
}
pub fn redis_url(mut self, url: &str) -> Self {
self.redis_url = Some(url.to_string());
self
}
pub fn build(self) -> Result<RhaiClient, RhaiClientError> {
let url = self.redis_url.unwrap_or_else(|| "redis://127.0.0.1/".to_string());
let client = redis::Client::open(url)?;
if self.caller_id.is_empty() {
return Err(RhaiClientError::RedisError(redis::RedisError::from((redis::ErrorKind::InvalidClientConfig, "Caller ID is empty"))));
}
Ok(RhaiClient { redis_client: client, caller_id: self.caller_id })
}
}
pub struct PlayRequest {
id: String,
recipient_id: String,
script: String,
timeout: Duration
}
pub struct PlayRequestBuilder<'a> {
client: &'a RhaiClient,
request_id: String,
recipient_id: String,
script: String,
timeout: Duration
}
impl<'a> PlayRequestBuilder<'a> {
pub fn new(client: &'a RhaiClient) -> Self {
Self {
client,
request_id: "".to_string(),
recipient_id: "".to_string(),
script: "".to_string(),
timeout: Duration::from_secs(10),
}
}
pub fn request_id(mut self, request_id: &str) -> Self {
self.request_id = request_id.to_string();
self
}
pub fn recipient_id(mut self, recipient_id: &str) -> Self {
self.recipient_id = recipient_id.to_string();
self
}
pub fn script(mut self, script: &str) -> Self {
self.script = script.to_string();
self
}
pub fn script_path(mut self, script_path: &str) -> Self {
self.script = std::fs::read_to_string(script_path).unwrap();
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn submit(self) -> Result<(), RhaiClientError> {
let request_id = if self.request_id.is_empty() {
// Generate a UUID for the request_id
Uuid::new_v4().to_string()
} else {
self.request_id.clone()
};
// Build the request and submit using self.client
println!("Submitting request {} with timeout {:?}", self.request_id, self.timeout);
self.client.submit_play_request(
&PlayRequest {
id: request_id,
recipient_id: self.recipient_id.clone(),
script: self.script.clone(),
timeout: self.timeout,
}
);
Ok(())
}
pub async fn await_response(self) -> Result<RhaiTaskDetails, RhaiClientError> {
let request_id = if self.request_id.is_empty() {
// Generate a UUID for the request_id
Uuid::new_v4().to_string()
} else {
self.request_id.clone()
};
// Build the request and submit using self.client
println!("Awaiting response for request {} with timeout {:?}", self.request_id, self.timeout);
let result = self.client.submit_play_request_and_await_result(
&PlayRequest {
id: request_id,
recipient_id: self.recipient_id.clone(),
script: self.script.clone(),
timeout: self.timeout,
}
).await;
result
}
}
impl RhaiClient {
pub fn new(redis_url: &str) -> Result<Self, RhaiClientError> {
let client = redis::Client::open(redis_url)?;
Ok(Self {
redis_client: client,
})
pub fn new_play_request(&self) -> PlayRequestBuilder {
PlayRequestBuilder::new(self)
}
// Internal helper to submit script details and push to work queue
async fn submit_script_to_worker_queue(
async fn submit_play_request_using_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
circle_name: &str,
task_id: &str, // This is the main task_id
script: String,
// client_rpc_id: Option<Value> is removed
reply_to_queue_name: Option<String>, // Still needed to tell the worker where to reply, if applicable
public_key: Option<String>,
play_request: &PlayRequest,
) -> Result<(), RhaiClientError> {
let now = Utc::now();
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_key = format!(
"{}{}",
NAMESPACE_PREFIX,
play_request.id
);
let worker_queue_key = format!(
"{}{}",
REDIS_QUEUE_PREFIX,
circle_name.replace(" ", "_").to_lowercase()
NAMESPACE_PREFIX,
play_request.recipient_id.replace(" ", "_").to_lowercase()
);
debug!(
"Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}",
task_id, circle_name, worker_queue_key, script, reply_to_queue_name, public_key
"Submitting play request: {} to worker: {} with namespace prefix: {}",
play_request.id,
play_request.recipient_id,
NAMESPACE_PREFIX
);
let mut hset_args: Vec<(String, String)> = vec![
("taskId".to_string(), task_id.to_string()), // Add taskId
("script".to_string(), script), // script is moved here
("taskId".to_string(), play_request.id.to_string()), // Add taskId
("script".to_string(), play_request.script.clone()), // script is moved here
("callerId".to_string(), self.caller_id.clone()), // script is moved here
("status".to_string(), "pending".to_string()),
("createdAt".to_string(), now.to_rfc3339()),
("updatedAt".to_string(), now.to_rfc3339()),
];
// clientRpcId field and its corresponding hset_args logic are removed.
if let Some(queue_name) = &reply_to_queue_name {
// Use the passed parameter
hset_args.push(("replyToQueue".to_string(), queue_name.clone()));
}
if let Some(pk) = &public_key {
// Use the passed parameter
hset_args.push(("publicKey".to_string(), pk.clone()));
}
// Ensure hset_args is a slice of tuples (String, String)
// The redis crate's hset_multiple expects &[(K, V)]
// conn.hset_multiple::<_, String, String, ()>(&task_key, &hset_args).await?;
@ -134,38 +243,122 @@ impl RhaiClient {
// lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant
// For `redis::AsyncCommands::lpush`, it's `RedisResult<R>` where R: FromRedisValue
// Often this is the length of the list. Let's allow inference or specify if needed.
let _: redis::RedisResult<i64> = conn.lpush(&worker_queue_key, task_id).await;
let _: redis::RedisResult<i64> = conn.lpush(&worker_queue_key, play_request.id.clone()).await;
Ok(())
}
// Public method for fire-and-forget submission (doesn't wait for result)
pub async fn submit_script(
// Internal helper to await response from worker
async fn await_response_from_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
task_key: &String,
reply_queue_key: &String,
timeout: Duration
) -> Result<RhaiTaskDetails, RhaiClientError> {
// BLPOP on the reply queue
// The timeout for BLPOP is in seconds (integer)
let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout
match conn
.blpop::<&String, Option<(String, String)>>(
reply_queue_key,
blpop_timeout_secs as f64,
)
.await
{
Ok(Some((_queue, result_message_str))) => {
// Attempt to deserialize the result message into RhaiTaskDetails or a similar structure
// For now, we assume the worker sends back a JSON string of RhaiTaskDetails
// or at least status, output, error.
// Let's refine what the worker sends. For now, assume it's a simplified result.
// The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails.
// For this example, let's assume the worker sends a JSON string of a simplified result structure.
// A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts)
// and the client deserializes that.
// For now, let's assume the worker sends a JSON string of RhaiTaskDetails.
match serde_json::from_str::<RhaiTaskDetails>(&result_message_str) {
Ok(details) => {
info!("Task {} finished with status: {}", details.task_id, details.status);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Ok(details)
}
Err(e) => {
error!("Failed to deserialize result message from reply queue: {}", e);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(RhaiClientError::SerializationError(e))
}
}
}
Ok(None) => {
// BLPOP timed out
warn!(
"Timeout waiting for result on reply queue {} for task {}",
reply_queue_key, task_key
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(RhaiClientError::Timeout(task_key.clone()))
}
Err(e) => {
// Redis error
error!(
"Redis error on BLPOP for reply queue {}: {}",
reply_queue_key, e
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(RhaiClientError::RedisError(e))
}
}
}
// New method using dedicated reply queue
pub async fn submit_play_request(
&self,
circle_name: &str,
script: String,
public_key: Option<String>,
) -> Result<String, RhaiClientError> {
play_request: &PlayRequest,
) -> Result<(), RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget
debug!(
"Client submitting script (fire-and-forget) with new task_id: {} to circle: {}",
task_id, circle_name
);
self.submit_script_to_worker_queue(
self.submit_play_request_using_connection(
&mut conn,
circle_name,
&task_id,
script,
// client_rpc_id argument removed
None, // No dedicated reply queue for fire-and-forget
public_key,
&play_request // Pass the task_id parameter
)
.await?;
Ok(())
}
// New method using dedicated reply queue
pub async fn submit_play_request_and_await_result(
&self,
play_request: &PlayRequest,
) -> Result<RhaiTaskDetails, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, play_request.id); // Derived from the passed task_id
self.submit_play_request_using_connection(
&mut conn,
&play_request // Pass the task_id parameter
)
.await?;
Ok(task_id)
info!(
"Task {} submitted. Waiting for result on queue {} with timeout {:?}...",
play_request.id, // This is the UUID
reply_queue_key,
play_request.timeout
);
self.await_response_from_connection(
&mut conn,
&play_request.id,
&reply_queue_key,
play_request.timeout
)
.await
}
// Optional: A method to check task status, similar to what circle_server_ws polling does.
@ -175,7 +368,7 @@ impl RhaiClient {
task_id: &str,
) -> Result<Option<RhaiTaskDetails>, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&task_key).await?;
@ -211,7 +404,7 @@ impl RhaiClient {
Utc::now()
}),
// reply_to_queue is no longer a field in RhaiTaskDetails (it's stored in Redis but not in this struct)
public_key: map.get("publicKey").cloned(),
caller_id: map.get("callerId").cloned().expect("callerId field missing from Redis hash"),
};
// It's important to also check if the 'taskId' field exists in the map and matches the input task_id
// for data integrity, though the struct construction above uses the input task_id directly.
@ -227,96 +420,6 @@ impl RhaiClient {
None => Ok(None),
}
}
// New method using dedicated reply queue
pub async fn submit_script_and_await_result(
&self,
circle_name: &str,
task_id: String, // task_id is now a mandatory parameter provided by the caller
script: String,
timeout: Duration,
public_key: Option<String>,
) -> Result<RhaiTaskDetails, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter
let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id
self.submit_script_to_worker_queue(
&mut conn,
circle_name,
&task_id, // Pass the task_id parameter
script,
// client_rpc_id argument removed
Some(reply_to_queue_name.clone()), // Pass the derived reply_to_queue_name
public_key,
)
.await?;
info!(
"Task {} submitted. Waiting for result on queue {} with timeout {:?}...",
task_id, // This is the UUID
reply_to_queue_name,
timeout
);
// BLPOP on the reply queue
// The timeout for BLPOP is in seconds (integer)
let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout
match conn
.blpop::<&String, Option<(String, String)>>(
&reply_to_queue_name,
blpop_timeout_secs as f64,
)
.await
{
Ok(Some((_queue, result_message_str))) => {
// Attempt to deserialize the result message into RhaiTaskDetails or a similar structure
// For now, we assume the worker sends back a JSON string of RhaiTaskDetails
// or at least status, output, error.
// Let's refine what the worker sends. For now, assume it's a simplified result.
// The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails.
// For this example, let's assume the worker sends a JSON string of a simplified result structure.
// A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts)
// and the client deserializes that.
// For now, let's assume the worker sends a JSON string of RhaiTaskDetails.
match serde_json::from_str::<RhaiTaskDetails>(&result_message_str) {
Ok(details) => {
info!("Task {} finished with status: {}", task_id, details.status);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Ok(details)
}
Err(e) => {
error!("Task {}: Failed to deserialize result message from reply queue: {}. Message: {}", task_id, e, result_message_str);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::SerializationError(e))
}
}
}
Ok(None) => {
// BLPOP timed out
warn!(
"Timeout waiting for result on reply queue {} for task {}",
reply_to_queue_name, task_id
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::Timeout(task_id))
}
Err(e) => {
// Redis error
error!(
"Redis error on BLPOP for reply queue {}: {}",
reply_to_queue_name, e
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::RedisError(e))
}
}
}
}
#[cfg(test)]

11
src/derive/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "derive"
version = "0.1.0"
edition = "2024"
[lib]
proc-macro = true
[dependencies]
syn = { version = "1.0", features = ["full"] }
quote = "1.0"

34
src/derive/src/lib.rs Normal file
View File

@ -0,0 +1,34 @@
extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields};
#[proc_macro_derive(FromVec)]
pub fn from_vec_derive(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let name = input.ident;
let inner_type = match input.data {
Data::Struct(s) => match s.fields {
Fields::Unnamed(mut fields) => {
if fields.unnamed.len() != 1 {
panic!("FromVec can only be derived for tuple structs with one field.");
}
let field = fields.unnamed.pop().unwrap().into_value();
field.ty
}
_ => panic!("FromVec can only be derived for tuple structs."),
},
_ => panic!("FromVec can only be derived for structs."),
};
let expanded = quote! {
impl From<#inner_type> for #name {
fn from(vec: #inner_type) -> Self {
#name(vec)
}
}
};
TokenStream::from(expanded)
}

View File

@ -11,6 +11,7 @@ heromodels_core = { path = "../../../db/heromodels_core" }
chrono = "0.4"
heromodels-derive = { path = "../../../db/heromodels-derive" }
macros = { path = "../macros"}
derive = { path = "../derive"}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -1,4 +1,5 @@
use heromodels::db::Db;
use macros::{register_authorized_create_by_id_fn, register_authorized_delete_by_id_fn, register_authorized_get_by_id_fn};
use rhai::plugin::*;
use rhai::{Array, Dynamic, Engine, EvalAltResult, INT, Module, Position};
use std::mem;
@ -9,16 +10,6 @@ type RhaiAccess = Access;
use heromodels::db::Collection;
use heromodels::db::hero::OurDB;
// Helper to convert i64 from Rhai to u32 for IDs
fn id_from_i64_to_u32(id_i64: i64) -> Result<u32, Box<EvalAltResult>> {
u32::try_from(id_i64).map_err(|_| {
Box::new(EvalAltResult::ErrorArithmetic(
format!("Failed to convert ID '{}' to u32", id_i64).into(),
Position::NONE,
))
})
}
#[export_module]
mod rhai_access_module {
// --- Access Functions ---
@ -29,19 +20,19 @@ mod rhai_access_module {
}
/// Sets the access name
#[rhai_fn(name = "object_id", return_raw, global, pure)]
#[rhai_fn(name = "object_id", return_raw)]
pub fn set_object_id(
access: &mut RhaiAccess,
object_id: i64,
) -> Result<RhaiAccess, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(object_id)?;
let id = macros::id_from_i64_to_u32(object_id)?;
let owned_access = std::mem::take(access);
*access = owned_access.object_id(id);
Ok(access.clone())
}
/// Sets the access name
#[rhai_fn(name = "circle_pk", return_raw, global, pure)]
#[rhai_fn(name = "circle_public_key", return_raw, global, pure)]
pub fn set_circle_pk(
access: &mut RhaiAccess,
circle_pk: String,
@ -57,7 +48,7 @@ mod rhai_access_module {
access: &mut RhaiAccess,
group_id: i64,
) -> Result<RhaiAccess, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(group_id)?;
let id = macros::id_from_i64_to_u32(group_id)?;
let owned_access = std::mem::take(access);
*access = owned_access.group_id(id);
Ok(access.clone())
@ -68,7 +59,7 @@ mod rhai_access_module {
access: &mut RhaiAccess,
contact_id: i64,
) -> Result<RhaiAccess, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(contact_id)?;
let id = macros::id_from_i64_to_u32(contact_id)?;
let owned_access = std::mem::take(access);
*access = owned_access.contact_id(id);
Ok(access.clone())
@ -127,137 +118,33 @@ mod rhai_access_module {
}
}
// // A function that takes the call context and an integer argument.
// fn save_access(context: NativeCallContext, access: RhaiAccess) -> Result<(), Box<EvalAltResult>> {
// let optional_tag_ref: Option<&Dynamic> = context.tag();
// // Ensure the tag exists
// let tag_ref: &Dynamic = optional_tag_ref.ok_or_else(|| {
// Box::new(EvalAltResult::ErrorRuntime(
// "Custom tag not set for this evaluation run.".into(),
// context.position(), // Use context.position() if available and relevant
// ))
// })?;
// // Initialize database with OurDB for the Rhai engine
// // Using a temporary/in-memory like database for the worker
// let tag_map = tag_ref.read_lock::<rhai::Map>().ok_or_else(|| {
// Box::new(EvalAltResult::ErrorRuntime(
// "Tag is not a Map or is locked".into(),
// Position::NONE,
// ))
// })?;
// let db_path = tag_map.get("CIRCLE_DB_PATH").expect("CIRCLE_DB_PATH not found").as_str().to_string();
// let db = Arc::new(
// OurDB::new(db_path, false)
// .expect("Failed to create temporary DB for Rhai engine"),
// );
// let result = db.set(&access).map_err(|e| {
// Box::new(EvalAltResult::ErrorRuntime(
// format!("DB Error set_access: {}", e).into(),
// Position::NONE,
// ))
// })?;
// // Return the updated access with the correct ID
// Ok(result)
// }
pub fn register_access_rhai_module(engine: &mut Engine, db: Arc<OurDB>) {
pub fn register_access_rhai_module(engine: &mut Engine) {
// Register the exported module globally
let module = exported_module!(rhai_access_module);
let mut module = exported_module!(rhai_access_module);
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_access",
resource_type_str: "Access",
rhai_return_rust_type: heromodels::models::access::Access
);
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_access",
resource_type_str: "Access",
rhai_return_rust_type: heromodels::models::access::Access
);
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_access",
resource_type_str: "Access",
rhai_return_rust_type: heromodels::models::access::Access
);
engine.register_global_module(module.into());
// Create a module for database functions
let mut db_module = Module::new();
// let db_clone_set_access = db.clone();
// db_module.set_native_fn(
// "save_access",
// move |access: Access| -> Result<Access, Box<EvalAltResult>> {
// // Use the Collection trait method directly
// let result = db_clone_set_access.set(&access).map_err(|e| {
// Box::new(EvalAltResult::ErrorRuntime(
// format!("DB Error set_access: {}", e).into(),
// Position::NONE,
// ))
// })?;
// // Return the updated access with the correct ID
// Ok(result.1)
// },
// );
// Manually register database functions as they need to capture 'db'
let db_clone_delete_access = db.clone();
db_module.set_native_fn(
"delete_access",
move |access: Access| -> Result<(), Box<EvalAltResult>> {
// Use the Collection trait method directly
let result = db_clone_delete_access
.collection::<Access>()
.expect("can open access collection")
.delete_by_id(access.base_data.id)
.expect("can delete event");
// Return the updated event with the correct ID
Ok(result)
},
);
let db_clone_get_access = db.clone();
db_module.set_native_fn(
"get_access_by_id",
move |id_i64: INT| -> Result<Access, Box<EvalAltResult>> {
let id_u32 = id_from_i64_to_u32(id_i64)?;
// Use the Collection trait method directly
db_clone_get_access
.get_by_id(id_u32)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error get_access_by_id: {}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Access with ID {} not found", id_u32).into(),
Position::NONE,
))
})
},
);
// Add list_accesss function to get all accesss
let db_clone_list_accesss = db.clone();
db_module.set_native_fn(
"list_accesss",
move || -> Result<Dynamic, Box<EvalAltResult>> {
let collection = db_clone_list_accesss.collection::<Access>().map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Failed to get access collection: {:?}", e).into(),
Position::NONE,
))
})?;
let accesss = collection.get_all().map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Failed to get all accesss: {:?}", e).into(),
Position::NONE,
))
})?;
let mut array = Array::new();
for access in accesss {
array.push(Dynamic::from(access));
}
Ok(Dynamic::from(array))
},
);
// Register the database module globally
engine.register_global_module(db_module.into());
println!("Successfully registered access Rhai module using export_module approach.");
}

View File

@ -1,6 +1,16 @@
use rhai::Engine;
pub mod library;
pub mod access;
pub mod object;
pub use macros::register_authorized_get_by_id_fn;
pub use macros::register_authorized_list_fn;
pub use macros::id_from_i64_to_u32;
pub use macros::id_from_i64_to_u32;
/// Register all Rhai modules with the engine
pub fn register_dsl_modules(engine: &mut Engine) {
access::register_access_rhai_module(engine);
library::register_library_rhai_module(engine);
object::register_object_rhai_module(engine);
println!("Rhailib Domain Specific Language modules registered successfully.");
}

View File

@ -1,3 +1,5 @@
use derive::FromVec;
use macros::{register_authorized_create_by_id_fn, register_authorized_delete_by_id_fn, register_authorized_get_by_id_fn, register_authorized_list_fn};
use rhai::plugin::*;
use rhai::{CustomType, Dynamic, Engine, EvalAltResult, Module, Position, TypeBuilder};
use serde::Serialize;
@ -6,7 +8,6 @@ use serde_json;
use std::mem;
use std::sync::Arc;
use heromodels::models::library::collection::Collection as RhaiCollection;
use heromodels::models::library::items::{
Book as RhaiBook, Image as RhaiImage, Markdown as RhaiMarkdown, Pdf as RhaiPdf,
@ -15,18 +16,6 @@ use heromodels::models::library::items::{
use heromodels::db::Collection as DbCollectionTrait;
use heromodels::db::hero::OurDB;
// Helper to convert i64 from Rhai to u32 for IDs
fn id_from_i64_to_u32(id_i64: i64) -> Result<u32, Box<EvalAltResult>> {
u32::try_from(id_i64).map_err(|_| {
Box::new(EvalAltResult::ErrorMismatchDataType(
"u32".to_string(), // Expected type
format!("i64 value ({}) that cannot be represented as u32", id_i64), // Actual type/value description
Position::NONE,
))
})
}
/// Registers a `.json()` method for any type `T` that implements the required traits.
fn register_json_method<T>(engine: &mut Engine)
where
@ -45,10 +34,34 @@ where
}
// Wrapper type for a list of collections to enable .json() method via register_json_method
#[derive(Debug, Clone, Serialize, CustomType)]
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "CollectionArray")]
pub struct RhaiCollectionArray(pub Vec<RhaiCollection>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "ImageArray")]
pub struct RhaiImageArray(pub Vec<RhaiImage>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "PdfArray")]
pub struct RhaiPdfArray(pub Vec<RhaiPdf>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "MarkdownArray")]
pub struct RhaiMarkdownArray(pub Vec<RhaiMarkdown>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "BookArray")]
pub struct RhaiBookArray(pub Vec<RhaiBook>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "SlidesArray")]
pub struct RhaiSlidesArray(pub Vec<RhaiSlides>);
#[derive(Debug, Clone, Serialize, CustomType, FromVec)]
#[rhai_type(name = "TocEntryArray")]
pub struct RhaiTocEntryArray(pub Vec<RhaiTocEntry>);
#[export_module]
mod rhai_library_module {
// --- Collection Functions ---
@ -82,7 +95,7 @@ mod rhai_library_module {
collection: &mut RhaiCollection,
image_id: i64,
) -> Result<RhaiCollection, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(image_id)?;
let id = macros::id_from_i64_to_u32(image_id)?;
let owned = mem::take(collection);
*collection = owned.add_image(id);
Ok(collection.clone())
@ -93,7 +106,7 @@ mod rhai_library_module {
collection: &mut RhaiCollection,
pdf_id: i64,
) -> Result<RhaiCollection, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(pdf_id)?;
let id = macros::id_from_i64_to_u32(pdf_id)?;
let owned = mem::take(collection);
*collection = owned.add_pdf(id);
Ok(collection.clone())
@ -104,7 +117,7 @@ mod rhai_library_module {
collection: &mut RhaiCollection,
markdown_id: i64,
) -> Result<RhaiCollection, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(markdown_id)?;
let id = macros::id_from_i64_to_u32(markdown_id)?;
let owned = mem::take(collection);
*collection = owned.add_markdown(id);
Ok(collection.clone())
@ -115,7 +128,7 @@ mod rhai_library_module {
collection: &mut RhaiCollection,
book_id: i64,
) -> Result<RhaiCollection, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(book_id)?;
let id = macros::id_from_i64_to_u32(book_id)?;
let owned = mem::take(collection);
*collection = owned.add_book(id);
Ok(collection.clone())
@ -126,7 +139,7 @@ mod rhai_library_module {
collection: &mut RhaiCollection,
slides_id: i64,
) -> Result<RhaiCollection, Box<EvalAltResult>> {
let id = id_from_i64_to_u32(slides_id)?;
let id = macros::id_from_i64_to_u32(slides_id)?;
let owned = mem::take(collection);
*collection = owned.add_slides(id);
Ok(collection.clone())
@ -573,6 +586,11 @@ mod rhai_library_module {
RhaiSlides::new()
}
#[rhai_fn(get = "id", pure)]
pub fn get_slides_id(slides: &mut RhaiSlides) -> i64 {
slides.base_data.id as i64
}
#[rhai_fn(name = "title", return_raw, global, pure)]
pub fn slides_title(
slides: &mut RhaiSlides,
@ -615,11 +633,6 @@ mod rhai_library_module {
Ok(slides.clone())
}
#[rhai_fn(get = "id", pure)]
pub fn get_slides_id(slides: &mut RhaiSlides) -> i64 {
slides.base_data.id as i64
}
#[rhai_fn(get = "created_at", pure)]
pub fn get_slides_created_at(slides: &mut RhaiSlides) -> i64 {
slides.base_data.created_at
@ -651,11 +664,8 @@ mod rhai_library_module {
}
}
pub fn register_library_rhai_module(engine: &mut Engine, db: Arc<OurDB>) {
let module = exported_module!(rhai_library_module);
engine.register_global_module(module.into());
let mut db_module = Module::new();
pub fn register_library_rhai_module(engine: &mut Engine) {
let mut module = exported_module!(rhai_library_module);
register_json_method::<RhaiCollection>(engine);
register_json_method::<RhaiImage>(engine);
@ -664,368 +674,174 @@ pub fn register_library_rhai_module(engine: &mut Engine, db: Arc<OurDB>) {
register_json_method::<RhaiBook>(engine);
register_json_method::<RhaiSlides>(engine);
register_json_method::<RhaiTocEntry>(engine);
// Register .json() method for our custom CollectionArray type
register_json_method::<RhaiCollectionArray>(engine);
// --- Collection DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_collection",
move |collection: RhaiCollection| -> Result<RhaiCollection, Box<EvalAltResult>> {
let result = db_clone.set(&collection).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_collection",
resource_type_str: "Collection",
rhai_return_rust_type: heromodels::models::library::collection::Collection
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_collection",
move |id: i64| -> Result<RhaiCollection, Box<EvalAltResult>> {
let collection_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(collection_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Collection with ID {} not found", collection_id).into(),
Position::NONE,
))
})
},
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_collection",
resource_type_str: "Collection",
rhai_return_rust_type: heromodels::models::library::collection::Collection
);
let db_clone_list_collections = db.clone();
db_module.set_native_fn(
"list_collections",
move || -> Result<RhaiCollectionArray, Box<EvalAltResult>> {
let collections_vec: Vec<RhaiCollection> = db_clone_list_collections
.collection::<RhaiCollection>()
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error (list_collections - access): {:?}", e).into(),
Position::NONE,
))
})?
.get_all()
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error (list_collections - get_all): {:?}", e).into(),
Position::NONE,
))
})?;
Ok(RhaiCollectionArray(collections_vec)) // Wrap in RhaiCollectionArray
},
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_collection",
resource_type_str: "Collection",
rhai_return_rust_type: heromodels::models::library::collection::Collection
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_collection",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let collection_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiCollection>()
.unwrap()
.delete_by_id(collection_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_collections",
resource_type_str: "Collection",
rhai_return_rust_type: heromodels::models::library::collection::Collection,
rhai_return_wrapper_type: RhaiCollectionArray
);
// --- Image DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_image",
move |image: RhaiImage| -> Result<RhaiImage, Box<EvalAltResult>> {
let result = db_clone.set(&image).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_image",
resource_type_str: "Image",
rhai_return_rust_type: heromodels::models::library::items::Image
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_image",
move |id: i64| -> Result<RhaiImage, Box<EvalAltResult>> {
let image_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(image_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Image with ID {} not found", image_id).into(),
Position::NONE,
))
})
},
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_image",
resource_type_str: "Image",
rhai_return_rust_type: heromodels::models::library::items::Image
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_image",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let image_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiImage>()
.unwrap()
.delete_by_id(image_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_image",
resource_type_str: "Image",
rhai_return_rust_type: heromodels::models::library::items::Image
);
// --- Pdf DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_pdf",
move |pdf: RhaiPdf| -> Result<RhaiPdf, Box<EvalAltResult>> {
let result = db_clone.set(&pdf).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_images",
resource_type_str: "Image",
rhai_return_rust_type: heromodels::models::library::items::Image,
rhai_return_wrapper_type: RhaiImageArray
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_pdf",
move |id: i64| -> Result<RhaiPdf, Box<EvalAltResult>> {
let pdf_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(pdf_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Pdf with ID {} not found", pdf_id).into(),
Position::NONE,
))
})
},
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_pdf",
resource_type_str: "Pdf",
rhai_return_rust_type: heromodels::models::library::items::Pdf
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_pdf",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let pdf_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiPdf>()
.unwrap()
.delete_by_id(pdf_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_pdf",
resource_type_str: "Pdf",
rhai_return_rust_type: heromodels::models::library::items::Pdf
);
// --- Markdown DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_markdown",
move |markdown: RhaiMarkdown| -> Result<RhaiMarkdown, Box<EvalAltResult>> {
let result = db_clone.set(&markdown).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_pdf",
resource_type_str: "Pdf",
rhai_return_rust_type: heromodels::models::library::items::Pdf
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_markdown",
move |id: i64| -> Result<RhaiMarkdown, Box<EvalAltResult>> {
let markdown_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(markdown_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Markdown with ID {} not found", markdown_id).into(),
Position::NONE,
))
})
},
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_pdfs",
resource_type_str: "Pdf",
rhai_return_rust_type: heromodels::models::library::items::Pdf,
rhai_return_wrapper_type: RhaiPdfArray
);
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_markdown",
resource_type_str: "Markdown",
rhai_return_rust_type: heromodels::models::library::items::Markdown
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_markdown",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let markdown_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiMarkdown>()
.unwrap()
.delete_by_id(markdown_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_markdown",
resource_type_str: "Markdown",
rhai_return_rust_type: heromodels::models::library::items::Markdown
);
// --- Book DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_book",
move |book: RhaiBook| -> Result<RhaiBook, Box<EvalAltResult>> {
let result = db_clone.set(&book).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_markdowns",
resource_type_str: "Markdown",
rhai_return_rust_type: heromodels::models::library::items::Markdown,
rhai_return_wrapper_type: RhaiMarkdownArray
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_book",
move |id: i64| -> Result<RhaiBook, Box<EvalAltResult>> {
let book_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(book_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Book with ID {} not found", book_id).into(),
Position::NONE,
))
})
},
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_book",
resource_type_str: "Book",
rhai_return_rust_type: heromodels::models::library::items::Book
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_book",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let book_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiBook>()
.unwrap()
.delete_by_id(book_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_book",
resource_type_str: "Book",
rhai_return_rust_type: heromodels::models::library::items::Book
);
// --- Slides DB Functions ---
let db_clone = db.clone();
db_module.set_native_fn(
"save_slides",
move |slides: RhaiSlides| -> Result<RhaiSlides, Box<EvalAltResult>> {
let result = db_clone.set(&slides).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(result.1)
},
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_book",
resource_type_str: "Book",
rhai_return_rust_type: heromodels::models::library::items::Book
);
let db_clone = db.clone();
db_module.set_native_fn(
"get_slides",
move |id: i64| -> Result<RhaiSlides, Box<EvalAltResult>> {
let slides_id = id_from_i64_to_u32(id)?;
db_clone
.get_by_id(slides_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Slides with ID {} not found", slides_id).into(),
Position::NONE,
))
})
},
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_books",
resource_type_str: "Book",
rhai_return_rust_type: heromodels::models::library::items::Book,
rhai_return_wrapper_type: RhaiBookArray
);
let db_clone = db.clone();
db_module.set_native_fn(
"delete_slides",
move |id: i64| -> Result<(), Box<EvalAltResult>> {
let slides_id = id_from_i64_to_u32(id)?;
db_clone
.collection::<RhaiSlides>()
.unwrap()
.delete_by_id(slides_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("DB Error: {:?}", e).into(),
Position::NONE,
))
})?;
Ok(())
},
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_slides",
resource_type_str: "Slides",
rhai_return_rust_type: heromodels::models::library::items::Slides
);
engine.register_global_module(db_module.into());
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_slides",
resource_type_str: "Slides",
rhai_return_rust_type: heromodels::models::library::items::Slides
);
register_authorized_delete_by_id_fn!(
module: &mut module,
rhai_fn_name: "delete_slides",
resource_type_str: "Slides",
rhai_return_rust_type: heromodels::models::library::items::Slides
);
register_authorized_list_fn!(
module: &mut module,
rhai_fn_name: "list_slides",
resource_type_str: "Slides",
rhai_return_rust_type: heromodels::models::library::items::Slides,
rhai_return_wrapper_type: RhaiSlidesArray
);
engine.register_global_module(module.into());
}

67
src/dsl/src/object.rs Normal file
View File

@ -0,0 +1,67 @@
use heromodels::db::Db;
use macros::{register_authorized_create_by_id_fn, register_authorized_get_by_id_fn};
use rhai::plugin::*;
use rhai::{Array, Dynamic, Engine, EvalAltResult, INT, Module, Position};
use std::mem;
use std::sync::Arc;
use heromodels::db::Collection;
use heromodels::models::object::Object;
type RhaiObject = Object;
use heromodels::db::hero::OurDB;
#[export_module]
mod rhai_access_module {
// --- Access Functions ---
#[rhai_fn(name = "new_object", return_raw)]
pub fn new_object() -> Result<RhaiObject, Box<EvalAltResult>> {
let object = Object::new();
Ok(object)
}
#[rhai_fn(name = "id", return_raw, global, pure)]
pub fn object_id(object: &mut RhaiObject) -> Result<i64, Box<EvalAltResult>> {
Ok(object.id() as i64)
}
#[rhai_fn(name = "title", return_raw, global, pure)]
pub fn object_title(
object: &mut RhaiObject,
title: String,
) -> Result<RhaiObject, Box<EvalAltResult>> {
let owned_object = std::mem::take(object);
*object = owned_object.title(title);
Ok(object.clone())
}
/// Sets the access name
#[rhai_fn(name = "description", return_raw, global, pure)]
pub fn object_description(
object: &mut RhaiObject,
description: String,
) -> Result<RhaiObject, Box<EvalAltResult>> {
let owned_object = std::mem::take(object);
*object = owned_object.description(description);
Ok(object.clone())
}
}
pub fn register_object_rhai_module(engine: &mut Engine) {
let mut module = exported_module!(rhai_access_module);
register_authorized_create_by_id_fn!(
module: &mut module,
rhai_fn_name: "save_object",
resource_type_str: "Object",
rhai_return_rust_type: heromodels::models::object::Object
);
register_authorized_get_by_id_fn!(
module: &mut module,
rhai_fn_name: "get_object",
resource_type_str: "Object",
rhai_return_rust_type: heromodels::models::object::Object
);
engine.register_global_module(module.into());
}

View File

@ -1,5 +1,5 @@
[package]
name = "engine"
name = "rhailib_engine"
version = "0.1.0"
edition = "2021"
description = "Central Rhai engine for heromodels"
@ -10,6 +10,7 @@ heromodels = { path = "../../../db/heromodels", features = ["rhai"] }
heromodels_core = { path = "../../../db/heromodels_core" }
chrono = "0.4"
heromodels-derive = { path = "../../../db/heromodels-derive" }
rhailib_dsl = { path = "../dsl" }
[features]
default = ["calendar", "finance"]

View File

@ -5,11 +5,11 @@ use std::sync::Arc;
use heromodels::db::hero::OurDB;
use std::fs; // For file operations
use std::path::Path; // For path handling
use rhailib_dsl;
// Export the mock database module
pub mod mock_db;
pub fn create_heromodels_engine(db: Arc<OurDB>) -> Engine {
pub fn create_heromodels_engine() -> Engine {
let mut engine = Engine::new();
// Configure engine settings
@ -19,43 +19,43 @@ pub fn create_heromodels_engine(db: Arc<OurDB>) -> Engine {
engine.set_max_map_size(10 * 1024); // 10K elements
// Register all heromodels Rhai modules
register_all_modules(&mut engine, db);
rhailib_dsl::register_dsl_modules(&mut engine);
engine
}
/// Register all heromodels Rhai modules with the engine
pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) {
// Register the calendar module if the feature is enabled
heromodels::models::access::register_access_rhai_module(engine, db.clone());
#[cfg(feature = "calendar")]
heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone());
heromodels::models::contact::register_contact_rhai_module(engine, db.clone());
heromodels::models::library::register_library_rhai_module(engine, db.clone());
heromodels::models::circle::register_circle_rhai_module(engine, db.clone());
// /// Register all heromodels Rhai modules with the engine
// pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) {
// // Register the calendar module if the feature is enabled
// heromodels::models::access::register_access_rhai_module(engine, db.clone());
// #[cfg(feature = "calendar")]
// heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone());
// heromodels::models::contact::register_contact_rhai_module(engine, db.clone());
// heromodels::models::library::register_library_rhai_module(engine, db.clone());
// heromodels::models::circle::register_circle_rhai_module(engine, db.clone());
// Register the flow module if the feature is enabled
#[cfg(feature = "flow")]
heromodels::models::flow::register_flow_rhai_module(engine, db.clone());
// // Register the flow module if the feature is enabled
// #[cfg(feature = "flow")]
// heromodels::models::flow::register_flow_rhai_module(engine, db.clone());
// // Register the finance module if the feature is enabled
// #[cfg(feature = "finance")]
// heromodels::models::finance::register_finance_rhai_module(engine, db.clone());
// // // Register the finance module if the feature is enabled
// // #[cfg(feature = "finance")]
// // heromodels::models::finance::register_finance_rhai_module(engine, db.clone());
// Register the legal module if the feature is enabled
#[cfg(feature = "legal")]
heromodels::models::legal::register_legal_rhai_module(engine, db.clone());
// // Register the legal module if the feature is enabled
// #[cfg(feature = "legal")]
// heromodels::models::legal::register_legal_rhai_module(engine, db.clone());
// Register the projects module if the feature is enabled
#[cfg(feature = "projects")]
heromodels::models::projects::register_projects_rhai_module(engine, db.clone());
// // Register the projects module if the feature is enabled
// #[cfg(feature = "projects")]
// heromodels::models::projects::register_projects_rhai_module(engine, db.clone());
// Register the biz module if the feature is enabled
#[cfg(feature = "biz")]
heromodels::models::biz::register_biz_rhai_module(engine, db.clone());
// // Register the biz module if the feature is enabled
// #[cfg(feature = "biz")]
// heromodels::models::biz::register_biz_rhai_module(engine, db.clone());
println!("Heromodels Rhai modules registered successfully.");
}
// println!("Heromodels Rhai modules registered successfully.");
// }
/// Evaluate a Rhai script string
pub fn eval_script(

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
rhai = { version = "=1.21.0", features = ["std", "sync", "decimal", "internals"] }
rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"] }
heromodels = { path = "../../../db/heromodels" }
heromodels_core = { path = "../../../db/heromodels_core" }
serde = { version = "1.0", features = ["derive"] }

View File

@ -82,7 +82,7 @@ macro_rules! register_authorized_get_by_id_fn {
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext, id_val: i64| -> Result<Option<$rhai_return_rust_type>, Box<EvalAltResult>> {
move |context: rhai::NativeCallContext, id_val: i64| -> Result<$rhai_return_rust_type, Box<EvalAltResult>> {
let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?;
// Inlined logic to get caller public key
@ -98,8 +98,6 @@ macro_rules! register_authorized_get_by_id_fn {
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
println!("DB Path: {}", db_path);
let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
@ -122,37 +120,165 @@ macro_rules! register_authorized_get_by_id_fn {
);
if !has_access {
return Ok(None);
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Access denied for public key: {}", caller_pk_str).into(),
context.position(),
)));
}
}
let all_items: Vec<$rhai_return_rust_type> = db
.collection::<$rhai_return_rust_type>()
.map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?
.get_all()
.map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?;
for item in all_items {
println!("{} with ID: {}", $resource_type_str, item.id());
}
println!("Fetching {} with ID: {}", $resource_type_str, actual_id);
let result = db.get_by_id(actual_id).map_err(|e| {
let result = db
.collection::<$rhai_return_rust_type>()
.unwrap()
.get_by_id(actual_id)
.map_err(|e| {
println!("Database error fetching {} with ID: {}", $resource_type_str, actual_id);
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error fetching {}: {:?}", $resource_type_str, e).into(),
context.position(),
))
})?;
println!("Database fetched");
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error fetching {} with ID: {}", $resource_type_str, actual_id).into(),
context.position(),
))
})?;
Ok(result)
},
);
};
}
// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization.
#[macro_export]
macro_rules! register_authorized_create_by_id_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection")
resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection")
rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`)
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext, object: $rhai_return_rust_type| -> Result<$rhai_return_rust_type, Box<EvalAltResult>> {
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?;
let pk_dynamic = tag_map.get("CALLER_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
// TODO: check if caller pk is member of circle
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(),
context.position(),
)));
}
let result = db.set(&object).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error creating {}: {:?}", $resource_type_str, e).into(),
context.position(),
))
})?;
Ok(result.1)
},
);
};
}
// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization.
#[macro_export]
macro_rules! register_authorized_delete_by_id_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection")
resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection")
rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`)
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext, id_val: i64| -> Result<(), Box<EvalAltResult>> {
let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?;
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?;
let pk_dynamic = tag_map.get("CALLER_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
// Use the standalone can_access_resource function from heromodels
let has_access = heromodels::models::access::access::can_access_resource(
db.clone(),
&caller_pk_str,
actual_id,
$resource_type_str,
);
if !has_access {
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Access denied for public key: {}", caller_pk_str).into(),
context.position(),
)));
}
}
let result = db
.collection::<$rhai_return_rust_type>()
.unwrap()
.delete_by_id(actual_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error deleting {}: {:?}", $resource_type_str, e).into(),
context.position(),
))
})?;
Ok(())
},
);
};
}
/// Macro to register a Rhai function that lists all resources of a certain type, with authorization.
///
/// The macro handles:
@ -164,7 +290,6 @@ macro_rules! register_authorized_get_by_id_fn {
///
/// # Arguments
/// * `module`: Mutable reference to the Rhai `Module`.
/// * `db_clone`: Cloned `Arc<OurDB>` for database access.
/// * `rhai_fn_name`: String literal for the Rhai function name (e.g., "list_collections").
/// * `resource_type_str`: String literal for the resource type (e.g., "Collection"), used in authorization checks.
/// * `rhai_return_rust_type`: Rust type of the resource item (e.g., `RhaiCollection`).
@ -174,16 +299,11 @@ macro_rules! register_authorized_get_by_id_fn {
macro_rules! register_authorized_list_fn {
(
module: $module:expr,
db_clone: $db_instance:expr,
rhai_fn_name: $rhai_fn_name:expr,
resource_type_str: $resource_type_str:expr,
rhai_return_rust_type: $rhai_return_rust_type:ty,
item_id_accessor: $item_id_accessor:ident,
rhai_return_wrapper_type: $rhai_return_wrapper_type:ty
) => {
let db_instance_auth_outer = $db_instance.clone();
let db_instance_fetch = $db_instance.clone();
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext| -> Result<$rhai_return_wrapper_type, Box<EvalAltResult>> {
@ -198,7 +318,20 @@ macro_rules! register_authorized_list_fn {
let caller_pk_str = pk_dynamic.clone().into_string()?;
let all_items: Vec<$rhai_return_rust_type> = db_instance_fetch
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let all_items: Vec<$rhai_return_rust_type> = db
.collection::<$rhai_return_rust_type>()
.map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?
.get_all()
@ -207,9 +340,9 @@ macro_rules! register_authorized_list_fn {
let authorized_items: Vec<$rhai_return_rust_type> = all_items
.into_iter()
.filter(|item| {
let resource_id = item.$item_id_accessor();
let resource_id = item.id();
heromodels::models::access::access::can_access_resource(
db_instance_auth_outer.clone(),
db.clone(),
&caller_pk_str,
resource_id,
$resource_type_str,
@ -221,4 +354,4 @@ macro_rules! register_authorized_list_fn {
},
);
};
}
}

View File

@ -16,6 +16,6 @@ rhai_client = { path = "../client" }
anyhow = "1.0" # For simpler error handling
rhailib_worker = { path = "../worker", package = "rhailib_worker" }
engine = { path = "../engine" }
rhailib_engine = { path = "../engine" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }
rhai = { version = "1.18.0" } # Match version used by worker/engine

View File

@ -25,5 +25,5 @@ clap = { version = "4.4", features = ["derive"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
chrono = { version = "0.4", features = ["serde"] }
rhai_client = { path = "../client" }
engine = { path = "../engine" }
rhailib_engine = { path = "../engine" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }

View File

@ -1,15 +1,14 @@
use chrono::Utc;
use log::{debug, error, info};
use redis::AsyncCommands;
use rhai::{Dynamic, Engine, Scope};
use rhai::{Dynamic, Engine};
use rhai_client::RhaiTaskDetails; // Import for constructing the reply message
use serde_json;
use std::collections::HashMap;
use tokio::sync::mpsc; // For shutdown signal
use tokio::task::JoinHandle; // For serializing the reply message
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
const NAMESPACE_PREFIX: &str = "rhailib:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
// This function updates specific fields in the Redis hash.
@ -21,7 +20,7 @@ async fn update_task_status_in_redis(
output: Option<String>,
error_msg: Option<String>,
) -> redis::RedisResult<()> {
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
let mut updates: Vec<(&str, String)> = vec![
("status", status.to_string()),
("updatedAt", Utc::now().timestamp().to_string()),
@ -42,7 +41,6 @@ async fn update_task_status_in_redis(
}
pub fn spawn_rhai_worker(
_circle_id: u32, // For logging or specific logic if needed in the future
circle_public_key: String,
db_path: String,
mut engine: Engine,
@ -51,7 +49,7 @@ pub fn spawn_rhai_worker(
preserve_tasks: bool, // Flag to control task cleanup
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key);
let queue_key = format!("{}{}", NAMESPACE_PREFIX, circle_public_key);
info!(
"Rhai Worker for Circle Public Key '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
circle_public_key, redis_url, queue_key
@ -85,97 +83,96 @@ pub fn spawn_rhai_worker(
loop {
let blpop_keys = vec![queue_key.clone()];
tokio::select! {
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key.clone());
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key.clone(), queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key.clone());
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key.clone(), queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((_queue_name_recv, task_id)) = response {
info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv);
debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id);
if let Some((_queue_name_recv, task_id)) = response {
info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv);
debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id);
let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key);
let task_details_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key);
let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_details_key).await;
let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_details_key).await;
match task_details_map_result {
Ok(details_map) => {
debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map);
let script_content_opt = details_map.get("script").cloned();
let reply_to_queue_opt = details_map.get("replyToQueue").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned();
let public_key_opt = details_map.get("publicKey").cloned();
match task_details_map_result {
Ok(details_map) => {
debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map);
let script_content_opt = details_map.get("script").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned();
let caller_id = details_map.get("callerId").cloned().expect("callerId field missing from Redis hash");
if let Some(script_content) = script_content_opt {
info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id);
}
if let Some(script_content) = script_content_opt {
info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id);
}
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.clone().into());
db_config.insert("CALLER_PUBLIC_KEY".into(), public_key_opt.unwrap_or_default().into());
db_config.insert("CIRCLE_PUBLIC_KEY".into(), circle_public_key.clone().into());
engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions
debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id);
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.clone().into());
db_config.insert("CALLER_PUBLIC_KEY".into(), caller_id.clone().into());
db_config.insert("CIRCLE_PUBLIC_KEY".into(), circle_public_key.clone().into());
engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions
debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id);
let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None;
let mut final_error_msg: Option<String> = None;
let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None;
let mut final_error_msg: Option<String> = None;
match engine.eval::<rhai::Dynamic>(&script_content) {
Ok(result) => {
let output_str = if result.is::<String>() {
// If the result is a string, we can unwrap it directly.
// This moves `result`, which is fine because it's the last time we use it in this branch.
result.into_string().unwrap()
} else {
result.to_string()
};
match engine.eval::<rhai::Dynamic>(&script_content) {
Ok(result) => {
let output_str = if result.is::<String>() {
// If the result is a string, we can unwrap it directly.
// This moves `result`, which is fine because it's the last time we use it in this branch.
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str);
final_status = "completed".to_string();
final_output = Some(output_str);
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str);
final_error_msg = Some(error_str);
// final_status remains "error"
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str);
final_error_msg = Some(error_str);
// final_status remains "error"
}
}
}
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status);
if let Err(e) = update_task_status_in_redis(
&mut redis_conn,
&task_id,
&final_status,
final_output.clone(), // Clone for task hash update
final_error_msg.clone(), // Clone for task hash update
).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status);
}
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status);
if let Err(e) = update_task_status_in_redis(
&mut redis_conn,
&task_id,
&final_status,
final_output.clone(), // Clone for task hash update
final_error_msg.clone(), // Clone for task hash update
).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status);
}
// Send to reply queue if specified
// Send to reply queue if specified
if let Some(reply_q) = reply_to_queue_opt {
let created_at = created_at_str_opt
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.with_timezone(&Utc))
@ -185,45 +182,43 @@ pub fn spawn_rhai_worker(
task_id: task_id.to_string(), // Add the task_id
script: script_content.clone(), // Include script for context in reply
status: final_status, // The final status
// client_rpc_id is no longer a field
output: final_output, // The final output
error: final_error_msg, // The final error
created_at, // Original creation time
updated_at: Utc::now(), // Time of this final update/reply
// reply_to_queue is no longer a field
public_key: public_key_opt.clone(),
caller_id: caller_id.clone(),
};
let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, task_id);
match serde_json::to_string(&reply_details) {
Ok(reply_json) => {
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await;
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_queue_key, &reply_json).await;
match lpush_result {
Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q),
Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush),
Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_queue_key),
Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_queue_key, e_lpush),
}
}
Err(e_json) => {
error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json);
error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_queue_key, e_json);
}
}
}
// Clean up task details based on preserve_tasks flag
if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
// Clean up task details based on preserve_tasks flag
if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key);
}
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key);
debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id);
}
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id);
}
} else { // Script content not found in hash
error!(
"Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
circle_public_key, task_id, details_map
);
// Clean up invalid task details based on preserve_tasks flag
if !preserve_tasks {
} else { // Script content not found in hash
error!(
"Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
circle_public_key, task_id, details_map
);
// Clean up invalid task details based on preserve_tasks flag
if !preserve_tasks {
// Even if the script is not found, the worker should clean up the invalid task hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
@ -233,23 +228,23 @@ pub fn spawn_rhai_worker(
}
}
}
Err(e) => {
error!(
"Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
circle_public_key, task_id, task_details_key, e
);
}
Err(e) => {
error!(
"Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
circle_public_key, task_id, task_details_key, e
);
}
} else {
debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key);
}
} // End of blpop_result match
} // End of tokio::select!
}
} else {
debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key);
}
} // End of blpop_result match
} // End of tokio::select!
} // End of loop
info!(
"Worker for Circle Public Key '{}' has shut down.",
circle_public_key
);
Ok(())
Ok(())
})
}