460 lines
13 KiB
Markdown
460 lines
13 KiB
Markdown
# Async Rhai Architecture for HTTP API Integration
|
|
|
|
## Overview
|
|
|
|
This document describes the async architecture implemented in RhaiLib that enables Rhai scripts to perform HTTP API calls despite Rhai's fundamentally synchronous nature. The architecture bridges Rhai's blocking execution model with Rust's async ecosystem using multi-threading and message passing.
|
|
|
|
## The Challenge
|
|
|
|
Rhai is a synchronous, single-threaded scripting language that cannot natively handle async operations. However, modern applications often need to:
|
|
|
|
- Make HTTP API calls (REST, GraphQL, etc.)
|
|
- Interact with external services (Stripe, payment processors, etc.)
|
|
- Perform I/O operations that benefit from async handling
|
|
- Maintain responsive execution while waiting for network responses
|
|
|
|
## Architecture Solution
|
|
|
|
### Core Components
|
|
|
|
```mermaid
|
|
graph TB
|
|
subgraph "Rhai Thread (Synchronous)"
|
|
RS[Rhai Script]
|
|
RF[Rhai Functions]
|
|
RR[Registry Interface]
|
|
end
|
|
|
|
subgraph "Communication Layer"
|
|
MC[MPSC Channel]
|
|
REQ[AsyncRequest]
|
|
RESP[Response Channel]
|
|
end
|
|
|
|
subgraph "Async Worker Thread"
|
|
RT[Tokio Runtime]
|
|
AW[Async Worker Loop]
|
|
HC[HTTP Client]
|
|
API[External APIs]
|
|
end
|
|
|
|
RS --> RF
|
|
RF --> RR
|
|
RR --> MC
|
|
MC --> REQ
|
|
REQ --> AW
|
|
AW --> HC
|
|
HC --> API
|
|
API --> HC
|
|
HC --> AW
|
|
AW --> RESP
|
|
RESP --> RR
|
|
RR --> RF
|
|
RF --> RS
|
|
```
|
|
|
|
### 1. AsyncFunctionRegistry
|
|
|
|
The central coordinator that manages async operations:
|
|
|
|
```rust
|
|
#[derive(Debug, Clone)]
|
|
pub struct AsyncFunctionRegistry {
|
|
pub request_sender: Sender<AsyncRequest>,
|
|
pub stripe_config: StripeConfig,
|
|
}
|
|
```
|
|
|
|
**Key Features:**
|
|
- **Thread-safe communication**: Uses `std::sync::mpsc` channels
|
|
- **Request coordination**: Manages the request/response lifecycle
|
|
- **Configuration management**: Stores API credentials and HTTP client settings
|
|
|
|
### 2. AsyncRequest Structure
|
|
|
|
Encapsulates all information needed for an async operation:
|
|
|
|
```rust
|
|
#[derive(Debug)]
|
|
pub struct AsyncRequest {
|
|
pub endpoint: String,
|
|
pub method: String,
|
|
pub data: HashMap<String, String>,
|
|
pub response_sender: std::sync::mpsc::Sender<Result<String, String>>,
|
|
}
|
|
```
|
|
|
|
**Components:**
|
|
- **endpoint**: API endpoint path (e.g., "products", "payment_intents")
|
|
- **method**: HTTP method (POST, GET, PUT, DELETE)
|
|
- **data**: Form data for the request body
|
|
- **response_sender**: Channel to send the result back to the calling thread
|
|
|
|
### 3. Async Worker Thread
|
|
|
|
A dedicated thread running a Tokio runtime that processes async operations:
|
|
|
|
```rust
|
|
async fn async_worker_loop(config: StripeConfig, receiver: Receiver<AsyncRequest>) {
|
|
loop {
|
|
match receiver.recv_timeout(Duration::from_millis(100)) {
|
|
Ok(request) => {
|
|
let result = Self::handle_stripe_request(&config, &request).await;
|
|
if let Err(_) = request.response_sender.send(result) {
|
|
println!("⚠️ Failed to send response back to caller");
|
|
}
|
|
}
|
|
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
|
|
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
**Key Design Decisions:**
|
|
- **Timeout-based polling**: Uses `recv_timeout()` instead of blocking `recv()` to prevent runtime deadlocks
|
|
- **Error handling**: Gracefully handles channel disconnections and timeouts
|
|
- **Non-blocking**: Allows the async runtime to process other tasks during polling intervals
|
|
|
|
## Request Flow
|
|
|
|
### 1. Rhai Script Execution
|
|
|
|
```rhai
|
|
// Rhai script calls a function
|
|
let product = new_product()
|
|
.name("Premium Software License")
|
|
.description("A comprehensive software solution");
|
|
|
|
let product_id = product.create(); // This triggers async HTTP call
|
|
```
|
|
|
|
### 2. Function Registration and Execution
|
|
|
|
```rust
|
|
#[rhai_fn(name = "create", return_raw)]
|
|
pub fn create_product(product: &mut RhaiProduct) -> Result<String, Box<EvalAltResult>> {
|
|
let registry = ASYNC_REGISTRY.lock().unwrap();
|
|
let registry = registry.as_ref().ok_or("Stripe not configured")?;
|
|
|
|
let form_data = prepare_product_data(product);
|
|
let result = registry.make_request("products".to_string(), "POST".to_string(), form_data)
|
|
.map_err(|e| e.to_string())?;
|
|
|
|
product.id = Some(result.clone());
|
|
Ok(result)
|
|
}
|
|
```
|
|
|
|
### 3. Request Processing
|
|
|
|
```rust
|
|
pub fn make_request(&self, endpoint: String, method: String, data: HashMap<String, String>) -> Result<String, String> {
|
|
let (response_sender, response_receiver) = mpsc::channel();
|
|
|
|
let request = AsyncRequest {
|
|
endpoint,
|
|
method,
|
|
data,
|
|
response_sender,
|
|
};
|
|
|
|
// Send request to async worker
|
|
self.request_sender.send(request)
|
|
.map_err(|_| "Failed to send request to async worker".to_string())?;
|
|
|
|
// Wait for response with timeout
|
|
response_receiver.recv_timeout(Duration::from_secs(30))
|
|
.map_err(|e| format!("Failed to receive response: {}", e))?
|
|
}
|
|
```
|
|
|
|
### 4. HTTP Request Execution
|
|
|
|
```rust
|
|
async fn handle_stripe_request(config: &StripeConfig, request: &AsyncRequest) -> Result<String, String> {
|
|
let url = format!("{}/{}", STRIPE_API_BASE, request.endpoint);
|
|
|
|
let response = config.client
|
|
.post(&url)
|
|
.basic_auth(&config.secret_key, None::<&str>)
|
|
.form(&request.data)
|
|
.send()
|
|
.await
|
|
.map_err(|e| format!("HTTP request failed: {}", e))?;
|
|
|
|
let response_text = response.text().await
|
|
.map_err(|e| format!("Failed to read response: {}", e))?;
|
|
|
|
// Parse and validate response
|
|
let json: serde_json::Value = serde_json::from_str(&response_text)
|
|
.map_err(|e| format!("Failed to parse JSON: {}", e))?;
|
|
|
|
if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
|
|
Ok(id.to_string())
|
|
} else if let Some(error) = json.get("error") {
|
|
Err(format!("API error: {}", error))
|
|
} else {
|
|
Err(format!("Unexpected response: {}", response_text))
|
|
}
|
|
}
|
|
```
|
|
|
|
## Configuration and Setup
|
|
|
|
### 1. HTTP Client Configuration
|
|
|
|
```rust
|
|
let client = Client::builder()
|
|
.timeout(Duration::from_secs(5))
|
|
.connect_timeout(Duration::from_secs(3))
|
|
.pool_idle_timeout(Duration::from_secs(10))
|
|
.tcp_keepalive(Duration::from_secs(30))
|
|
.user_agent("rhailib-payment/1.0")
|
|
.build()?;
|
|
```
|
|
|
|
### 2. Environment Variable Loading
|
|
|
|
```rust
|
|
// Load from .env file
|
|
dotenv::from_filename("examples/payment/.env").ok();
|
|
|
|
let stripe_secret_key = env::var("STRIPE_SECRET_KEY")
|
|
.unwrap_or_else(|_| "sk_test_demo_key".to_string());
|
|
```
|
|
|
|
### 3. Rhai Engine Setup
|
|
|
|
```rust
|
|
let mut engine = Engine::new();
|
|
register_payment_rhai_module(&mut engine);
|
|
|
|
let mut scope = Scope::new();
|
|
scope.push("STRIPE_API_KEY", stripe_secret_key);
|
|
|
|
engine.eval_with_scope::<()>(&mut scope, &script)?;
|
|
```
|
|
|
|
## API Integration Examples
|
|
|
|
### Stripe Payment Processing
|
|
|
|
The architecture supports comprehensive Stripe API integration:
|
|
|
|
#### Product Creation
|
|
```rhai
|
|
let product = new_product()
|
|
.name("Premium Software License")
|
|
.description("A comprehensive software solution")
|
|
.metadata("category", "software");
|
|
|
|
let product_id = product.create(); // Async HTTP POST to /v1/products
|
|
```
|
|
|
|
#### Price Configuration
|
|
```rhai
|
|
let monthly_price = new_price()
|
|
.amount(2999) // $29.99 in cents
|
|
.currency("usd")
|
|
.product(product_id)
|
|
.recurring("month");
|
|
|
|
let price_id = monthly_price.create(); // Async HTTP POST to /v1/prices
|
|
```
|
|
|
|
#### Subscription Management
|
|
```rhai
|
|
let subscription = new_subscription()
|
|
.customer("cus_example_customer")
|
|
.add_price(monthly_price_id)
|
|
.trial_days(14)
|
|
.coupon(coupon_id);
|
|
|
|
let subscription_id = subscription.create(); // Async HTTP POST to /v1/subscriptions
|
|
```
|
|
|
|
#### Payment Intent Processing
|
|
```rhai
|
|
let payment_intent = new_payment_intent()
|
|
.amount(19999)
|
|
.currency("usd")
|
|
.customer("cus_example_customer")
|
|
.description("Premium Software License");
|
|
|
|
let intent_id = payment_intent.create(); // Async HTTP POST to /v1/payment_intents
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
### 1. Network Errors
|
|
```rust
|
|
.map_err(|e| {
|
|
println!("❌ HTTP request failed: {}", e);
|
|
format!("HTTP request failed: {}", e)
|
|
})?
|
|
```
|
|
|
|
### 2. API Errors
|
|
```rust
|
|
if let Some(error) = json.get("error") {
|
|
let error_msg = format!("Stripe API error: {}", error);
|
|
println!("❌ {}", error_msg);
|
|
Err(error_msg)
|
|
}
|
|
```
|
|
|
|
### 3. Timeout Handling
|
|
```rust
|
|
response_receiver.recv_timeout(Duration::from_secs(30))
|
|
.map_err(|e| format!("Failed to receive response: {}", e))?
|
|
```
|
|
|
|
### 4. Rhai Script Error Handling
|
|
```rhai
|
|
try {
|
|
let product_id = product.create();
|
|
print(`✅ Product ID: ${product_id}`);
|
|
} catch(error) {
|
|
print(`❌ Failed to create product: ${error}`);
|
|
return; // Exit gracefully
|
|
}
|
|
```
|
|
|
|
## Performance Characteristics
|
|
|
|
### Throughput
|
|
- **Concurrent requests**: Multiple async operations can be processed simultaneously
|
|
- **Connection pooling**: HTTP client reuses connections for efficiency
|
|
- **Timeout management**: Prevents hanging requests from blocking the system
|
|
|
|
### Latency
|
|
- **Channel overhead**: Minimal overhead for message passing (~microseconds)
|
|
- **Thread switching**: Single context switch per request
|
|
- **Network latency**: Dominated by actual HTTP request time
|
|
|
|
### Memory Usage
|
|
- **Request buffering**: Bounded by channel capacity
|
|
- **Connection pooling**: Efficient memory usage for HTTP connections
|
|
- **Response caching**: No automatic caching (can be added if needed)
|
|
|
|
## Thread Safety
|
|
|
|
### 1. Global Registry
|
|
```rust
|
|
static ASYNC_REGISTRY: Mutex<Option<AsyncFunctionRegistry>> = Mutex::new(None);
|
|
```
|
|
|
|
### 2. Channel Communication
|
|
- **MPSC channels**: Multiple producers (Rhai functions), single consumer (async worker)
|
|
- **Response channels**: One-to-one communication for each request
|
|
|
|
### 3. Shared Configuration
|
|
- **Immutable after setup**: Configuration is cloned to worker thread
|
|
- **Thread-safe HTTP client**: reqwest::Client is thread-safe
|
|
|
|
## Extensibility
|
|
|
|
### Adding New APIs
|
|
|
|
1. **Define request structures**:
|
|
```rust
|
|
#[derive(Debug)]
|
|
pub struct GraphQLRequest {
|
|
pub query: String,
|
|
pub variables: HashMap<String, serde_json::Value>,
|
|
pub response_sender: std::sync::mpsc::Sender<Result<String, String>>,
|
|
}
|
|
```
|
|
|
|
2. **Implement request handlers**:
|
|
```rust
|
|
async fn handle_graphql_request(config: &GraphQLConfig, request: &GraphQLRequest) -> Result<String, String> {
|
|
// Implementation
|
|
}
|
|
```
|
|
|
|
3. **Register Rhai functions**:
|
|
```rust
|
|
#[rhai_fn(name = "graphql_query", return_raw)]
|
|
pub fn execute_graphql_query(query: String) -> Result<String, Box<EvalAltResult>> {
|
|
// Implementation
|
|
}
|
|
```
|
|
|
|
### Custom HTTP Methods
|
|
|
|
The architecture supports any HTTP method:
|
|
```rust
|
|
registry.make_request("endpoint".to_string(), "PUT".to_string(), data)
|
|
registry.make_request("endpoint".to_string(), "DELETE".to_string(), HashMap::new())
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### 1. Configuration Management
|
|
- Use environment variables for sensitive data (API keys)
|
|
- Validate configuration before starting async workers
|
|
- Provide meaningful error messages for missing configuration
|
|
|
|
### 2. Error Handling
|
|
- Always handle both network and API errors
|
|
- Provide fallback behavior for failed requests
|
|
- Log errors with sufficient context for debugging
|
|
|
|
### 3. Timeout Configuration
|
|
- Set appropriate timeouts for different types of requests
|
|
- Consider retry logic for transient failures
|
|
- Balance responsiveness with reliability
|
|
|
|
### 4. Resource Management
|
|
- Limit concurrent requests to prevent overwhelming external APIs
|
|
- Use connection pooling for efficiency
|
|
- Clean up resources when shutting down
|
|
|
|
## Troubleshooting
|
|
|
|
### Common Issues
|
|
|
|
1. **"Cannot block the current thread from within a runtime"**
|
|
- **Cause**: Using blocking operations within async context
|
|
- **Solution**: Use `recv_timeout()` instead of `blocking_recv()`
|
|
|
|
2. **Channel disconnection errors**
|
|
- **Cause**: Worker thread terminated unexpectedly
|
|
- **Solution**: Check worker thread for panics, ensure proper error handling
|
|
|
|
3. **Request timeouts**
|
|
- **Cause**: Network issues or slow API responses
|
|
- **Solution**: Adjust timeout values, implement retry logic
|
|
|
|
4. **API authentication errors**
|
|
- **Cause**: Invalid or missing API keys
|
|
- **Solution**: Verify environment variable configuration
|
|
|
|
### Debugging Tips
|
|
|
|
1. **Enable detailed logging**:
|
|
```rust
|
|
println!("🔄 Processing {} request to {}", request.method, request.endpoint);
|
|
println!("📥 API response: {}", response_text);
|
|
```
|
|
|
|
2. **Monitor channel health**:
|
|
```rust
|
|
if let Err(_) = request.response_sender.send(result) {
|
|
println!("⚠️ Failed to send response back to caller");
|
|
}
|
|
```
|
|
|
|
3. **Test with demo data**:
|
|
```rhai
|
|
// Use demo API keys that fail gracefully for testing
|
|
let demo_key = "sk_test_demo_key_will_fail_gracefully";
|
|
```
|
|
|
|
## Conclusion
|
|
|
|
This async architecture successfully bridges Rhai's synchronous execution model with Rust's async ecosystem, enabling powerful HTTP API integration while maintaining the simplicity and safety of Rhai scripts. The design is extensible, performant, and handles errors gracefully, making it suitable for production use in applications requiring external API integration.
|
|
|
|
The key innovation is the use of timeout-based polling in the async worker loop, which prevents the common "cannot block within runtime" error while maintaining responsive execution. This pattern can be applied to other async operations beyond HTTP requests, such as database queries, file I/O, or any other async Rust operations that need to be exposed to Rhai scripts. |