# Event-Driven Flow Implementation Specification ## Overview This document provides the complete implementation specification for converting the blocking payment.rs architecture to an event-driven flow system using RhaiDispatcher. ## File Structure ``` src/dsl/src/ ├── flow_manager.rs # New: FlowManager implementation ├── payment.rs # Modified: Non-blocking payment functions └── lib.rs # Modified: Include flow_manager module ``` ## 1. FlowManager Implementation ### File: `src/dsl/src/flow_manager.rs` ```rust use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder, RhaiDispatcherError}; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use serde_json; use tokio::runtime::Runtime; #[derive(Debug)] pub enum FlowError { DispatcherError(RhaiDispatcherError), ConfigurationError(String), SerializationError(serde_json::Error), } impl From for FlowError { fn from(err: RhaiDispatcherError) -> Self { FlowError::DispatcherError(err) } } impl From for FlowError { fn from(err: serde_json::Error) -> Self { FlowError::SerializationError(err) } } impl std::fmt::Display for FlowError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { FlowError::DispatcherError(e) => write!(f, "Dispatcher error: {}", e), FlowError::ConfigurationError(e) => write!(f, "Configuration error: {}", e), FlowError::SerializationError(e) => write!(f, "Serialization error: {}", e), } } } impl std::error::Error for FlowError {} #[derive(Clone)] pub struct FlowManager { dispatcher: RhaiDispatcher, worker_id: String, context_id: String, } impl FlowManager { pub fn new(worker_id: String, context_id: String, redis_url: Option) -> Result { let redis_url = redis_url.unwrap_or_else(|| "redis://127.0.0.1/".to_string()); let dispatcher = RhaiDispatcherBuilder::new() .caller_id("stripe") // API responses come from Stripe .worker_id(&worker_id) .context_id(&context_id) .redis_url(&redis_url) .build()?; Ok(Self { dispatcher, worker_id, context_id, }) } pub async fn dispatch_response_script(&self, script_name: &str, data: &str) -> Result<(), FlowError> { let script_content = format!( r#" // Auto-generated response script for {} let response_data = `{}`; let parsed_data = parse_json(response_data); // Include the response script eval_file("flows/{}.rhai"); "#, script_name, data.replace('`', r#"\`"#), script_name ); self.dispatcher .new_play_request() .worker_id(&self.worker_id) .context_id(&self.context_id) .script(&script_content) .submit() .await?; Ok(()) } pub async fn dispatch_error_script(&self, script_name: &str, error: &str) -> Result<(), FlowError> { let script_content = format!( r#" // Auto-generated error script for {} let error_data = `{}`; let parsed_error = parse_json(error_data); // Include the error script eval_file("flows/{}.rhai"); "#, script_name, error.replace('`', r#"\`"#), script_name ); self.dispatcher .new_play_request() .worker_id(&self.worker_id) .context_id(&self.context_id) .script(&script_content) .submit() .await?; Ok(()) } } // Global flow manager instance static FLOW_MANAGER: Mutex> = Mutex::new(None); pub fn initialize_flow_manager(worker_id: String, context_id: String, redis_url: Option) -> Result<(), FlowError> { let manager = FlowManager::new(worker_id, context_id, redis_url)?; let mut global_manager = FLOW_MANAGER.lock().unwrap(); *global_manager = Some(manager); Ok(()) } pub fn get_flow_manager() -> Result { let global_manager = FLOW_MANAGER.lock().unwrap(); global_manager.as_ref() .ok_or_else(|| FlowError::ConfigurationError("Flow manager not initialized".to_string())) .cloned() } // Async HTTP request function for Stripe API pub async fn make_stripe_request( config: &super::StripeConfig, endpoint: &str, form_data: &HashMap ) -> Result { let url = format!("{}/{}", super::STRIPE_API_BASE, endpoint); let response = config.client .post(&url) .basic_auth(&config.secret_key, None::<&str>) .form(form_data) .send() .await .map_err(|e| format!("HTTP request failed: {}", e))?; let response_text = response.text().await .map_err(|e| format!("Failed to read response: {}", e))?; let json: serde_json::Value = serde_json::from_str(&response_text) .map_err(|e| format!("Failed to parse JSON: {}", e))?; if json.get("error").is_some() { Err(response_text) } else { Ok(response_text) } } ``` ## 2. Payment.rs Modifications ### Add Dependencies Add to the top of `payment.rs`: ```rust mod flow_manager; use flow_manager::{get_flow_manager, initialize_flow_manager, make_stripe_request, FlowError}; use std::thread; use tokio::runtime::Runtime; ``` ### Add Flow Initialization Function Add to the `rhai_payment_module`: ```rust #[rhai_fn(name = "init_flows", return_raw)] pub fn init_flows(worker_id: String, context_id: String) -> Result> { initialize_flow_manager(worker_id, context_id, None) .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?; Ok("Flow manager initialized successfully".to_string()) } #[rhai_fn(name = "init_flows_with_redis", return_raw)] pub fn init_flows_with_redis(worker_id: String, context_id: String, redis_url: String) -> Result> { initialize_flow_manager(worker_id, context_id, Some(redis_url)) .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?; Ok("Flow manager initialized successfully".to_string()) } ``` ### Helper Function for Stripe Config Add helper function to get stripe config: ```rust fn get_stripe_config() -> Result> { let registry = ASYNC_REGISTRY.lock().unwrap(); let registry = registry.as_ref().ok_or("Stripe not configured. Call configure_stripe() first.")?; Ok(registry.stripe_config.clone()) } ``` ### Convert Payment Intent Function Replace the existing `create_payment_intent` function: ```rust #[rhai_fn(name = "create", return_raw)] pub fn create_payment_intent(intent: &mut RhaiPaymentIntent) -> Result> { let form_data = prepare_payment_intent_data(intent); // Get flow manager and stripe config let flow_manager = get_flow_manager() .map_err(|e| format!("Flow manager error: {:?}", e))?; let stripe_config = get_stripe_config()?; // Spawn background thread for HTTP request thread::spawn(move || { let rt = Runtime::new().expect("Failed to create runtime"); rt.block_on(async { match make_stripe_request(&stripe_config, "payment_intents", &form_data).await { Ok(response) => { if let Err(e) = flow_manager.dispatch_response_script( "new_create_payment_intent_response", &response ).await { eprintln!("Failed to dispatch response: {:?}", e); } } Err(error) => { if let Err(e) = flow_manager.dispatch_error_script( "new_create_payment_intent_error", &error ).await { eprintln!("Failed to dispatch error: {:?}", e); } } } }); }); // Return immediately with confirmation Ok("payment_intent_request_dispatched".to_string()) } ``` ### Convert Product Function Replace the existing `create_product` function: ```rust #[rhai_fn(name = "create", return_raw)] pub fn create_product(product: &mut RhaiProduct) -> Result> { let form_data = prepare_product_data(product); // Get flow manager and stripe config let flow_manager = get_flow_manager() .map_err(|e| format!("Flow manager error: {:?}", e))?; let stripe_config = get_stripe_config()?; // Spawn background thread for HTTP request thread::spawn(move || { let rt = Runtime::new().expect("Failed to create runtime"); rt.block_on(async { match make_stripe_request(&stripe_config, "products", &form_data).await { Ok(response) => { if let Err(e) = flow_manager.dispatch_response_script( "new_create_product_response", &response ).await { eprintln!("Failed to dispatch response: {:?}", e); } } Err(error) => { if let Err(e) = flow_manager.dispatch_error_script( "new_create_product_error", &error ).await { eprintln!("Failed to dispatch error: {:?}", e); } } } }); }); // Return immediately with confirmation Ok("product_request_dispatched".to_string()) } ``` ### Convert Price Function Replace the existing `create_price` function: ```rust #[rhai_fn(name = "create", return_raw)] pub fn create_price(price: &mut RhaiPrice) -> Result> { let form_data = prepare_price_data(price); // Get flow manager and stripe config let flow_manager = get_flow_manager() .map_err(|e| format!("Flow manager error: {:?}", e))?; let stripe_config = get_stripe_config()?; // Spawn background thread for HTTP request thread::spawn(move || { let rt = Runtime::new().expect("Failed to create runtime"); rt.block_on(async { match make_stripe_request(&stripe_config, "prices", &form_data).await { Ok(response) => { if let Err(e) = flow_manager.dispatch_response_script( "new_create_price_response", &response ).await { eprintln!("Failed to dispatch response: {:?}", e); } } Err(error) => { if let Err(e) = flow_manager.dispatch_error_script( "new_create_price_error", &error ).await { eprintln!("Failed to dispatch error: {:?}", e); } } } }); }); // Return immediately with confirmation Ok("price_request_dispatched".to_string()) } ``` ### Convert Subscription Function Replace the existing `create_subscription` function: ```rust #[rhai_fn(name = "create", return_raw)] pub fn create_subscription(subscription: &mut RhaiSubscription) -> Result> { let form_data = prepare_subscription_data(subscription); // Get flow manager and stripe config let flow_manager = get_flow_manager() .map_err(|e| format!("Flow manager error: {:?}", e))?; let stripe_config = get_stripe_config()?; // Spawn background thread for HTTP request thread::spawn(move || { let rt = Runtime::new().expect("Failed to create runtime"); rt.block_on(async { match make_stripe_request(&stripe_config, "subscriptions", &form_data).await { Ok(response) => { if let Err(e) = flow_manager.dispatch_response_script( "new_create_subscription_response", &response ).await { eprintln!("Failed to dispatch response: {:?}", e); } } Err(error) => { if let Err(e) = flow_manager.dispatch_error_script( "new_create_subscription_error", &error ).await { eprintln!("Failed to dispatch error: {:?}", e); } } } }); }); // Return immediately with confirmation Ok("subscription_request_dispatched".to_string()) } ``` ### Convert Coupon Function Replace the existing `create_coupon` function: ```rust #[rhai_fn(name = "create", return_raw)] pub fn create_coupon(coupon: &mut RhaiCoupon) -> Result> { let form_data = prepare_coupon_data(coupon); // Get flow manager and stripe config let flow_manager = get_flow_manager() .map_err(|e| format!("Flow manager error: {:?}", e))?; let stripe_config = get_stripe_config()?; // Spawn background thread for HTTP request thread::spawn(move || { let rt = Runtime::new().expect("Failed to create runtime"); rt.block_on(async { match make_stripe_request(&stripe_config, "coupons", &form_data).await { Ok(response) => { if let Err(e) = flow_manager.dispatch_response_script( "new_create_coupon_response", &response ).await { eprintln!("Failed to dispatch response: {:?}", e); } } Err(error) => { if let Err(e) = flow_manager.dispatch_error_script( "new_create_coupon_error", &error ).await { eprintln!("Failed to dispatch error: {:?}", e); } } } }); }); // Return immediately with confirmation Ok("coupon_request_dispatched".to_string()) } ``` ## 3. Remove Old Blocking Code ### Remove from payment.rs: 1. **AsyncFunctionRegistry struct and implementation** - No longer needed 2. **ASYNC_REGISTRY static** - No longer needed 3. **AsyncRequest struct** - No longer needed 4. **async_worker_loop function** - No longer needed 5. **handle_stripe_request function** - Replaced by make_stripe_request in flow_manager 6. **make_request method** - No longer needed ### Keep in payment.rs: 1. **All struct definitions** (RhaiProduct, RhaiPrice, etc.) 2. **All builder methods** (name, amount, currency, etc.) 3. **All prepare_*_data functions** 4. **All getter functions** 5. **StripeConfig struct** 6. **configure_stripe function** (but remove AsyncFunctionRegistry creation) ## 4. Update Cargo.toml Add to `src/dsl/Cargo.toml`: ```toml [dependencies] # ... existing dependencies ... rhai_dispatcher = { path = "../dispatcher" } ``` ## 5. Update lib.rs Add to `src/dsl/src/lib.rs`: ```rust pub mod flow_manager; ``` ## 6. Flow Script Templates Create directory structure: ``` flows/ ├── new_create_payment_intent_response.rhai ├── new_create_payment_intent_error.rhai ├── new_create_product_response.rhai ├── new_create_product_error.rhai ├── new_create_price_response.rhai ├── new_create_price_error.rhai ├── new_create_subscription_response.rhai ├── new_create_subscription_error.rhai ├── new_create_coupon_response.rhai └── new_create_coupon_error.rhai ``` ### Example Flow Scripts #### flows/new_create_payment_intent_response.rhai ```rhai let payment_intent_id = parsed_data.id; let status = parsed_data.status; print(`✅ Payment Intent Created: ${payment_intent_id}`); print(`Status: ${status}`); // Continue the flow based on status if status == "requires_payment_method" { print("Payment method required - ready for frontend"); } else if status == "succeeded" { print("Payment completed successfully!"); } // Store the payment intent ID for later use set_context("payment_intent_id", payment_intent_id); set_context("payment_status", status); ``` #### flows/new_create_payment_intent_error.rhai ```rhai let error_type = parsed_error.error.type; let error_message = parsed_error.error.message; print(`❌ Payment Intent Error: ${error_type}`); print(`Message: ${error_message}`); // Handle different error types if error_type == "card_error" { print("Card was declined - notify user"); } else if error_type == "rate_limit_error" { print("Rate limited - retry later"); } else { print("Unknown error - log for investigation"); } // Store error details for debugging set_context("last_error_type", error_type); set_context("last_error_message", error_message); ``` ## 7. Usage Example ### main.rhai ```rhai // Initialize the flow system init_flows("worker-1", "context-123"); // Configure Stripe configure_stripe("sk_test_..."); // Create payment intent (non-blocking) let payment_intent = new_payment_intent() .amount(2000) .currency("usd") .customer("cus_customer123"); let result = payment_intent.create(); print(`Request dispatched: ${result}`); // Script ends here, but flow continues in background // Response will trigger new_create_payment_intent_response.rhai ``` ## 8. Testing Strategy 1. **Unit Tests**: Test FlowManager initialization and script dispatch 2. **Integration Tests**: Test full payment flow with mock Stripe responses 3. **Load Tests**: Verify non-blocking behavior under concurrent requests 4. **Error Tests**: Verify error flow handling and script dispatch ## 9. Migration Checklist - [ ] Create flow_manager.rs with FlowManager implementation - [ ] Add flow_manager module to lib.rs - [ ] Update Cargo.toml with rhai_dispatcher dependency - [ ] Modify payment.rs to remove blocking code - [ ] Add flow initialization functions - [ ] Convert all create functions to non-blocking pattern - [ ] Create flow script templates - [ ] Test basic payment intent flow - [ ] Test error handling flows - [ ] Verify non-blocking behavior - [ ] Update documentation This specification provides a complete roadmap for implementing the event-driven flow architecture using RhaiDispatcher.