# Dispatcher-Based Event-Driven Flow Architecture

## Overview

This document describes a non-blocking, event-driven flow architecture for Rhai payment functions, built on the existing RhaiDispatcher. Blocking API calls become fire-and-continue calls: each payment function spawns a background thread for its HTTP request, and that thread dispatches a new Rhai script based on the API response.

## Architecture Principles

### 1. **Non-Blocking API Calls**
- All payment functions (e.g., `create_payment_intent()`) return immediately
- HTTP requests happen in background threads
- No blocking of the main Rhai engine thread
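
The fire-and-continue idea can be sketched with the standard library alone. The names `fire_and_continue` and `Outcome` are hypothetical, and a channel stands in for the dispatcher here, so treat this as an illustration of the control flow rather than the actual payment code:

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Result of the slow call; a stand-in for an API response (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Success(String),
    Failure(String),
}

/// Starts `slow_call` on a background thread and returns at once.
/// The caller receives an acknowledgement string plus a receiver on which
/// the outcome arrives later. In the architecture described here, the
/// outcome would be dispatched as a script instead of sent over a channel.
pub fn fire_and_continue<F>(slow_call: F) -> (String, Receiver<Outcome>)
where
    F: FnOnce() -> Result<String, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let outcome = match slow_call() {
            Ok(body) => Outcome::Success(body),
            Err(err) => Outcome::Failure(err),
        };
        // The receiver may already have been dropped; ignore that case.
        let _ = tx.send(outcome);
    });
    ("request_dispatched".to_string(), rx)
}
```

The calling thread only pays for spawning the thread; the slow work and the success/error branching both happen off the caller's thread.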

### 2. **Self-Dispatching Pattern**
- Worker dispatches scripts to itself
- Same `worker_id` and `context_id` maintained
- `caller_id` changes to reflect the API response source
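
To make the identity rules concrete, here is a small sketch of the routing metadata. `DispatchEnvelope` and `self_dispatch` are hypothetical names for illustration; the real dispatcher carries these fields in its own request type:

```rust
/// Routing metadata attached to a dispatched script (hypothetical struct).
#[derive(Debug, Clone, PartialEq)]
pub struct DispatchEnvelope {
    pub caller_id: String,
    pub worker_id: String,
    pub context_id: String,
    pub script: String,
}

impl DispatchEnvelope {
    /// Builds the follow-up envelope: worker and context are carried over
    /// unchanged, while `caller_id` names the source of the API response.
    pub fn self_dispatch(&self, response_source: &str, script: &str) -> DispatchEnvelope {
        DispatchEnvelope {
            caller_id: response_source.to_string(),
            worker_id: self.worker_id.clone(),
            context_id: self.context_id.clone(),
            script: script.to_string(),
        }
    }
}
```

Because the worker and context are copied from the originating request, the response script runs on the same worker with the same context as the script that triggered it.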

### 3. **Generic Request/Response Flow**
- Request functions: `new_..._request` pattern
- Response scripts: `new_..._response` pattern
- Consistent naming across all API operations
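
One way to keep the naming consistent is to derive every script name from the operation name. The helper below is a sketch; `FlowNames`, `flow_names`, and `script_path` are hypothetical, and the `_error` suffix and `flows/` directory follow the names used elsewhere in this document:

```rust
/// Script names derived from a single API operation (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct FlowNames {
    pub request: String,
    pub response: String,
    pub error: String,
}

/// Derives the request, response, and error names for `operation`,
/// e.g. "create_payment_intent".
pub fn flow_names(operation: &str) -> FlowNames {
    FlowNames {
        request: format!("new_{}_request", operation),
        response: format!("new_{}_response", operation),
        error: format!("new_{}_error", operation),
    }
}

/// Path of the flow script file for a given script name.
pub fn script_path(script_name: &str) -> String {
    format!("flows/{}.rhai", script_name)
}
```

Deriving the names in one place means a new API operation only needs its operation string, and the response and error scripts can be located by convention.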

## Flow Architecture

```mermaid
graph TD
    A[main.rhai] --> B[create_payment_intent]
    B --> C[HTTP Thread Spawned]
    B --> D[Return Immediately]
    C --> E[Stripe API Call]
    E --> F{API Response}
    F -->|Success| G[Dispatch: new_create_payment_intent_response]
    F -->|Error| H[Dispatch: new_create_payment_intent_error]
    G --> I[Response Script Execution]
    H --> J[Error Script Execution]
```

## Implementation Components

### 1. **FlowManager**

```rust
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder, RhaiDispatcherError};
use std::sync::Mutex;

pub struct FlowManager {
    dispatcher: RhaiDispatcher,
    worker_id: String,
    context_id: String,
}

#[derive(Debug)]
pub enum FlowError {
    DispatcherError(RhaiDispatcherError),
    ConfigurationError(String),
}

impl From<RhaiDispatcherError> for FlowError {
    fn from(err: RhaiDispatcherError) -> Self {
        FlowError::DispatcherError(err)
    }
}

impl FlowManager {
    pub fn new(worker_id: String, context_id: String) -> Result<Self, FlowError> {
        let dispatcher = RhaiDispatcherBuilder::new()
            .caller_id("stripe") // API responses come from Stripe
            .worker_id(&worker_id)
            .context_id(&context_id)
            .redis_url("redis://127.0.0.1/")
            .build()?;

        Ok(Self {
            dispatcher,
            worker_id,
            context_id,
        })
    }

    pub async fn dispatch_response_script(&self, script_name: &str, data: &str) -> Result<(), FlowError> {
        let script_content = format!(
            r#"
                // Auto-generated response script for {}
                let response_data = `{}`;
                let parsed_data = parse_json(response_data);

                // Include the response script
                eval_file("flows/{}.rhai");
            "#,
            script_name,
            // Rhai back-tick literals escape a back-tick by doubling it.
            // Note: `${...}` in the payload would still be interpolated.
            data.replace('`', "``"),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }

    pub async fn dispatch_error_script(&self, script_name: &str, error: &str) -> Result<(), FlowError> {
        let script_content = format!(
            r#"
                // Auto-generated error script for {}
                let error_data = `{}`;
                let parsed_error = parse_json(error_data);

                // Include the error script
                eval_file("flows/{}.rhai");
            "#,
            script_name,
            // Same escaping rule as in dispatch_response_script.
            error.replace('`', "``"),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }
}

// Global flow manager instance
static FLOW_MANAGER: Mutex<Option<FlowManager>> = Mutex::new(None);

pub fn initialize_flow_manager(worker_id: String, context_id: String) -> Result<(), FlowError> {
    let manager = FlowManager::new(worker_id, context_id)?;
    let mut global_manager = FLOW_MANAGER.lock().unwrap();
    *global_manager = Some(manager);
    Ok(())
}

pub fn get_flow_manager() -> Result<FlowManager, FlowError> {
    let global_manager = FLOW_MANAGER.lock().unwrap();
    global_manager.as_ref()
        .ok_or_else(|| FlowError::ConfigurationError("Flow manager not initialized".to_string()))
        .map(|manager| FlowManager {
            dispatcher: manager.dispatcher.clone(), // Assuming Clone is implemented
            worker_id: manager.worker_id.clone(),
            context_id: manager.context_id.clone(),
        })
}
```
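
The generated scripts embed the API payload inside a Rhai string literal, so the payload has to be escaped for whichever literal syntax is used. As far as the Rhai documentation describes, back-tick strings also interpolate `${...}`, which a JSON payload could contain. One alternative, sketched below under that assumption, is to emit a double-quoted literal instead. `rhai_string_literal` is a hypothetical helper, not part of `rhai_dispatcher`:

```rust
/// Renders `raw` as a double-quoted Rhai string literal (hypothetical helper).
/// Escapes backslash, double quote, and the common whitespace controls.
/// Back-ticks and `${` pass through untouched because, per the Rhai docs,
/// double-quoted strings do not interpolate.
pub fn rhai_string_literal(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len() + 2);
    out.push('"');
    for ch in raw.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            other => out.push(other),
        }
    }
    out.push('"');
    out
}
```

With a helper like this, the template line would read `let response_data = {};` and receive the already-quoted literal, so the payload never reaches the Rhai parser as anything other than string content.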

### 2. **Non-Blocking Payment Functions**

```rust
use std::collections::HashMap;
use std::thread;

use rhai::plugin::*; // provides #[rhai_fn] and EvalAltResult
use tokio::runtime::Runtime; // assumed: the async runtime behind block_on below

// Transform blocking function into non-blocking
#[rhai_fn(name = "create", return_raw)]
pub fn create_payment_intent(intent: &mut RhaiPaymentIntent) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);

    // Get flow manager
    let flow_manager = get_flow_manager()
        .map_err(|e| format!("Flow manager error: {:?}", e))?;

    // Spawn background thread for HTTP request
    let stripe_config = get_stripe_config()?;
    thread::spawn(move || {
        let rt = Runtime::new().expect("Failed to create runtime");
        rt.block_on(async {
            match make_stripe_request(&stripe_config, "payment_intents", &form_data).await {
                Ok(response) => {
                    if let Err(e) = flow_manager.dispatch_response_script(
                        "new_create_payment_intent_response",
                        &response
                    ).await {
                        eprintln!("Failed to dispatch response: {:?}", e);
                    }
                }
                Err(error) => {
                    if let Err(e) = flow_manager.dispatch_error_script(
                        "new_create_payment_intent_error",
                        &error
                    ).await {
                        eprintln!("Failed to dispatch error: {:?}", e);
                    }
                }
            }
        });
    });

    // Return immediately with confirmation
    Ok("payment_intent_request_dispatched".to_string())
}

// Generic async HTTP request function
async fn make_stripe_request(
    config: &StripeConfig,
    endpoint: &str,
    form_data: &HashMap<String, String>
) -> Result<String, String> {
    let url = format!("{}/{}", STRIPE_API_BASE, endpoint);

    let response = config.client
        .post(&url)
        .basic_auth(&config.secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {}", e))?;

    let response_text = response.text().await
        .map_err(|e| format!("Failed to read response: {}", e))?;

    let json: serde_json::Value = serde_json::from_str(&response_text)
        .map_err(|e| format!("Failed to parse JSON: {}", e))?;

    // Stripe reports failures as a JSON body with a top-level "error" key
    if json.get("error").is_some() {
        Err(response_text)
    } else {
        Ok(response_text)
    }
}
```

### 3. **Flow Script Templates**

#### Success Response Script
```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;
let status = parsed_data.status;

print(`✅ Payment Intent Created: ${payment_intent_id}`);
print(`Status: ${status}`);

// Continue the flow based on status
if status == "requires_payment_method" {
    print("Payment method required - ready for frontend");
    // Could dispatch another flow here
} else if status == "succeeded" {
    print("Payment completed successfully!");
    // Dispatch success notification flow
}

// Store the payment intent ID for later use
set_context("payment_intent_id", payment_intent_id);
set_context("payment_status", status);
```

#### Error Response Script
```rhai
// flows/new_create_payment_intent_error.rhai
let error_type = parsed_error.error.type;
let error_message = parsed_error.error.message;

print(`❌ Payment Intent Error: ${error_type}`);
print(`Message: ${error_message}`);

// Handle different error types
if error_type == "card_error" {
    print("Card was declined - notify user");
    // Dispatch user notification flow
} else if error_type == "rate_limit_error" {
    print("Rate limited - retry later");
    // Dispatch retry flow
} else {
    print("Unknown error - log for investigation");
    // Dispatch error logging flow
}

// Store error details for debugging
set_context("last_error_type", error_type);
set_context("last_error_message", error_message);
```
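
If the same routing decision is ever needed on the Rust side, for example to pick which follow-up flow to dispatch, the branches of the error script could be mirrored as below. `FollowUp` and `follow_up_for` are hypothetical names; only the two error-type strings come from the script above:

```rust
/// Follow-up action chosen from an error type (hypothetical enum).
#[derive(Debug, PartialEq)]
pub enum FollowUp {
    NotifyUser,
    RetryLater,
    LogForInvestigation,
}

/// Mirrors the branches of the error script: card errors notify the user,
/// rate-limit errors schedule a retry, everything else is logged.
pub fn follow_up_for(error_type: &str) -> FollowUp {
    match error_type {
        "card_error" => FollowUp::NotifyUser,
        "rate_limit_error" => FollowUp::RetryLater,
        _ => FollowUp::LogForInvestigation,
    }
}
```

Keeping the catch-all arm means an error type that is not listed still ends up in the logging path instead of being dropped.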

### 4. **Configuration and Initialization**

```rust
// Add to payment module initialization
#[rhai_fn(name = "init_flows", return_raw)]
pub fn init_flows(worker_id: String, context_id: String) -> Result<String, Box<EvalAltResult>> {
    initialize_flow_manager(worker_id, context_id)
        .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?;

    Ok("Flow manager initialized successfully".to_string())
}
```

## Usage Examples

### 1. **Basic Payment Flow**

```rhai
// main.rhai
init_flows("worker-1", "context-123");
configure_stripe("sk_test_...");

let payment_intent = new_payment_intent()
    .amount(2000)
    .currency("usd")
    .customer("cus_customer123");

// This returns immediately, HTTP happens in background
let result = payment_intent.create();
print(`Request dispatched: ${result}`);

// Script ends here, but flow continues in background
```

### 2. **Chained Flow Example**

```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;

if parsed_data.status == "requires_payment_method" {
    // Chain to next operation
    let subscription = new_subscription()
        .customer(get_context("customer_id"))
        .add_price("price_monthly");

    // This will trigger new_create_subscription_response flow
    subscription.create();
}
```

## Benefits

### 1. **Non-Blocking Execution**
- Main Rhai script never blocks on HTTP requests
- Multiple API calls can happen concurrently
- Engine remains responsive for other scripts

### 2. **Event-Driven Architecture**
- Clear separation between request and response handling
- Easy to add new flow steps
- Composable and chainable operations

### 3. **Error Handling**
- Dedicated error flows for each operation
- Contextual error information preserved
- Retry and recovery patterns possible

### 4. **Scalability**
- Each HTTP request runs in its own thread
- No shared state between concurrent operations
- Redis-based dispatch scales horizontally

## Implementation Checklist

- [ ] Implement FlowManager with RhaiDispatcher integration
- [ ] Convert all payment functions to non-blocking pattern
- [ ] Create flow script templates for all operations
- [ ] Add flow initialization functions
- [ ] Test with example payment flows
- [ ] Update documentation and examples

## Migration Path

1. **Phase 1**: Implement FlowManager and basic infrastructure
2. **Phase 2**: Convert payment_intent functions to non-blocking
3. **Phase 3**: Convert remaining payment functions (products, prices, subscriptions, coupons)
4. **Phase 4**: Create comprehensive flow script library
5. **Phase 5**: Add advanced features (retries, timeouts, monitoring)