# Dispatcher-Based Event-Driven Flow Architecture
## Overview
This document describes a non-blocking, event-driven flow architecture for Rhai payment functions, built on the existing RhaiDispatcher. Blocking API calls become fire-and-continue calls: each payment function spawns a background thread that performs the HTTP request and then dispatches a new Rhai script chosen by the API response.
## Architecture Principles
### 1. **Non-Blocking API Calls**
- All payment functions (e.g., `create_payment_intent()`) return immediately
- HTTP requests happen in background threads
- No blocking of the main Rhai engine thread
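
The fire-and-continue idea can be sketched with the standard library alone. This is a minimal illustration, not the project's implementation: `fake_api_call` is a hypothetical stand-in for the HTTP request, and the channel `outbox` stands in for the dispatcher.

```rust
use std::sync::mpsc::Sender;
use std::thread;

/// Hypothetical stand-in for the blocking HTTP call to the payment API.
fn fake_api_call(amount: u64) -> Result<String, String> {
    if amount == 0 {
        Err("amount must be positive".to_string())
    } else {
        Ok(format!("{{\"id\":\"pi_demo\",\"amount\":{}}}", amount))
    }
}

/// Returns immediately; the API call runs on a background thread and its
/// outcome is pushed onto `outbox` (standing in for the dispatcher).
fn create_payment_intent(amount: u64, outbox: Sender<Result<String, String>>) -> String {
    thread::spawn(move || {
        let outcome = fake_api_call(amount);
        // Ignore a send error: the receiver may already have been dropped.
        let _ = outbox.send(outcome);
    });
    "payment_intent_request_dispatched".to_string()
}
```

The caller receives the acknowledgement string before the outcome exists; whoever holds the receiving end of the channel sees the result later.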
### 2. **Self-Dispatching Pattern**
- Worker dispatches scripts to itself
- Same `worker_id` and `context_id` maintained
- `caller_id` changes to reflect the API response source
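
As a rough sketch of the identifier handling described above, the follow-up dispatch keeps the worker and context and swaps only the caller. `DispatchEnvelope` and `for_response` are hypothetical names used for illustration; they are not part of `rhai_dispatcher`.

```rust
/// Hypothetical envelope describing who submitted a script and where it runs.
#[derive(Debug, Clone, PartialEq)]
struct DispatchEnvelope {
    caller_id: String,
    worker_id: String,
    context_id: String,
}

impl DispatchEnvelope {
    /// Builds the envelope for a follow-up script: same worker and context,
    /// but the caller becomes the API that produced the response.
    fn for_response(&self, response_source: &str) -> DispatchEnvelope {
        DispatchEnvelope {
            caller_id: response_source.to_string(),
            worker_id: self.worker_id.clone(),
            context_id: self.context_id.clone(),
        }
    }
}
```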
### 3. **Generic Request/Response Flow**
- Request functions: `new_..._request` pattern
- Response scripts: `new_..._response` pattern
- Error scripts: `new_..._error` pattern
- Consistent naming across all API operations
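
One way to keep script names consistent is to derive them from the operation name in a single place. The helper below is a sketch under that assumption; `Outcome` and `flow_script_name` are illustrative names, not existing APIs.

```rust
/// Outcome of an API call, used to pick the follow-up script.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Success,
    Failure,
}

/// Derives the flow script name for an operation such as
/// "create_payment_intent".
fn flow_script_name(operation: &str, outcome: Outcome) -> String {
    let suffix = match outcome {
        Outcome::Success => "response",
        Outcome::Failure => "error",
    };
    format!("new_{}_{}", operation, suffix)
}
```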
## Flow Architecture
```mermaid
graph TD
A[main.rhai] --> B[create_payment_intent]
B --> C[HTTP Thread Spawned]
B --> D[Return Immediately]
C --> E[Stripe API Call]
E --> F{API Response}
F -->|Success| G[Dispatch: new_create_payment_intent_response]
F -->|Error| H[Dispatch: new_create_payment_intent_error]
G --> I[Response Script Execution]
H --> J[Error Script Execution]
```
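
The branch at the `API Response` node can be sketched as a pure function that turns the outcome of the call into the script to run next. `DispatchPlan` and `plan_dispatch` are hypothetical names; the `flows/` directory and `.rhai` extension follow the script paths used later in this document.

```rust
/// What the background thread would hand to the dispatcher (hypothetical type).
#[derive(Debug, PartialEq)]
struct DispatchPlan {
    script_path: String,
    payload: String,
}

/// Maps the outcome of an API call to the flow script that should run next.
fn plan_dispatch(operation: &str, api_result: Result<String, String>) -> DispatchPlan {
    // Both branches carry the raw response body; only the script differs.
    let (suffix, payload) = match api_result {
        Ok(body) => ("response", body),
        Err(body) => ("error", body),
    };
    DispatchPlan {
        script_path: format!("flows/new_{}_{}.rhai", operation, suffix),
        payload,
    }
}
```

Keeping this decision free of I/O makes it easy to test without a running dispatcher or network access.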
## Implementation Components
### 1. **FlowManager**
```rust
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder, RhaiDispatcherError};
use std::sync::Mutex;

pub struct FlowManager {
    dispatcher: RhaiDispatcher,
    worker_id: String,
    context_id: String,
}

#[derive(Debug)]
pub enum FlowError {
    DispatcherError(RhaiDispatcherError),
    ConfigurationError(String),
}

impl From<RhaiDispatcherError> for FlowError {
    fn from(err: RhaiDispatcherError) -> Self {
        FlowError::DispatcherError(err)
    }
}

impl FlowManager {
    pub fn new(worker_id: String, context_id: String) -> Result<Self, FlowError> {
        let dispatcher = RhaiDispatcherBuilder::new()
            .caller_id("stripe") // API responses come from Stripe
            .worker_id(&worker_id)
            .context_id(&context_id)
            .redis_url("redis://127.0.0.1/")
            .build()?;

        Ok(Self {
            dispatcher,
            worker_id,
            context_id,
        })
    }

    /// Wraps the API response in a small script and submits it to this worker.
    pub async fn dispatch_response_script(&self, script_name: &str, data: &str) -> Result<(), FlowError> {
        // Rhai back-tick strings have no escape sequences; a literal back-tick
        // is written as two back-ticks. Note that `${` inside the payload is
        // still treated as interpolation, so untrusted payloads need more care.
        let script_content = format!(
            r#"
            // Auto-generated response script for {}
            let response_data = `{}`;
            let parsed_data = parse_json(response_data);
            // Include the response script
            eval_file("flows/{}.rhai");
            "#,
            script_name,
            data.replace('`', "``"),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }

    /// Wraps the API error in a small script and submits it to this worker.
    pub async fn dispatch_error_script(&self, script_name: &str, error: &str) -> Result<(), FlowError> {
        // Same back-tick handling as in `dispatch_response_script`.
        let script_content = format!(
            r#"
            // Auto-generated error script for {}
            let error_data = `{}`;
            let parsed_error = parse_json(error_data);
            // Include the error script
            eval_file("flows/{}.rhai");
            "#,
            script_name,
            error.replace('`', "``"),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }
}

// Global flow manager instance
static FLOW_MANAGER: Mutex<Option<FlowManager>> = Mutex::new(None);

pub fn initialize_flow_manager(worker_id: String, context_id: String) -> Result<(), FlowError> {
    let manager = FlowManager::new(worker_id, context_id)?;
    let mut global_manager = FLOW_MANAGER.lock().unwrap();
    *global_manager = Some(manager);
    Ok(())
}

/// Returns an owned copy of the global flow manager so it can be moved
/// into a background thread.
pub fn get_flow_manager() -> Result<FlowManager, FlowError> {
    let global_manager = FLOW_MANAGER.lock().unwrap();
    global_manager
        .as_ref()
        .ok_or_else(|| FlowError::ConfigurationError("Flow manager not initialized".to_string()))
        .map(|manager| FlowManager {
            dispatcher: manager.dispatcher.clone(), // Assuming Clone is implemented
            worker_id: manager.worker_id.clone(),
            context_id: manager.context_id.clone(),
        })
}
```
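
If the global is only ever set once, `std::sync::OnceLock` (Rust 1.70 or later) is a possible alternative to `Mutex<Option<...>>`. The sketch below swaps in that technique and stores only the identifiers; `FlowConfig`, `initialize_flow_config`, and `get_flow_config` are hypothetical names, and the dispatcher is left out so the example compiles on its own.

```rust
use std::sync::OnceLock;

/// Identifiers a flow manager needs; the dispatcher itself is omitted here.
#[derive(Debug, Clone, PartialEq)]
struct FlowConfig {
    worker_id: String,
    context_id: String,
}

// Set at most once for the lifetime of the process.
static FLOW_CONFIG: OnceLock<FlowConfig> = OnceLock::new();

/// Stores the configuration; fails if it was already initialized.
fn initialize_flow_config(worker_id: &str, context_id: &str) -> Result<(), String> {
    FLOW_CONFIG
        .set(FlowConfig {
            worker_id: worker_id.to_string(),
            context_id: context_id.to_string(),
        })
        .map_err(|_| "Flow config already initialized".to_string())
}

/// Returns a shared reference without locking or cloning.
fn get_flow_config() -> Result<&'static FlowConfig, String> {
    FLOW_CONFIG
        .get()
        .ok_or_else(|| "Flow config not initialized".to_string())
}
```

The trade-off is that a second initialization is rejected rather than replacing the earlier value, which differs from the `Mutex<Option<...>>` version.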
### 2. **Non-Blocking Payment Functions**
```rust
use rhai::plugin::*; // plugin macros and supporting types
use rhai::EvalAltResult;
use std::collections::HashMap;
use std::thread;
use tokio::runtime::Runtime;

// Transform blocking function into non-blocking
#[rhai_fn(name = "create", return_raw)]
pub fn create_payment_intent(intent: &mut RhaiPaymentIntent) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);

    // Get flow manager
    let flow_manager = get_flow_manager()
        .map_err(|e| format!("Flow manager error: {:?}", e))?;

    // Resolve the Stripe configuration before spawning, so configuration
    // errors are still reported to the calling script
    let stripe_config = get_stripe_config()?;

    // Spawn background thread for HTTP request
    thread::spawn(move || {
        // Each background thread drives its own Tokio runtime
        let rt = Runtime::new().expect("Failed to create runtime");
        rt.block_on(async {
            match make_stripe_request(&stripe_config, "payment_intents", &form_data).await {
                Ok(response) => {
                    if let Err(e) = flow_manager
                        .dispatch_response_script("new_create_payment_intent_response", &response)
                        .await
                    {
                        eprintln!("Failed to dispatch response: {:?}", e);
                    }
                }
                Err(error) => {
                    if let Err(e) = flow_manager
                        .dispatch_error_script("new_create_payment_intent_error", &error)
                        .await
                    {
                        eprintln!("Failed to dispatch error: {:?}", e);
                    }
                }
            }
        });
    });

    // Return immediately with confirmation
    Ok("payment_intent_request_dispatched".to_string())
}

// Generic async HTTP request function.
// Returns the raw response body in both cases: Ok(body) on success,
// Err(body) when the JSON contains a top-level "error" field.
async fn make_stripe_request(
    config: &StripeConfig,
    endpoint: &str,
    form_data: &HashMap<String, String>,
) -> Result<String, String> {
    let url = format!("{}/{}", STRIPE_API_BASE, endpoint);

    let response = config
        .client
        .post(&url)
        .basic_auth(&config.secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {}", e))?;

    let response_text = response
        .text()
        .await
        .map_err(|e| format!("Failed to read response: {}", e))?;

    let json: serde_json::Value = serde_json::from_str(&response_text)
        .map_err(|e| format!("Failed to parse JSON: {}", e))?;

    if json.get("error").is_some() {
        Err(response_text)
    } else {
        Ok(response_text)
    }
}
```
### 3. **Flow Script Templates**
#### Success Response Script
```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;
let status = parsed_data.status;

print(`✅ Payment Intent Created: ${payment_intent_id}`);
print(`Status: ${status}`);

// Continue the flow based on status
if status == "requires_payment_method" {
    print("Payment method required - ready for frontend");
    // Could dispatch another flow here
} else if status == "succeeded" {
    print("Payment completed successfully!");
    // Dispatch success notification flow
}

// Store the payment intent ID for later use
set_context("payment_intent_id", payment_intent_id);
set_context("payment_status", status);
```
#### Error Response Script
```rhai
// flows/new_create_payment_intent_error.rhai
let error_type = parsed_error.error.type;
let error_message = parsed_error.error.message;

print(`❌ Payment Intent Error: ${error_type}`);
print(`Message: ${error_message}`);

// Handle different error types
if error_type == "card_error" {
    print("Card was declined - notify user");
    // Dispatch user notification flow
} else if error_type == "rate_limit_error" {
    print("Rate limited - retry later");
    // Dispatch retry flow
} else {
    print("Unknown error - log for investigation");
    // Dispatch error logging flow
}

// Store error details for debugging
set_context("last_error_type", error_type);
set_context("last_error_message", error_message);
```
### 4. **Configuration and Initialization**
```rust
// Add to payment module initialization
#[rhai_fn(name = "init_flows", return_raw)]
pub fn init_flows(worker_id: String, context_id: String) -> Result<String, Box<EvalAltResult>> {
    initialize_flow_manager(worker_id, context_id)
        .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?;

    Ok("Flow manager initialized successfully".to_string())
}
```
## Usage Examples
### 1. **Basic Payment Flow**
```rhai
// main.rhai
init_flows("worker-1", "context-123");
configure_stripe("sk_test_...");

let payment_intent = new_payment_intent()
    .amount(2000)
    .currency("usd")
    .customer("cus_customer123");

// This returns immediately, HTTP happens in background
let result = payment_intent.create();
print(`Request dispatched: ${result}`);

// Script ends here, but flow continues in background
```
### 2. **Chained Flow Example**
```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;

if parsed_data.status == "requires_payment_method" {
    // Chain to next operation
    let subscription = new_subscription()
        .customer(get_context("customer_id"))
        .add_price("price_monthly");

    // This will trigger new_create_subscription_response flow
    subscription.create();
}
```
## Benefits
### 1. **Non-Blocking Execution**
- Main Rhai script never blocks on HTTP requests
- Multiple API calls can happen concurrently
- Engine remains responsive for other scripts
### 2. **Event-Driven Architecture**
- Clear separation between request and response handling
- Easy to add new flow steps
- Composable and chainable operations
### 3. **Error Handling**
- Dedicated error flows for each operation
- Contextual error information preserved
- Retry and recovery patterns possible
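
A retry pattern could, for example, pair the error type with an exponential backoff before re-dispatching. The sketch below is one possible policy, not something this design prescribes: `ErrorAction`, `action_for_error`, and the 60-second cap are assumptions made for illustration, while the error type strings mirror those in the error script above.

```rust
use std::time::Duration;

/// Follow-up action for an error flow (hypothetical, mirrors the error script).
#[derive(Debug, PartialEq)]
enum ErrorAction {
    NotifyUser,
    RetryAfter(Duration),
    LogForInvestigation,
}

/// Chooses an action from the error type and the zero-based retry attempt.
fn action_for_error(error_type: &str, attempt: u32) -> ErrorAction {
    match error_type {
        "card_error" => ErrorAction::NotifyUser,
        "rate_limit_error" => {
            // Exponential backoff: 1s, 2s, 4s, ... capped at 60s.
            // checked_shl guards against shifting by 64 bits or more.
            let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX).min(60);
            ErrorAction::RetryAfter(Duration::from_secs(secs))
        }
        _ => ErrorAction::LogForInvestigation,
    }
}
```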
### 4. **Scalability**
- Each HTTP request runs in its own thread
- No shared mutable request state: each thread owns its form data and its own copy of the flow manager
- Redis-based dispatch scales horizontally
## Implementation Checklist
- [ ] Implement FlowManager with RhaiDispatcher integration
- [ ] Convert all payment functions to non-blocking pattern
- [ ] Create flow script templates for all operations
- [ ] Add flow initialization functions
- [ ] Test with example payment flows
- [ ] Update documentation and examples
## Migration Path
1. **Phase 1**: Implement FlowManager and basic infrastructure
2. **Phase 2**: Convert payment_intent functions to non-blocking
3. **Phase 3**: Convert remaining payment functions (products, prices, subscriptions, coupons)
4. **Phase 4**: Create comprehensive flow script library
5. **Phase 5**: Add advanced features (retries, timeouts, monitoring)