rename rhai client to dispatcher

This commit is contained in:
Timur Gordon 2025-07-09 23:39:48 +02:00
parent d059af9a18
commit 29ff40d1a4
19 changed files with 205 additions and 136 deletions

35
Cargo.lock generated
View File

@ -695,7 +695,7 @@ dependencies = [
"rand 0.8.5",
"redis 0.23.3",
"redis 0.25.4",
"rhai_client",
"rhai_dispatcher",
"rhailib_engine",
"rhailib_worker",
"rustls",
@ -2834,21 +2834,6 @@ dependencies = [
"thin-vec",
]
[[package]]
name = "rhai_client"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"env_logger",
"log",
"redis 0.25.4",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]]
name = "rhai_client_macros"
version = "0.1.0"
@ -2870,6 +2855,21 @@ dependencies = [
"syn 2.0.103",
]
[[package]]
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"env_logger",
"log",
"redis 0.25.4",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]]
name = "rhailib_dsl"
version = "0.1.0"
@ -2883,6 +2883,7 @@ dependencies = [
"macros",
"reqwest",
"rhai",
"rhai_dispatcher",
"serde",
"serde_json",
"tokio",
@ -2911,7 +2912,7 @@ dependencies = [
"log",
"redis 0.25.4",
"rhai",
"rhai_client",
"rhai_dispatcher",
"rhailib_engine",
"serde",
"serde_json",

View File

@ -1,5 +1,5 @@
use clap::Parser;
use rhai_client::{RhaiClient, RhaiClientBuilder};
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder};
use log::{error, info};
use std::io::{self, Write};
use std::time::Duration;
@ -46,8 +46,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure logging based on verbosity level
let log_config = match args.verbose {
0 => "warn,circles_client=info,rhai_client=info",
1 => "info,circles_client=debug,rhai_client=debug",
0 => "warn,circles_client=info,rhai_dispatcher=info",
1 => "info,circles_client=debug,rhai_dispatcher=debug",
2 => "debug",
_ => "trace",
};
@ -68,7 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!();
// Create the Rhai client
let client = RhaiClientBuilder::new()
let client = RhaiDispatcherBuilder::new()
.caller_id(&args.caller_public_key)
.redis_url(&args.redis_url)
.build()?;
@ -97,7 +97,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
async fn execute_script(
client: &RhaiClient,
client: &RhaiDispatcher,
worker_key: &str,
script: String,
timeout_secs: u64,
@ -134,7 +134,7 @@ async fn execute_script(
}
async fn run_interactive_mode(
client: &RhaiClient,
client: &RhaiDispatcher,
worker_key: &str,
timeout_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> {

View File

@ -160,7 +160,7 @@ Modify `setup_and_spawn_circles` to:
Update `handle_play` to route to correct worker:
```rust
// Use circle_public_key from URL path for worker routing
rhai_client
rhai_dispatcher
.new_play_request()
.recipient_id(&self.circle_public_key) // From URL path
.script_path(&script_content)

View File

@ -595,7 +595,7 @@ dependencies = [
"once_cell",
"rand 0.8.5",
"redis",
"rhai_client",
"rhai_dispatcher",
"rustls",
"rustls-pemfile",
"secp256k1",
@ -1849,7 +1849,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rhai_client"
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",

View File

@ -521,7 +521,7 @@ dependencies = [
"env_logger",
"log",
"redis",
"rhai_client",
"rhai_dispatcher",
"serde",
"serde_json",
"tokio",
@ -1669,7 +1669,7 @@ dependencies = [
]
[[package]]
name = "rhai_client"
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",
@ -2514,7 +2514,7 @@ dependencies = [
"log",
"redis",
"rhai",
"rhai_client",
"rhai_dispatcher",
"serde",
"serde_json",
"tokio",

View File

@ -28,7 +28,7 @@ rhai = "1.18.0"
heromodels = { path = "../../../db/heromodels" }
rhailib_engine = { path = "../../../rhailib/src/engine" }
rhailib_worker = { path = "../../../rhailib/src/worker" }
rhai_client = { path = "../../../rhailib/src/client" }
rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" }
ourdb = { path = "../../../db/ourdb" } # Added for IdSequence
sal-service-manager = { path = "../../../sal/service_manager" }
tokio-tungstenite = "0.23"

View File

@ -128,7 +128,7 @@ When a circle configuration includes an initialization script:
1. Worker starts and connects to Redis
2. Launcher waits 2 seconds for worker startup
3. Launcher sends script content via RhaiClient to worker's queue
3. Launcher sends script content via RhaiDispatcher to worker's queue
4. Worker executes the initialization script
## Configuration

View File

@ -42,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Wait a moment for the launcher to start services
tokio::time::sleep(Duration::from_secs(5)).await;
let client = rhai_client::RhaiClientBuilder::new()
let client = rhai_dispatcher::RhaiDispatcherBuilder::new()
.redis_url(REDIS_URL)
.caller_id("test_launcher")
.build()?;
@ -78,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
println!("Received task details: {:?}", task_details_caller_pk);
assert_eq!(task_details_caller_pk.status, "completed");
// The caller should be "launcher" as set in the RhaiClient
// The caller should be "launcher" as set in the RhaiDispatcher
println!("✅ SUCCESS: Worker correctly reported CALLER_PUBLIC_KEY for init script.");
// Test 3: Simple script execution

View File

@ -1,5 +1,5 @@
use log::{info, debug};
use rhai_client::RhaiClientBuilder;
use rhai_dispatcher::RhaiDispatcherBuilder;
use sal_service_manager::{ServiceConfig as ServiceManagerConfig, ServiceStatus};
use std::sync::{Arc, Mutex};
@ -217,8 +217,8 @@ async fn send_init_script_to_worker(
) -> Result<(), Box<dyn std::error::Error>> {
println!("Sending initialization script '{}' to worker for circle: {}", init_script, public_key);
// Create RhaiClient and send script
let client = RhaiClientBuilder::new()
// Create RhaiDispatcher and send script
let client = RhaiDispatcherBuilder::new()
.redis_url(redis_url)
.caller_id("launcher")
.build()?;

View File

@ -595,7 +595,7 @@ dependencies = [
"log",
"once_cell",
"redis",
"rhai_client",
"rhai_dispatcher",
"rustls",
"rustls-pemfile",
"serde",
@ -1765,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rhai_client"
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",

4
src/server/Cargo.lock generated
View File

@ -584,7 +584,7 @@ dependencies = [
"once_cell",
"rand 0.8.5",
"redis",
"rhai_client",
"rhai_dispatcher",
"rustls",
"rustls-pemfile",
"secp256k1",
@ -1769,7 +1769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "rhai_client"
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",

View File

@ -44,7 +44,7 @@ redis = { workspace = true }
uuid = { workspace = true }
tokio = { workspace = true }
chrono = { workspace = true }
rhai_client = { path = "../../../rhailib/src/client" } # Corrected relative path
rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" } # Corrected relative path
thiserror = { workspace = true }
heromodels = { path = "../../../db/heromodels" }

View File

@ -37,6 +37,9 @@ struct Args {
#[clap(long, help = "Enable webhook handling")]
webhooks: bool,
#[clap(long, value_parser, help = "Worker ID for the server")]
worker_id: String,
}
#[actix_web::main]
@ -90,17 +93,35 @@ async fn main() -> std::io::Result<()> {
std::process::exit(1);
}
let config = ServerConfig {
host: args.host,
port: args.port,
redis_url: args.redis_url,
enable_auth: args.auth,
enable_tls: args.tls,
cert_path: args.cert,
key_path: args.key,
tls_port: args.tls_port,
enable_webhooks: args.webhooks,
};
let mut builder = ServerConfig::builder(
args.host,
args.port,
args.redis_url,
args.worker_id,
);
if args.auth {
builder = builder.with_auth();
}
if args.webhooks {
builder = builder.with_webhooks();
}
if args.tls {
if let (Some(cert), Some(key)) = (args.cert, args.key) {
builder = builder.with_tls(cert, key);
} else {
eprintln!("Error: TLS is enabled but --cert or --key is missing.");
std::process::exit(1);
}
}
if let Some(tls_port) = args.tls_port {
builder = builder.with_tls_port(tls_port);
}
let config = builder.build();
println!("🚀 Starting Circles WebSocket Server");
println!("📋 Configuration:");

View File

@ -90,7 +90,7 @@ sequenceDiagram
participant HS as HttpServer
participant WH as Webhook Handler
participant WV as Webhook Verifier
participant RC as RhaiClient
participant RC as RhaiDispatcher
participant Redis as Redis
WS->>+HS: POST /webhooks/{provider}/{circle_pk}
@ -102,7 +102,7 @@ sequenceDiagram
alt Signature Valid
WH->>WH: Parse webhook payload (heromodels types)
WH->>+RC: Create RhaiClient with caller_id
WH->>+RC: Create RhaiDispatcher with caller_id
RC->>+Redis: Execute webhook script
Redis-->>-RC: Script result
RC-->>-WH: Execution result
@ -128,6 +128,6 @@ sequenceDiagram
| **Connection Type** | Persistent, bidirectional | HTTP request/response |
| **Authentication** | secp256k1 signature-based | HMAC signature verification |
| **State Management** | Stateful sessions via CircleWs actor | Stateless HTTP requests |
| **Script Execution** | Direct via authenticated session | Via RhaiClient with provider caller_id |
| **Script Execution** | Direct via authenticated session | Via RhaiDispatcher with provider caller_id |
| **Use Case** | Interactive client applications | External service notifications |
| **Data Types** | JSON-RPC messages | Provider-specific webhook payloads (heromodels) |

View File

@ -20,7 +20,7 @@ graph TB
F[Stripe Verifier]
G[iDenfy Verifier]
H[Script Dispatcher]
I[RhaiClientBuilder]
I[RhaiDispatcherBuilder]
end
subgraph "Configuration"
@ -93,7 +93,7 @@ sequenceDiagram
participant CS as Circle Server
participant WV as Webhook Verifier
participant SD as Script Dispatcher
participant RC as RhaiClient
participant RC as RhaiDispatcher
participant RW as Rhai Worker
WS->>CS: POST /webhooks/stripe/{circle_pk}
@ -113,7 +113,7 @@ sequenceDiagram
alt Verification Success
CS->>SD: Dispatch appropriate script
SD->>RC: Create RhaiClientBuilder
SD->>RC: Create RhaiDispatcherBuilder
RC->>RC: Set caller_id="stripe" or "idenfy"
RC->>RC: Set recipient_id=circle_pk
RC->>RC: Set script="stripe_webhook_received" or "idenfy_webhook_received"
@ -249,7 +249,7 @@ heromodels/src/models/
- **Type Organization**: Webhook payload types moved to `heromodels` library for reusability
- **Modular Handlers**: Separate handler files for each webhook provider
- **Simplified Architecture**: Removed unnecessary dispatcher complexity
- **Direct Script Execution**: Handlers directly use `RhaiClient` for script execution
- **Direct Script Execution**: Handlers directly use `RhaiDispatcher` for script execution
### Modified Files
- `src/lib.rs` - Add webhook routes and module imports

View File

@ -3,7 +3,7 @@ use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use log::{debug, info, error}; // Added error for better logging
use once_cell::sync::Lazy;
use rhai_client::{RhaiClientBuilder, RhaiClientError};
use rhai_dispatcher::{RhaiDispatcherBuilder, RhaiDispatcherError};
use rustls::pki_types::PrivateKeyDer;
use rustls::ServerConfig as RustlsServerConfig;
use rustls_pemfile::{certs, pkcs8_private_keys};
@ -106,6 +106,7 @@ struct CircleWs {
nonce_store: HashMap<String, NonceResponse>,
auth_enabled_on_server: bool,
authenticated_pubkey: Option<String>,
circle_worker_id: String,
}
impl CircleWs {
@ -114,6 +115,7 @@ impl CircleWs {
server_circle_public_key: String,
redis_url_for_client: String,
auth_enabled_on_server: bool,
circle_worker_id: String,
) -> Self {
Self {
server_circle_name,
@ -122,6 +124,7 @@ impl CircleWs {
nonce_store: HashMap::new(),
auth_enabled_on_server,
authenticated_pubkey: None,
circle_worker_id,
}
}
@ -284,17 +287,19 @@ impl CircleWs {
let redis_url_clone = self.redis_url_for_client.clone();
let _rpc_id_clone = client_rpc_id.clone();
let public_key = self.authenticated_pubkey.clone();
let worker_id_clone = self.circle_worker_id.clone();
let fut = async move {
let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string());
match RhaiClientBuilder::new()
match RhaiDispatcherBuilder::new()
.redis_url(&redis_url_clone)
.caller_id(&caller_id)
.build() {
Ok(rhai_client) => {
rhai_client
Ok(rhai_dispatcher) => {
rhai_dispatcher
.new_play_request()
.recipient_id(&circle_pk_clone)
.context_id(&circle_pk_clone)
.worker_id(&worker_id_clone)
.script(&script_content)
.timeout(TASK_TIMEOUT_DURATION)
.await_response()
@ -339,7 +344,7 @@ impl CircleWs {
}
Err(e) => {
let (code, message) = match e {
RhaiClientError::Timeout(task_id) => (
RhaiDispatcherError::Timeout(task_id) => (
-32002,
format!(
"Timeout waiting for Rhai script (task: {})",
@ -490,58 +495,23 @@ pub struct ServerConfig {
pub host: String,
pub port: u16,
pub redis_url: String,
pub enable_auth: bool,
pub enable_tls: bool,
pub cert_path: Option<String>,
pub key_path: Option<String>,
pub tls_port: Option<u16>,
pub enable_auth: bool,
pub enable_webhooks: bool,
pub circle_worker_id: String,
}
impl ServerConfig {
/// Create a new server configuration with TLS disabled
pub fn new(
pub fn builder(
host: String,
port: u16,
redis_url: String,
) -> Self {
Self {
host,
port,
redis_url,
enable_auth: false,
enable_tls: false,
cert_path: None,
key_path: None,
tls_port: None,
enable_webhooks: false,
}
}
/// Enable TLS with certificate and key paths
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
self.enable_tls = true;
self.cert_path = Some(cert_path);
self.key_path = Some(key_path);
self
}
/// Set a separate port for TLS connections
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
self.tls_port = Some(tls_port);
self
}
/// Enable authentication
pub fn with_auth(mut self) -> Self {
self.enable_auth = true;
self
}
/// Enable webhooks
pub fn with_webhooks(mut self) -> Self {
self.enable_webhooks = true;
self
worker_id: String,
) -> ServerConfigBuilder {
ServerConfigBuilder::new(host, port, redis_url, worker_id)
}
/// Get the effective port for TLS connections
@ -551,7 +521,75 @@ impl ServerConfig {
/// Check if TLS is properly configured
pub fn is_tls_configured(&self) -> bool {
self.enable_tls && self.cert_path.is_some() && self.key_path.is_some()
self.cert_path.is_some() && self.key_path.is_some()
}
}
/// ServerConfigBuilder
pub struct ServerConfigBuilder {
host: String,
port: u16,
redis_url: String,
enable_tls: bool,
cert_path: Option<String>,
key_path: Option<String>,
tls_port: Option<u16>,
enable_auth: bool,
enable_webhooks: bool,
circle_worker_id: String,
}
impl ServerConfigBuilder {
pub fn new(host: String, port: u16, redis_url: String, worker_id: String) -> Self {
Self {
host,
port,
redis_url,
enable_tls: false,
cert_path: None,
key_path: None,
tls_port: None,
enable_auth: false,
enable_webhooks: false,
circle_worker_id: worker_id,
}
}
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
self.enable_tls = true;
self.cert_path = Some(cert_path);
self.key_path = Some(key_path);
self
}
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
self.tls_port = Some(tls_port);
self
}
pub fn with_auth(mut self) -> Self {
self.enable_auth = true;
self
}
pub fn with_webhooks(mut self) -> Self {
self.enable_webhooks = true;
self
}
pub fn build(self) -> ServerConfig {
ServerConfig {
host: self.host,
port: self.port,
redis_url: self.redis_url,
enable_tls: self.enable_tls,
cert_path: self.cert_path,
key_path: self.key_path,
tls_port: self.tls_port,
enable_auth: self.enable_auth,
enable_webhooks: self.enable_webhooks,
circle_worker_id: self.circle_worker_id,
}
}
}
@ -613,23 +651,23 @@ fn load_rustls_config(
async fn ws_handler(
req: HttpRequest,
stream: web::Payload,
path: web::Path<String>,
server_config: web::Data<ServerConfig>,
config: web::Data<ServerConfig>,
) -> Result<HttpResponse, Error> {
let circle_pk = path.into_inner();
info!(
"Incoming WebSocket connection for circle: {} (auth_enabled: {})",
circle_pk, server_config.enable_auth
);
let server_circle_name = req.match_info().get("circle_pk").unwrap_or("unknown").to_string();
let server_circle_public_key = server_circle_name.clone(); // Assuming pk is the name for now
let ws_actor = CircleWs::new_configured(
format!("circle-{}", &circle_pk[..8]), // Use first 8 chars as display name
circle_pk,
server_config.redis_url.clone(),
server_config.enable_auth,
);
ws::start(ws_actor, &req, stream)
// Create and start the WebSocket actor
ws::start(
CircleWs::new_configured(
server_circle_name,
server_circle_public_key,
config.redis_url.clone(),
config.enable_auth,
config.circle_worker_id.clone(),
),
&req,
stream,
)
}
pub fn spawn_circle_server(
@ -657,7 +695,7 @@ pub fn spawn_circle_server(
let webhook_app_state = create_webhook_app_state(
webhook_config,
config.redis_url.clone(),
"webhook_system".to_string()
config.circle_worker_id.clone(),
);
Some(web::Data::new(webhook_app_state))
}

View File

@ -9,18 +9,24 @@ pub struct WebhookAppState {
pub config: WebhookConfig,
pub redis_url: String,
pub caller_id: String,
pub worker_id: String,
}
/// Create webhook application state
pub fn create_webhook_app_state(
config: WebhookConfig,
redis_url: String,
caller_id: String,
worker_id: String,
) -> WebhookAppState {
// For now, we'll use the worker_id as the caller_id for webhooks.
// This can be changed if a more specific caller_id is needed.
let caller_id = worker_id.clone();
WebhookAppState {
config,
redis_url,
caller_id,
worker_id,
}
}

View File

@ -10,7 +10,7 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
use bytes::Bytes;
use log::{debug, error, info, warn};
use serde_json;
use rhai_client::RhaiClientBuilder;
use rhai_dispatcher::RhaiDispatcherBuilder;
/// Execute an iDenfy webhook script
async fn execute_idenfy_webhook_script(
@ -24,12 +24,13 @@ async fn execute_idenfy_webhook_script(
circle_id, event.client_id, event.status
);
// Create RhaiClient
let rhai_client = RhaiClientBuilder::new()
// Create RhaiDispatcher
let rhai_dispatcher = RhaiDispatcherBuilder::new()
.redis_url(redis_url)
.caller_id(caller_id)
.context_id(circle_id)
.build()
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?;
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
// Serialize the event as JSON payload
let event_json = serde_json::to_string(event)
@ -43,9 +44,8 @@ async fn execute_idenfy_webhook_script(
debug!("Executing script: {}", script);
match rhai_client
match rhai_dispatcher
.new_play_request()
.recipient_id(circle_id)
.script(&script)
.timeout(std::time::Duration::from_secs(30))
.await_response()

View File

@ -10,13 +10,14 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
use bytes::Bytes;
use log::{debug, error, info, warn};
use serde_json;
use rhai_client::RhaiClientBuilder;
use rhai_dispatcher::RhaiDispatcherBuilder;
/// Execute a Stripe webhook script
async fn execute_stripe_webhook_script(
redis_url: &str,
caller_id: &str,
circle_id: &str,
worker_id: &str,
event: &StripeWebhookEvent,
) -> Result<serde_json::Value, WebhookError> {
info!(
@ -24,12 +25,14 @@ async fn execute_stripe_webhook_script(
circle_id, event.event_type
);
// Create RhaiClient
let rhai_client = RhaiClientBuilder::new()
// Create RhaiDispatcher
let rhai_dispatcher = RhaiDispatcherBuilder::new()
.redis_url(redis_url)
.caller_id(caller_id)
.worker_id(worker_id)
.context_id(circle_id)
.build()
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?;
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
// Serialize the event as JSON payload
let event_json = serde_json::to_string(event)
@ -43,9 +46,8 @@ async fn execute_stripe_webhook_script(
debug!("Executing script: {}", script);
match rhai_client
match rhai_dispatcher
.new_play_request()
.recipient_id(circle_id)
.script(&script)
.timeout(std::time::Duration::from_secs(30))
.await_response()
@ -161,6 +163,7 @@ pub async fn handle_stripe_webhook(
&data.redis_url,
&verification_result.caller_id,
&circle_pk,
&data.worker_id,
&stripe_event,
).await {
Ok(script_result) => {