rhailib/docs/NON_BLOCKING_ASYNC_DESIGN.md

Non-Blocking Async Architecture Design

Problem Statement

The current async architecture has a critical limitation: slow API responses block the entire Rhai engine, preventing other scripts from executing. When an API call takes 10 seconds, the Rhai engine is blocked for the full duration.

Current Blocking Behavior

// This BLOCKS the Rhai execution thread!
response_receiver.recv_timeout(Duration::from_secs(30))
    .map_err(|e| format!("Failed to receive response: {}", e))?

Impact:

  • Async worker thread: NOT blocked (continues processing)
  • Rhai engine thread: BLOCKED (cannot execute other scripts)
  • Other Rhai scripts: QUEUED (must wait)

Callback-Based Solution

Architecture Overview

graph TB
    subgraph "Rhai Engine Thread (Non-Blocking)"
        RS1[Rhai Script 1]
        RS2[Rhai Script 2]
        RS3[Rhai Script 3]
        RE[Rhai Engine]
    end
    
    subgraph "Request Registry"
        PR[Pending Requests Map]
        RID[Request IDs]
    end
    
    subgraph "Async Worker Thread"
        AW[Async Worker]
        HTTP[HTTP Client]
        API[External APIs]
    end
    
    RS1 --> RE
    RS2 --> RE
    RS3 --> RE
    RE --> PR
    PR --> AW
    AW --> HTTP
    HTTP --> API
    API --> HTTP
    HTTP --> AW
    AW --> PR
    PR --> RE

Core Data Structures

use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
use uuid::Uuid;

// Global registry for pending requests.
// HashMap::new() is not const, so the map must be initialized lazily.
static PENDING_REQUESTS: LazyLock<Mutex<HashMap<String, PendingRequest>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

#[derive(Debug)]
pub struct PendingRequest {
    pub id: String,
    pub status: RequestStatus,
    pub result: Option<Result<String, String>>,
    pub created_at: std::time::Instant,
}

#[derive(Debug, Clone)]
pub enum RequestStatus {
    Pending,
    Completed,
    Failed,
    Timeout,
}

#[derive(Debug)]
pub struct AsyncRequest {
    pub id: String,  // Unique request ID
    pub endpoint: String,
    pub method: String,
    pub data: HashMap<String, String>,
    // No response channel - results stored in global registry
}

Non-Blocking Request Function

impl AsyncFunctionRegistry {
    // Non-blocking version - returns immediately
    pub fn make_request_async(&self, endpoint: String, method: String, data: HashMap<String, String>) -> Result<String, String> {
        let request_id = Uuid::new_v4().to_string();
        
        // Store pending request
        {
            let mut pending = PENDING_REQUESTS.lock().unwrap();
            pending.insert(request_id.clone(), PendingRequest {
                id: request_id.clone(),
                status: RequestStatus::Pending,
                result: None,
                created_at: std::time::Instant::now(),
            });
        }
        
        let request = AsyncRequest {
            id: request_id.clone(),
            endpoint,
            method,
            data,
        };
        
        // Send to async worker (non-blocking)
        self.request_sender.send(request)
            .map_err(|_| "Failed to send request to async worker".to_string())?;
        
        // Return request ID immediately - NO BLOCKING!
        Ok(request_id)
    }
    
    // Check if request is complete
    pub fn is_request_complete(&self, request_id: &str) -> bool {
        let pending = PENDING_REQUESTS.lock().unwrap();
        if let Some(request) = pending.get(request_id) {
            matches!(request.status, RequestStatus::Completed | RequestStatus::Failed | RequestStatus::Timeout)
        } else {
            false
        }
    }
    
    // Get request result (non-blocking)
    pub fn get_request_result(&self, request_id: &str) -> Result<String, String> {
        let mut pending = PENDING_REQUESTS.lock().unwrap();
        // Remove the entry only once a result is present; an early call must
        // not discard a still-pending request.
        let is_ready = match pending.get(request_id) {
            Some(request) => request.result.is_some(),
            None => return Err("Request not found".to_string()),
        };
        if is_ready {
            pending.remove(request_id).unwrap().result.unwrap()
        } else {
            Err("Request not completed yet".to_string())
        }
    }
}

Updated Async Worker

async fn async_worker_loop(config: StripeConfig, receiver: Receiver<AsyncRequest>) {
    println!("🚀 Async worker thread started");
    
    loop {
        // recv_timeout blocks only this dedicated worker thread,
        // never the Rhai engine thread.
        match receiver.recv_timeout(Duration::from_millis(100)) {
            Ok(request) => {
                let request_id = request.id.clone();
                // handle_stripe_request is a free helper here, not an associated fn.
                let result = handle_stripe_request(&config, &request).await;
                
                // Store result in global registry instead of sending through channel
                {
                    let mut pending = PENDING_REQUESTS.lock().unwrap();
                    if let Some(pending_request) = pending.get_mut(&request_id) {
                        pending_request.result = Some(result.clone());
                        pending_request.status = match result {
                            Ok(_) => RequestStatus::Completed,
                            Err(_) => RequestStatus::Failed,
                        };
                    }
                }
                
                println!("✅ Request {} completed", request_id);
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
}

Rhai Function Registration

#[export_module]
mod rhai_payment_module {
    // Async version - returns request ID immediately
    #[rhai_fn(name = "create_async", return_raw)]
    pub fn create_product_async(product: &mut RhaiProduct) -> Result<String, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;
        
        let form_data = prepare_product_data(product);
        let request_id = registry.make_request_async("products".to_string(), "POST".to_string(), form_data)
            .map_err(|e| e.to_string())?;
        
        Ok(request_id)
    }
    
    // Check if async request is complete
    #[rhai_fn(name = "is_complete", return_raw)]
    pub fn is_request_complete(request_id: String) -> Result<bool, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;
        
        Ok(registry.is_request_complete(&request_id))
    }
    
    // Get result of async request
    #[rhai_fn(name = "get_result", return_raw)]
    pub fn get_request_result(request_id: String) -> Result<String, Box<EvalAltResult>> {
        let registry = ASYNC_REGISTRY.lock().unwrap();
        let registry = registry.as_ref().ok_or("Stripe not configured")?;
        
        registry.get_request_result(&request_id)
            .map_err(|e| e.to_string().into())
    }
    
    // Convenience function - polls until the result is ready or the timeout expires.
    // This blocks the calling script's thread, but re-acquires the registry lock
    // on each iteration so other scripts' calls are never starved.
    #[rhai_fn(name = "await_result", return_raw)]
    pub fn await_request_result(request_id: String, timeout_seconds: i64) -> Result<String, Box<EvalAltResult>> {
        let start_time = std::time::Instant::now();
        // Clamp negative timeouts to zero before the unsigned cast.
        let timeout = Duration::from_secs(timeout_seconds.max(0) as u64);
        
        loop {
            // Hold the registry lock only briefly, once per iteration.
            {
                let registry = ASYNC_REGISTRY.lock().unwrap();
                let registry = registry.as_ref().ok_or("Stripe not configured")?;
                if registry.is_request_complete(&request_id) {
                    return registry.get_request_result(&request_id)
                        .map_err(|e| e.to_string().into());
                }
            }
            
            if start_time.elapsed() > timeout {
                return Err("Request timeout".to_string().into());
            }
            
            // Small delay to avoid busy-waiting
            std::thread::sleep(Duration::from_millis(50));
        }
    }
}

Usage Patterns

1. Fire-and-Forget Pattern

configure_stripe(STRIPE_API_KEY);

// Start multiple async operations immediately - NO BLOCKING!
let product_req = new_product()
    .name("Product 1")
    .create_async();

let price_req = new_price()
    .amount(1000)
    .create_async();

let coupon_req = new_coupon()
    .percent_off(25)
    .create_async();

print("All requests started, continuing with other work...");

// Do other work while APIs are processing
for i in 1..100 {
    print(`Doing work: ${i}`);
}

// Check results when ready
if is_complete(product_req) {
    let product_id = get_result(product_req);
    print(`Product created: ${product_id}`);
}

2. Polling Pattern

// Start async operation
let request_id = new_product()
    .name("My Product")
    .create_async();

print("Request started, polling for completion...");

// Poll until complete (non-blocking)
let max_attempts = 100;
let attempt = 0;

while attempt < max_attempts {
    if is_complete(request_id) {
        let result = get_result(request_id);
        print(`Success: ${result}`);
        break;
    }
    
    print(`Attempt ${attempt}: still waiting...`);
    attempt += 1;
    
    // Small delay between checks
    sleep(100);
}

3. Await Pattern (Convenience)

// Start async operation and wait for result
let request_id = new_product()
    .name("My Product")
    .create_async();

print("Request started, waiting for result...");

// This polls internally; it blocks only this script's thread, not other scripts
try {
    let product_id = await_result(request_id, 30); // 30 second timeout
    print(`Product created: ${product_id}`);
} catch(error) {
    print(`Failed: ${error}`);
}

4. Concurrent Operations

// Start multiple operations concurrently
let requests = [];

for i in 1..5 {
    let req = new_product()
        .name(`Product ${i}`)
        .create_async();
    requests.push(req);
}

print("Started 5 concurrent product creations");

// Wait for all to complete
let results = [];
for req in requests {
    let result = await_result(req, 30);
    results.push(result);
    print(`Product created: ${result}`);
}

print(`All ${results.len()} products created!`);

Execution Flow Comparison

Current Blocking Architecture

sequenceDiagram
    participant R1 as Rhai Script 1
    participant R2 as Rhai Script 2
    participant RE as Rhai Engine
    participant AR as AsyncRegistry
    participant AW as Async Worker

    R1->>RE: product.create()
    RE->>AR: make_request()
    AR->>AW: send request
    Note over RE: 🚫 BLOCKED for up to 30 seconds
    Note over R2: ⏳ Cannot execute - engine blocked
    AW->>AR: response (after 10 seconds)
    AR->>RE: unblock
    RE->>R1: return result
    R2->>RE: Now can execute

New Non-Blocking Architecture

sequenceDiagram
    participant R1 as Rhai Script 1
    participant R2 as Rhai Script 2
    participant RE as Rhai Engine
    participant AR as AsyncRegistry
    participant AW as Async Worker

    R1->>RE: product.create_async()
    RE->>AR: make_request_async()
    AR->>AW: send request
    AR->>RE: return request_id (immediate)
    RE->>R1: return request_id
    Note over R1: Script 1 continues...
    
    R2->>RE: other_operation()
    Note over RE: ✅ Engine available immediately
    RE->>R2: result
    
    AW->>AR: store result in registry
    R1->>RE: is_complete(request_id)
    RE->>R1: true
    R1->>RE: get_result(request_id)
    RE->>R1: product_id

Benefits

1. Complete Non-Blocking Execution

  • Rhai engine never blocks on API calls
  • Multiple scripts can execute concurrently
  • Better resource utilization

2. Backward Compatibility

// Keep existing blocking API for simple cases
let product_id = new_product().name("Simple").create();

// Use async API for concurrent operations
let request_id = new_product().name("Async").create_async();

3. Flexible Programming Patterns

  • Fire-and-forget: Start operation, check later
  • Polling: Check periodically until complete
  • Await: Convenience function with timeout
  • Concurrent: Start multiple operations simultaneously

4. Resource Management

// Periodic cleanup of stale requests (completed but never collected, or abandoned)
impl AsyncFunctionRegistry {
    pub fn cleanup_old_requests(&self) {
        let mut pending = PENDING_REQUESTS.lock().unwrap();
        let now = std::time::Instant::now();
        
        pending.retain(|_, request| {
            // Remove requests older than 5 minutes
            now.duration_since(request.created_at) < Duration::from_secs(300)
        });
    }
}

Performance Comparison

| Architecture | Blocking Behavior | Concurrent Scripts   | API Latency Impact         |
|--------------|-------------------|----------------------|----------------------------|
| Current      | Blocks engine     | Sequential only      | Blocks all execution       |
| Callback     | Non-blocking      | Unlimited concurrent | No impact on other scripts |

Implementation Strategy

Phase 1: Add Async Functions

  • Implement callback-based functions alongside existing ones
  • Add create_async(), is_complete(), get_result(), await_result()
  • Maintain backward compatibility

Phase 2: Enhanced Features

  • Add batch operations for multiple concurrent requests
  • Implement request prioritization
  • Add metrics and monitoring

Phase 3: Migration Path

  • Provide migration guide for existing scripts
  • Consider deprecating blocking functions in favor of async ones
  • Add performance benchmarks

Conclusion

The callback-based solution completely eliminates the blocking problem while maintaining a clean, intuitive API for Rhai scripts. This enables true concurrent execution of multiple scripts with external API integration, dramatically improving the system's scalability and responsiveness.

The key innovation is replacing synchronous blocking calls with an asynchronous request/response pattern that stores results in a shared registry, allowing the Rhai engine to remain responsive while API operations complete in the background.