# Dispatcher-Based Event-Driven Flow Architecture

## Overview

This document describes the implementation of a non-blocking, event-driven flow architecture for Rhai payment functions using the existing RhaiDispatcher. The system transforms blocking API calls into fire-and-continue patterns: each HTTP request runs in a background thread, which then dispatches a new Rhai script based on the API response.

## Architecture Principles

### 1. **Non-Blocking API Calls**
- All payment functions (e.g., `create_payment_intent()`) return immediately
- HTTP requests happen in background threads
- No blocking of the main Rhai engine thread

### 2. **Self-Dispatching Pattern**
- Worker dispatches scripts to itself
- Same `worker_id` and `context_id` maintained
- `caller_id` changes to reflect the API response source

### 3. **Generic Request/Response Flow**
- Request functions: `new_..._request` pattern
- Response scripts: `new_..._response` pattern
- Consistent naming across all API operations

## Flow Architecture

```mermaid
graph TD
    A[main.rhai] --> B[create_payment_intent]
    B --> C[HTTP Thread Spawned]
    B --> D[Return Immediately]
    C --> E[Stripe API Call]
    E --> F{API Response}
    F -->|Success| G[Dispatch: new_create_payment_intent_response]
    F -->|Error| H[Dispatch: new_create_payment_intent_error]
    G --> I[Response Script Execution]
    H --> J[Error Script Execution]
```

## Implementation Components

### 1. **FlowManager**

```rust
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder, RhaiDispatcherError};
use std::sync::Mutex;

pub struct FlowManager {
    dispatcher: RhaiDispatcher,
    worker_id: String,
    context_id: String,
}

#[derive(Debug)]
pub enum FlowError {
    DispatcherError(RhaiDispatcherError),
    ConfigurationError(String),
}

impl From<RhaiDispatcherError> for FlowError {
    fn from(err: RhaiDispatcherError) -> Self {
        FlowError::DispatcherError(err)
    }
}

impl FlowManager {
    pub fn new(worker_id: String, context_id: String) -> Result<Self, FlowError> {
        let dispatcher = RhaiDispatcherBuilder::new()
            .caller_id("stripe") // API responses come from Stripe
            .worker_id(&worker_id)
            .context_id(&context_id)
            .redis_url("redis://127.0.0.1/")
            .build()?;

        Ok(Self {
            dispatcher,
            worker_id,
            context_id,
        })
    }

    pub async fn dispatch_response_script(&self, script_name: &str, data: &str) -> Result<(), FlowError> {
        // NOTE: the backtick escaping below is carried over unchanged. Rhai's
        // documentation describes doubling the backtick inside backtick literals
        // and interpolating `${...}`, so verify this against the Rhai version in use.
        let script_content = format!(
            r#"
// Auto-generated response script for {}
let response_data = `{}`;
let parsed_data = parse_json(response_data);

// Include the response script
eval_file("flows/{}.rhai");
"#,
            script_name,
            data.replace('`', r#"\`"#),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }

    pub async fn dispatch_error_script(&self, script_name: &str, error: &str) -> Result<(), FlowError> {
        let script_content = format!(
            r#"
// Auto-generated error script for {}
let error_data = `{}`;
let parsed_error = parse_json(error_data);

// Include the error script
eval_file("flows/{}.rhai");
"#,
            script_name,
            error.replace('`', r#"\`"#),
            script_name
        );

        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;

        Ok(())
    }
}

// Global flow manager instance
static FLOW_MANAGER: Mutex<Option<FlowManager>> = Mutex::new(None);

pub fn initialize_flow_manager(worker_id: String, context_id: String) -> Result<(), FlowError> {
    let manager =
        FlowManager::new(worker_id, context_id)?;
    let mut global_manager = FLOW_MANAGER.lock().unwrap();
    *global_manager = Some(manager);
    Ok(())
}

pub fn get_flow_manager() -> Result<FlowManager, FlowError> {
    let global_manager = FLOW_MANAGER.lock().unwrap();
    global_manager
        .as_ref()
        .ok_or_else(|| FlowError::ConfigurationError("Flow manager not initialized".to_string()))
        .map(|manager| FlowManager {
            dispatcher: manager.dispatcher.clone(), // Assuming Clone is implemented
            worker_id: manager.worker_id.clone(),
            context_id: manager.context_id.clone(),
        })
}
```

### 2. **Non-Blocking Payment Functions**

```rust
use std::collections::HashMap;
use std::thread;

use rhai::EvalAltResult;
use tokio::runtime::Runtime;

// Transform blocking function into non-blocking
#[rhai_fn(name = "create", return_raw)]
pub fn create_payment_intent(intent: &mut RhaiPaymentIntent) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);

    // Get flow manager
    let flow_manager = get_flow_manager()
        .map_err(|e| format!("Flow manager error: {:?}", e))?;

    let stripe_config = get_stripe_config()?;

    // Spawn background thread for HTTP request
    thread::spawn(move || {
        let rt = Runtime::new().expect("Failed to create runtime");
        rt.block_on(async {
            match make_stripe_request(&stripe_config, "payment_intents", &form_data).await {
                Ok(response) => {
                    if let Err(e) = flow_manager
                        .dispatch_response_script("new_create_payment_intent_response", &response)
                        .await
                    {
                        eprintln!("Failed to dispatch response: {:?}", e);
                    }
                }
                Err(error) => {
                    if let Err(e) = flow_manager
                        .dispatch_error_script("new_create_payment_intent_error", &error)
                        .await
                    {
                        eprintln!("Failed to dispatch error: {:?}", e);
                    }
                }
            }
        });
    });

    // Return immediately with confirmation
    Ok("payment_intent_request_dispatched".to_string())
}

// Generic async HTTP request function.
// Returns the raw response body: Ok for success, Err when the body contains an "error" object.
async fn make_stripe_request(
    config: &StripeConfig,
    endpoint: &str,
    form_data: &HashMap<String, String>,
) -> Result<String, String> {
    let url = format!("{}/{}", STRIPE_API_BASE, endpoint);

    let response = config.client
        .post(&url)
        .basic_auth(&config.secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {}", e))?;

    let response_text = response.text().await
        .map_err(|e| format!("Failed to read response: {}", e))?;

    let json: serde_json::Value = serde_json::from_str(&response_text)
        .map_err(|e| format!("Failed to parse JSON: {}", e))?;

    if json.get("error").is_some() {
        Err(response_text)
    } else {
        Ok(response_text)
    }
}
```

### 3. **Flow Script Templates**

#### Success Response Script

```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;
let status = parsed_data.status;

print(`✅ Payment Intent Created: ${payment_intent_id}`);
print(`Status: ${status}`);

// Continue the flow based on status
if status == "requires_payment_method" {
    print("Payment method required - ready for frontend");
    // Could dispatch another flow here
} else if status == "succeeded" {
    print("Payment completed successfully!");
    // Dispatch success notification flow
}

// Store the payment intent ID for later use
set_context("payment_intent_id", payment_intent_id);
set_context("payment_status", status);
```

#### Error Response Script

```rhai
// flows/new_create_payment_intent_error.rhai
let error_type = parsed_error.error.type;
let error_message = parsed_error.error.message;

print(`❌ Payment Intent Error: ${error_type}`);
print(`Message: ${error_message}`);

// Handle different error types
if error_type == "card_error" {
    print("Card was declined - notify user");
    // Dispatch user notification flow
} else if error_type == "rate_limit_error" {
    print("Rate limited - retry later");
    // Dispatch retry flow
} else {
    print("Unknown error - log for investigation");
    // Dispatch error logging flow
}

// Store error details for debugging
set_context("last_error_type", error_type);
set_context("last_error_message", error_message);
```

### 4. **Configuration and Initialization**

```rust
// Add to payment module initialization
#[rhai_fn(name = "init_flows", return_raw)]
pub fn init_flows(worker_id: String, context_id: String) -> Result<String, Box<EvalAltResult>> {
    initialize_flow_manager(worker_id, context_id)
        .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?;

    Ok("Flow manager initialized successfully".to_string())
}
```

## Usage Examples

### 1. **Basic Payment Flow**

```rhai
// main.rhai
init_flows("worker-1", "context-123");
configure_stripe("sk_test_...");

let payment_intent = new_payment_intent()
    .amount(2000)
    .currency("usd")
    .customer("cus_customer123");

// This returns immediately, HTTP happens in background
let result = payment_intent.create();
print(`Request dispatched: ${result}`);

// Script ends here, but flow continues in background
```

### 2. **Chained Flow Example**

```rhai
// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;

if parsed_data.status == "requires_payment_method" {
    // Chain to next operation
    let subscription = new_subscription()
        .customer(get_context("customer_id"))
        .add_price("price_monthly");

    // This will trigger new_create_subscription_response flow
    subscription.create();
}
```

## Benefits

### 1. **Non-Blocking Execution**
- Main Rhai script never blocks on HTTP requests
- Multiple API calls can happen concurrently
- Engine remains responsive for other scripts

### 2. **Event-Driven Architecture**
- Clear separation between request and response handling
- Easy to add new flow steps
- Composable and chainable operations

### 3. **Error Handling**
- Dedicated error flows for each operation
- Contextual error information preserved
- Retry and recovery patterns possible

### 4. **Scalability**
- Each HTTP request runs in its own thread
- No shared state between concurrent operations
- Redis-based dispatch scales horizontally

## Implementation Checklist

- [ ] Implement FlowManager with RhaiDispatcher integration
- [ ] Convert all payment functions to non-blocking pattern
- [ ] Create flow script templates for all operations
- [ ] Add flow initialization functions
- [ ] Test with example payment flows
- [ ] Update documentation and examples

## Migration Path

1. **Phase 1**: Implement FlowManager and basic infrastructure
2. **Phase 2**: Convert payment_intent functions to the non-blocking pattern
3. **Phase 3**: Convert remaining payment functions (products, prices, subscriptions, coupons)
4. **Phase 4**: Create comprehensive flow script library
5. **Phase 5**: Add advanced features (retries, timeouts, monitoring)
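
The retries listed under Phase 5 are not specified anywhere in this document, so the following is only a minimal sketch of one possible shape: a synchronous retry loop with exponential backoff that could wrap a fallible operation inside the background thread. The helper name `retry_with_backoff` and its parameters are hypothetical and are not part of RhaiDispatcher or the payment module. An async variant would replace `thread::sleep` with the runtime's own sleep.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of the existing codebase): calls `op` up to
/// `max_attempts` times, sleeping between failures and doubling the delay
/// each time. `op` receives the 1-based attempt number.
/// Returns the first success, or the error from the final attempt.
pub fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    assert!(max_attempts > 0, "max_attempts must be at least 1");

    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                // Double the delay, saturating instead of overflowing.
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

In this architecture, such a helper would most plausibly sit around the HTTP call in the spawned thread, with the error flow (for example `new_create_payment_intent_error`) dispatched only after the final attempt fails. Whether a given Stripe error is safe to retry is a separate decision that this sketch does not make.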