rhailib/docs/EVENT_DRIVEN_FLOW_ARCHITECTURE.md

443 lines
13 KiB
Markdown

# Event-Driven Flow Architecture
## Overview
A simple, single-threaded architecture where API calls trigger HTTP requests and spawn new Rhai scripts based on responses. No global state, no polling, no blocking - just clean event-driven flows.
## Core Concept
```mermaid
graph LR
RS1[Rhai Script] --> API[create_payment_intent]
API --> HTTP[HTTP Request]
HTTP --> SPAWN[Spawn Thread]
SPAWN --> WAIT[Wait for Response]
WAIT --> SUCCESS[200 OK]
WAIT --> ERROR[Error]
SUCCESS --> RS2[new_payment_intent.rhai]
ERROR --> RS3[payment_failed.rhai]
```
## Architecture Design
### 1. Simple Flow Manager
```rust
use std::thread;
use std::collections::HashMap;
use reqwest::Client;
use rhai::{Engine, Scope};
pub struct FlowManager {
pub client: Client,
pub engine: Engine,
pub flow_scripts: HashMap<String, String>, // event_name -> script_path
}
impl FlowManager {
pub fn new() -> Self {
let mut flow_scripts = HashMap::new();
// Define flow mappings
flow_scripts.insert("payment_intent_created".to_string(), "flows/payment_intent_created.rhai".to_string());
flow_scripts.insert("payment_intent_failed".to_string(), "flows/payment_intent_failed.rhai".to_string());
flow_scripts.insert("product_created".to_string(), "flows/product_created.rhai".to_string());
flow_scripts.insert("subscription_created".to_string(), "flows/subscription_created.rhai".to_string());
Self {
client: Client::new(),
engine: Engine::new(),
flow_scripts,
}
}
// Fire HTTP request and spawn response handler
pub fn fire_and_continue(&self,
endpoint: String,
method: String,
data: HashMap<String, String>,
success_event: String,
error_event: String,
context: HashMap<String, String>
) {
let client = self.client.clone();
let flow_scripts = self.flow_scripts.clone();
// Spawn thread for HTTP request
thread::spawn(move || {
let result = Self::make_http_request(&client, &endpoint, &method, &data);
match result {
Ok(response_data) => {
// Success: dispatch success flow
Self::dispatch_flow(&flow_scripts, &success_event, response_data, context);
}
Err(error) => {
// Error: dispatch error flow
let mut error_data = HashMap::new();
error_data.insert("error".to_string(), error);
Self::dispatch_flow(&flow_scripts, &error_event, error_data, context);
}
}
});
// Return immediately - no blocking!
}
// Execute HTTP request
fn make_http_request(
client: &Client,
endpoint: &str,
method: &str,
data: &HashMap<String, String>
) -> Result<HashMap<String, String>, String> {
// This runs in spawned thread - can block safely
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let url = format!("https://api.stripe.com/v1/{}", endpoint);
let response = client
.post(&url)
.form(data)
.send()
.await
.map_err(|e| format!("HTTP error: {}", e))?;
let response_text = response.text().await
.map_err(|e| format!("Response read error: {}", e))?;
let json: serde_json::Value = serde_json::from_str(&response_text)
.map_err(|e| format!("JSON parse error: {}", e))?;
// Convert JSON to HashMap for Rhai
let mut result = HashMap::new();
if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
result.insert("id".to_string(), id.to_string());
}
if let Some(status) = json.get("status").and_then(|v| v.as_str()) {
result.insert("status".to_string(), status.to_string());
}
Ok(result)
})
}
// Dispatch new Rhai script based on event
fn dispatch_flow(
flow_scripts: &HashMap<String, String>,
event_name: &str,
response_data: HashMap<String, String>,
context: HashMap<String, String>
) {
if let Some(script_path) = flow_scripts.get(event_name) {
println!("🎯 Dispatching flow: {} -> {}", event_name, script_path);
// Create new engine instance for this flow
let mut engine = Engine::new();
register_payment_rhai_module(&mut engine);
// Create scope with response data and context
let mut scope = Scope::new();
// Add response data
for (key, value) in response_data {
scope.push(key, value);
}
// Add context data
for (key, value) in context {
scope.push(format!("context_{}", key), value);
}
// Execute flow script
if let Ok(script_content) = std::fs::read_to_string(script_path) {
match engine.eval_with_scope::<()>(&mut scope, &script_content) {
Ok(_) => println!("✅ Flow {} completed successfully", event_name),
Err(e) => println!("❌ Flow {} failed: {}", event_name, e),
}
} else {
println!("❌ Flow script not found: {}", script_path);
}
} else {
println!("⚠️ No flow defined for event: {}", event_name);
}
}
}
```
### 2. Simple Rhai Functions
```rust
#[export_module]
mod rhai_flow_module {
use super::*;
// Global flow manager instance
static FLOW_MANAGER: std::sync::OnceLock<FlowManager> = std::sync::OnceLock::new();
#[rhai_fn(name = "init_flows")]
pub fn init_flows() {
FLOW_MANAGER.set(FlowManager::new()).ok();
println!("✅ Flow manager initialized");
}
#[rhai_fn(name = "create_payment_intent")]
pub fn create_payment_intent(
amount: i64,
currency: String,
customer: String
) {
let manager = FLOW_MANAGER.get().expect("Flow manager not initialized");
let mut data = HashMap::new();
data.insert("amount".to_string(), amount.to_string());
data.insert("currency".to_string(), currency);
data.insert("customer".to_string(), customer.clone());
let mut context = HashMap::new();
context.insert("customer_id".to_string(), customer);
context.insert("original_amount".to_string(), amount.to_string());
manager.fire_and_continue(
"payment_intents".to_string(),
"POST".to_string(),
data,
"payment_intent_created".to_string(),
"payment_intent_failed".to_string(),
context
);
println!("🚀 Payment intent creation started");
// Returns immediately!
}
#[rhai_fn(name = "create_product")]
pub fn create_product(name: String, description: String) {
let manager = FLOW_MANAGER.get().expect("Flow manager not initialized");
let mut data = HashMap::new();
data.insert("name".to_string(), name.clone());
data.insert("description".to_string(), description);
let mut context = HashMap::new();
context.insert("product_name".to_string(), name);
manager.fire_and_continue(
"products".to_string(),
"POST".to_string(),
data,
"product_created".to_string(),
"product_failed".to_string(),
context
);
println!("🚀 Product creation started");
}
#[rhai_fn(name = "create_subscription")]
pub fn create_subscription(customer: String, price_id: String) {
let manager = FLOW_MANAGER.get().expect("Flow manager not initialized");
let mut data = HashMap::new();
data.insert("customer".to_string(), customer.clone());
data.insert("items[0][price]".to_string(), price_id.clone());
let mut context = HashMap::new();
context.insert("customer_id".to_string(), customer);
context.insert("price_id".to_string(), price_id);
manager.fire_and_continue(
"subscriptions".to_string(),
"POST".to_string(),
data,
"subscription_created".to_string(),
"subscription_failed".to_string(),
context
);
println!("🚀 Subscription creation started");
}
}
```
## Usage Examples
### 1. Main Script (Initiator)
```rhai
// main.rhai
init_flows();
print("Starting payment flow...");
// This returns immediately, spawns HTTP request
create_payment_intent(2000, "usd", "cus_customer123");
print("Payment intent request sent, continuing...");
// Script ends here, but flow continues in background
```
### 2. Success Flow Script
```rhai
// flows/payment_intent_created.rhai
print("🎉 Payment intent created successfully!");
print(`Payment Intent ID: ${id}`);
print(`Status: ${status}`);
print(`Customer: ${context_customer_id}`);
print(`Amount: ${context_original_amount}`);
// Continue the flow - create subscription
if status == "requires_payment_method" {
print("Creating subscription for customer...");
create_subscription(context_customer_id, "price_monthly_plan");
}
```
### 3. Error Flow Script
```rhai
// flows/payment_intent_failed.rhai
print("❌ Payment intent creation failed");
print(`Error: ${error}`);
print(`Customer: ${context_customer_id}`);
// Handle error - maybe retry or notify
print("Sending notification to customer...");
// Could trigger email notification flow here
```
### 4. Subscription Success Flow
```rhai
// flows/subscription_created.rhai
print("🎉 Subscription created!");
print(`Subscription ID: ${id}`);
print(`Customer: ${context_customer_id}`);
print(`Price: ${context_price_id}`);
// Final step - send welcome email
print("Sending welcome email...");
// Could trigger email flow here
```
## Flow Configuration
### 1. Flow Mapping
```rust
// Define in FlowManager::new()
flow_scripts.insert("payment_intent_created".to_string(), "flows/payment_intent_created.rhai".to_string());
flow_scripts.insert("payment_intent_failed".to_string(), "flows/payment_intent_failed.rhai".to_string());
flow_scripts.insert("product_created".to_string(), "flows/product_created.rhai".to_string());
flow_scripts.insert("subscription_created".to_string(), "flows/subscription_created.rhai".to_string());
```
### 2. Directory Structure
```
project/
├── main.rhai # Main script
├── flows/
│ ├── payment_intent_created.rhai # Success flow
│ ├── payment_intent_failed.rhai # Error flow
│ ├── product_created.rhai # Product success
│ ├── subscription_created.rhai # Subscription success
│ └── email_notification.rhai # Email flow
└── src/
└── flow_manager.rs # Flow manager code
```
## Execution Flow
```mermaid
sequenceDiagram
participant MS as Main Script
participant FM as FlowManager
participant TH as Spawned Thread
participant API as Stripe API
participant FS as Flow Script
MS->>FM: create_payment_intent()
FM->>TH: spawn thread
FM->>MS: return immediately
Note over MS: Script ends
TH->>API: HTTP POST /payment_intents
API->>TH: 200 OK + payment_intent data
TH->>FS: dispatch payment_intent_created.rhai
Note over FS: New Rhai execution
FS->>FM: create_subscription()
FM->>TH: spawn new thread
TH->>API: HTTP POST /subscriptions
API->>TH: 200 OK + subscription data
TH->>FS: dispatch subscription_created.rhai
```
## Benefits
### 1. **Simplicity**
- No global state management
- No complex polling or callbacks
- Each flow is a simple Rhai script
### 2. **Single-Threaded Rhai**
- Main Rhai engine never blocks
- Each flow script runs in its own engine instance
- No concurrency issues in Rhai code
### 3. **Event-Driven**
- Clear separation of concerns
- Easy to add new flows
- Composable flow chains
### 4. **No Blocking**
- HTTP requests happen in background threads
- Main script continues immediately
- Flows trigger based on responses
## Advanced Features
### 1. Flow Chaining
```rhai
// flows/payment_intent_created.rhai
if status == "requires_payment_method" {
// Chain to next flow
create_subscription(context_customer_id, "price_monthly");
}
```
### 2. Conditional Flows
```rhai
// flows/subscription_created.rhai
if context_customer_type == "enterprise" {
// Enterprise-specific flow
create_enterprise_setup(context_customer_id);
} else {
// Standard flow
send_welcome_email(context_customer_id);
}
```
### 3. Error Recovery
```rhai
// flows/payment_intent_failed.rhai
if error.contains("insufficient_funds") {
// Retry with smaller amount
let retry_amount = context_original_amount / 2;
create_payment_intent(retry_amount, "usd", context_customer_id);
} else {
// Send error notification
send_error_notification(context_customer_id, error);
}
```
This architecture is much simpler, has no global state, and provides clean event-driven flows that are easy to understand and maintain.