# Non-Blocking Async Architecture Design

## Problem Statement

The current async architecture has a critical limitation: **slow API responses block the entire Rhai engine**, preventing other scripts from executing. When an API call takes 10 seconds, the Rhai engine is blocked for the full duration.

## Current Blocking Behavior

```rust
// This BLOCKS the Rhai execution thread!
response_receiver.recv_timeout(Duration::from_secs(30))
    .map_err(|e| format!("Failed to receive response: {}", e))?
```

**Impact:**

- ✅ Async worker thread: NOT blocked (continues processing)
- ❌ Rhai engine thread: BLOCKED (cannot execute other scripts)
- ❌ Other Rhai scripts: QUEUED (must wait)

## Callback-Based Solution

### Architecture Overview

```mermaid
graph TB
    subgraph "Rhai Engine Thread (Non-Blocking)"
        RS1[Rhai Script 1]
        RS2[Rhai Script 2]
        RS3[Rhai Script 3]
        RE[Rhai Engine]
    end

    subgraph "Request Registry"
        PR[Pending Requests Map]
        RID[Request IDs]
    end

    subgraph "Async Worker Thread"
        AW[Async Worker]
        HTTP[HTTP Client]
        API[External APIs]
    end

    RS1 --> RE
    RS2 --> RE
    RS3 --> RE
    RE --> PR
    PR --> AW
    AW --> HTTP
    HTTP --> API
    API --> HTTP
    HTTP --> AW
    AW --> PR
    PR --> RE
```

### Core Data Structures

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
use uuid::Uuid;

// Global registry for pending requests.
// HashMap::new() is not a const fn, so the static is initialized lazily.
static PENDING_REQUESTS: LazyLock<Mutex<HashMap<String, PendingRequest>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

#[derive(Debug)]
pub struct PendingRequest {
    pub id: String,
    pub status: RequestStatus,
    pub result: Option<Result<String, String>>,
    pub created_at: std::time::Instant,
}

#[derive(Debug, Clone)]
pub enum RequestStatus {
    Pending,
    Completed,
    Failed,
    Timeout,
}

#[derive(Debug)]
pub struct AsyncRequest {
    pub id: String, // Unique request ID
    pub endpoint: String,
    pub method: String,
    pub data: HashMap<String, String>,
    // No response channel - results are stored in the global registry
}
```

### Non-Blocking Request Function

```rust
impl AsyncFunctionRegistry {
    // Non-blocking version - returns immediately
    pub fn make_request_async(&self, endpoint: String, method: String,
                              data: HashMap<String, String>) -> Result<String, String> {
        let request_id = Uuid::new_v4().to_string();

        // Store pending request
        {
            let mut pending = PENDING_REQUESTS.lock().unwrap();
            pending.insert(request_id.clone(), PendingRequest {
                id: request_id.clone(),
                status: RequestStatus::Pending,
                result: None,
                created_at: std::time::Instant::now(),
            });
        }

        let request = AsyncRequest {
            id: request_id.clone(),
            endpoint,
            method,
            data,
        };

        // Send to async worker (non-blocking)
        self.request_sender.send(request)
            .map_err(|_| "Failed to send request to async worker".to_string())?;

        // Return request ID immediately - NO BLOCKING!
        Ok(request_id)
    }

    // Check if request is complete
    pub fn is_request_complete(&self, request_id: &str) -> bool {
        let pending = PENDING_REQUESTS.lock().unwrap();
        if let Some(request) = pending.get(request_id) {
            matches!(request.status,
                RequestStatus::Completed | RequestStatus::Failed | RequestStatus::Timeout)
        } else {
            false
        }
    }

    // Get request result (non-blocking)
    pub fn get_request_result(&self, request_id: &str) -> Result<String, String> {
        let mut pending = PENDING_REQUESTS.lock().unwrap();
        if let Some(request) = pending.remove(request_id) {
            match request.result {
                Some(result) => result,
                None => Err("Request not completed yet".to_string()),
            }
        } else {
            Err("Request not found".to_string())
        }
    }
}
```

### Updated Async Worker

```rust
async fn async_worker_loop(config: StripeConfig, receiver: Receiver<AsyncRequest>) {
    println!("🚀 Async worker thread started");

    loop {
        match receiver.recv_timeout(Duration::from_millis(100)) {
            Ok(request) => {
                let request_id = request.id.clone();
                let result = Self::handle_stripe_request(&config, &request).await;

                // Store result in global registry instead of sending through channel
                {
                    let mut pending = PENDING_REQUESTS.lock().unwrap();
                    if let Some(pending_request) = pending.get_mut(&request_id) {
                        pending_request.result = Some(result.clone());
                        pending_request.status = match result {
                            Ok(_) => RequestStatus::Completed,
                            Err(_) => RequestStatus::Failed,
                        };
                    }
                }

                println!("✅ Request {} completed",
request_id);
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
}
```

### Rhai Function Registration

```rust
#[export_module]
mod rhai_payment_module {
    // Async version - returns request ID immediately
    #[rhai_fn(name = "create_async", return_raw)]
    pub fn create_product_async(product: &mut RhaiProduct) -> Result<String, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;

        let form_data = prepare_product_data(product);
        let request_id = registry
            .make_request_async("products".to_string(), "POST".to_string(), form_data)
            .map_err(|e| e.to_string())?;

        Ok(request_id)
    }

    // Check if async request is complete
    #[rhai_fn(name = "is_complete", return_raw)]
    pub fn is_request_complete(request_id: String) -> Result<bool, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;

        Ok(registry.is_request_complete(&request_id))
    }

    // Get result of async request
    #[rhai_fn(name = "get_result", return_raw)]
    pub fn get_request_result(request_id: String) -> Result<String, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;

        registry.get_request_result(&request_id)
            .map_err(|e| e.to_string().into())
    }

    // Convenience function - wait for result with polling
    #[rhai_fn(name = "await_result", return_raw)]
    pub fn await_request_result(request_id: String, timeout_seconds: i64) -> Result<String, Box<EvalAltResult>> {
        let start_time = std::time::Instant::now();
        let timeout = Duration::from_secs(timeout_seconds as u64);

        // Polling loop: blocks only the calling script. The registry lock is
        // taken briefly per iteration so other scripts' calls are not held up.
        loop {
            {
                let registry = ASYNC_REGISTRY.lock().unwrap();
                let registry = registry.as_ref().ok_or("Stripe not configured")?;

                if registry.is_request_complete(&request_id) {
                    return registry.get_request_result(&request_id)
                        .map_err(|e| e.to_string().into());
                }
            }

            if start_time.elapsed() > timeout {
                return Err("Request timeout".to_string().into());
            }

            // Small delay to prevent busy waiting
            std::thread::sleep(Duration::from_millis(50));
        }
    }
}
```

## Usage Patterns

### 1. Fire-and-Forget Pattern

```rhai
configure_stripe(STRIPE_API_KEY);

// Start multiple async operations immediately - NO BLOCKING!
let product_req = new_product()
    .name("Product 1")
    .create_async();

let price_req = new_price()
    .amount(1000)
    .create_async();

let coupon_req = new_coupon()
    .percent_off(25)
    .create_async();

print("All requests started, continuing with other work...");

// Do other work while APIs are processing
for i in 1..100 {
    print(`Doing work: ${i}`);
}

// Check results when ready
if is_complete(product_req) {
    let product_id = get_result(product_req);
    print(`Product created: ${product_id}`);
}
```

### 2. Polling Pattern

```rhai
// Start async operation
let request_id = new_product()
    .name("My Product")
    .create_async();

print("Request started, polling for completion...");

// Poll until complete (non-blocking)
let max_attempts = 100;
let attempt = 0;

while attempt < max_attempts {
    if is_complete(request_id) {
        let result = get_result(request_id);
        print(`Success: ${result}`);
        break;
    }

    print(`Attempt ${attempt}: still waiting...`);
    attempt += 1;

    // Small delay between checks
    sleep(100);
}
```

### 3. Await Pattern (Convenience)

```rhai
// Start async operation and wait for result
let request_id = new_product()
    .name("My Product")
    .create_async();

print("Request started, waiting for result...");

// This polls internally but doesn't block other scripts
try {
    let product_id = await_result(request_id, 30); // 30 second timeout
    print(`Product created: ${product_id}`);
} catch(error) {
    print(`Failed: ${error}`);
}
```

### 4. Concurrent Operations

```rhai
// Start multiple operations concurrently
let requests = [];

for i in 1..=5 {
    let req = new_product()
        .name(`Product ${i}`)
        .create_async();
    requests.push(req);
}

print("Started 5 concurrent product creations");

// Wait for all to complete
let results = [];
for req in requests {
    let result = await_result(req, 30);
    results.push(result);
    print(`Product created: ${result}`);
}

print(`All ${results.len()} products created!`);
```

## Execution Flow Comparison

### Current Blocking Architecture

```mermaid
sequenceDiagram
    participant R1 as Rhai Script 1
    participant R2 as Rhai Script 2
    participant RE as Rhai Engine
    participant AR as AsyncRegistry
    participant AW as Async Worker

    R1->>RE: product.create()
    RE->>AR: make_request()
    AR->>AW: send request
    Note over RE: 🚫 BLOCKED for up to 30 seconds
    Note over R2: ⏳ Cannot execute - engine blocked
    AW->>AR: response (after 10 seconds)
    AR->>RE: unblock
    RE->>R1: return result
    R2->>RE: Now can execute
```

### New Non-Blocking Architecture

```mermaid
sequenceDiagram
    participant R1 as Rhai Script 1
    participant R2 as Rhai Script 2
    participant RE as Rhai Engine
    participant AR as AsyncRegistry
    participant AW as Async Worker

    R1->>RE: product.create_async()
    RE->>AR: make_request_async()
    AR->>AW: send request
    AR->>RE: return request_id (immediate)
    RE->>R1: return request_id
    Note over R1: Script 1 continues...
    R2->>RE: other_operation()
    Note over RE: ✅ Engine available immediately
    RE->>R2: result
    AW->>AR: store result in registry
    R1->>RE: is_complete(request_id)
    RE->>R1: true
    R1->>RE: get_result(request_id)
    RE->>R1: product_id
```

## Benefits

### 1. **Complete Non-Blocking Execution**

- Rhai engine never blocks on API calls
- Multiple scripts can execute concurrently
- Better resource utilization

### 2. **Backward Compatibility**

```rhai
// Keep existing blocking API for simple cases
let product_id = new_product().name("Simple").create();

// Use async API for concurrent operations
let request_id = new_product().name("Async").create_async();
```

### 3. **Flexible Programming Patterns**

- **Fire-and-forget**: Start operation, check later
- **Polling**: Check periodically until complete
- **Await**: Convenience function with timeout
- **Concurrent**: Start multiple operations simultaneously

### 4. **Resource Management**

```rust
// Automatic cleanup of completed requests
impl AsyncFunctionRegistry {
    pub fn cleanup_old_requests(&self) {
        let mut pending = PENDING_REQUESTS.lock().unwrap();
        let now = std::time::Instant::now();

        pending.retain(|_, request| {
            // Remove requests older than 5 minutes
            now.duration_since(request.created_at) < Duration::from_secs(300)
        });
    }
}
```

## Performance Comparison

| Architecture | Blocking Behavior | Concurrent Scripts | API Latency Impact |
|--------------|-------------------|--------------------|--------------------|
| **Current** | ❌ Blocks engine | ❌ Sequential only | ❌ Blocks all execution |
| **Callback** | ✅ Non-blocking | ✅ Unlimited concurrent | ✅ No impact on other scripts |

## Implementation Strategy

### Phase 1: Add Async Functions
- Implement callback-based functions alongside existing ones
- Add `create_async()`, `is_complete()`, `get_result()`, `await_result()`
- Maintain backward compatibility

### Phase 2: Enhanced Features
- Add batch operations for multiple concurrent requests
- Implement request prioritization
- Add metrics and monitoring

### Phase 3: Migration Path
- Provide migration guide for existing scripts
- Consider deprecating blocking functions in favor of async ones
- Add performance benchmarks

## Conclusion

The callback-based solution completely eliminates the blocking problem while maintaining a clean, intuitive API for Rhai scripts.
This enables true concurrent execution of multiple scripts with external API integration, dramatically improving the system's scalability and responsiveness. The key innovation is replacing synchronous blocking calls with an asynchronous request/response pattern that stores results in a shared registry, allowing the Rhai engine to remain responsive while API operations complete in the background.