9.9 KiB
9.9 KiB
True Non-Blocking Implementation (No rt.block_on)
Problem with Previous Approach
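The previous approach created a new runtime on a dedicated thread for every request and called rt.block_on(), which blocks that spawned thread until the async work finishes: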
// THIS BLOCKS THE THREAD:
thread::spawn(move || {
let rt = Runtime::new().expect("Failed to create runtime");
rt.block_on(async { // <-- This blocks!
// async code here
});
});
Solution: Use tokio::spawn Instead
Use tokio::spawn to run the async code as a task on the already-running Tokio runtime instead of blocking a dedicated thread:
// THIS DOESN'T BLOCK:
tokio::spawn(async move {
// async code runs in tokio's thread pool
let client = Client::new();
match make_stripe_request(&client, &stripe_secret, "payment_intents", &form_data).await {
Ok(response) => {
dispatch_response_script(&worker_id, &context_id, "new_create_payment_intent_response", &response).await;
}
Err(error) => {
dispatch_error_script(&worker_id, &context_id, "new_create_payment_intent_error", &error).await;
}
}
});
Complete Corrected Implementation
Payment Intent Function (Corrected)
/// Queues creation of a Stripe PaymentIntent and returns without waiting for Stripe.
///
/// The form data is built from `intent` up front; the HTTP request and the follow-up
/// script dispatch run in a background Tokio task.
///
/// # Arguments
/// * `intent` - payment intent whose fields are turned into the request form data.
/// * `worker_id` / `context_id` - forwarded to the dispatcher for the follow-up script.
/// * `stripe_secret` - Stripe secret key passed to `make_stripe_request`.
///
/// # Returns
/// `Ok("payment_intent_request_dispatched")` once the task has been spawned. The outcome
/// arrives later through the `new_create_payment_intent_response` flow (request returned
/// `Ok`) or the `new_create_payment_intent_error` flow (request returned `Err`).
///
/// # Errors
/// Returns a runtime error when the calling thread is not inside a Tokio runtime.
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_payment_intent_async(
    intent: &mut RhaiPaymentIntent,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);

    // `tokio::spawn` panics when no runtime is active on the calling thread. Looking the
    // handle up explicitly turns that panic into an error the script can handle.
    let runtime = tokio::runtime::Handle::try_current().map_err(|err| -> Box<EvalAltResult> {
        format!("create_async requires an active Tokio runtime: {}", err).into()
    })?;

    // The task is detached: its JoinHandle is dropped and nothing awaits it.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "payment_intents", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_payment_intent_response",
                    &response
                ).await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_payment_intent_error",
                    &error
                ).await;
            }
        }
    });

    // Returns immediately; the request continues in the background task.
    Ok("payment_intent_request_dispatched".to_string())
}
Product Function (Corrected)
/// Queues creation of a Stripe Product and returns without waiting for Stripe.
///
/// The form data is built from `product` up front; the HTTP request and the follow-up
/// script dispatch run in a background Tokio task.
///
/// # Arguments
/// * `product` - product whose fields are turned into the request form data.
/// * `worker_id` / `context_id` - forwarded to the dispatcher for the follow-up script.
/// * `stripe_secret` - Stripe secret key passed to `make_stripe_request`.
///
/// # Returns
/// `Ok("product_request_dispatched")` once the task has been spawned. The outcome arrives
/// later through the `new_create_product_response` flow (request returned `Ok`) or the
/// `new_create_product_error` flow (request returned `Err`).
///
/// # Errors
/// Returns a runtime error when the calling thread is not inside a Tokio runtime.
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_product_async(
    product: &mut RhaiProduct,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_product_data(product);

    // `tokio::spawn` panics when no runtime is active on the calling thread. Looking the
    // handle up explicitly turns that panic into an error the script can handle.
    let runtime = tokio::runtime::Handle::try_current().map_err(|err| -> Box<EvalAltResult> {
        format!("create_async requires an active Tokio runtime: {}", err).into()
    })?;

    // The task is detached: its JoinHandle is dropped and nothing awaits it.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "products", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_product_response",
                    &response
                ).await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_product_error",
                    &error
                ).await;
            }
        }
    });

    Ok("product_request_dispatched".to_string())
}
Price Function (Corrected)
/// Queues creation of a Stripe Price and returns without waiting for Stripe.
///
/// The form data is built from `price` up front; the HTTP request and the follow-up
/// script dispatch run in a background Tokio task.
///
/// # Arguments
/// * `price` - price whose fields are turned into the request form data.
/// * `worker_id` / `context_id` - forwarded to the dispatcher for the follow-up script.
/// * `stripe_secret` - Stripe secret key passed to `make_stripe_request`.
///
/// # Returns
/// `Ok("price_request_dispatched")` once the task has been spawned. The outcome arrives
/// later through the `new_create_price_response` flow (request returned `Ok`) or the
/// `new_create_price_error` flow (request returned `Err`).
///
/// # Errors
/// Returns a runtime error when the calling thread is not inside a Tokio runtime.
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_price_async(
    price: &mut RhaiPrice,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_price_data(price);

    // `tokio::spawn` panics when no runtime is active on the calling thread. Looking the
    // handle up explicitly turns that panic into an error the script can handle.
    let runtime = tokio::runtime::Handle::try_current().map_err(|err| -> Box<EvalAltResult> {
        format!("create_async requires an active Tokio runtime: {}", err).into()
    })?;

    // The task is detached: its JoinHandle is dropped and nothing awaits it.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "prices", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_price_response",
                    &response
                ).await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_price_error",
                    &error
                ).await;
            }
        }
    });

    Ok("price_request_dispatched".to_string())
}
Subscription Function (Corrected)
/// Queues creation of a Stripe Subscription and returns without waiting for Stripe.
///
/// The form data is built from `subscription` up front; the HTTP request and the
/// follow-up script dispatch run in a background Tokio task.
///
/// # Arguments
/// * `subscription` - subscription whose fields are turned into the request form data.
/// * `worker_id` / `context_id` - forwarded to the dispatcher for the follow-up script.
/// * `stripe_secret` - Stripe secret key passed to `make_stripe_request`.
///
/// # Returns
/// `Ok("subscription_request_dispatched")` once the task has been spawned. The outcome
/// arrives later through the `new_create_subscription_response` flow (request returned
/// `Ok`) or the `new_create_subscription_error` flow (request returned `Err`).
///
/// # Errors
/// Returns a runtime error when the calling thread is not inside a Tokio runtime.
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_subscription_async(
    subscription: &mut RhaiSubscription,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_subscription_data(subscription);

    // `tokio::spawn` panics when no runtime is active on the calling thread. Looking the
    // handle up explicitly turns that panic into an error the script can handle.
    let runtime = tokio::runtime::Handle::try_current().map_err(|err| -> Box<EvalAltResult> {
        format!("create_async requires an active Tokio runtime: {}", err).into()
    })?;

    // The task is detached: its JoinHandle is dropped and nothing awaits it.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "subscriptions", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_subscription_response",
                    &response
                ).await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_subscription_error",
                    &error
                ).await;
            }
        }
    });

    Ok("subscription_request_dispatched".to_string())
}
Coupon Function (Corrected)
/// Queues creation of a Stripe Coupon and returns without waiting for Stripe.
///
/// The form data is built from `coupon` up front; the HTTP request and the follow-up
/// script dispatch run in a background Tokio task.
///
/// # Arguments
/// * `coupon` - coupon whose fields are turned into the request form data.
/// * `worker_id` / `context_id` - forwarded to the dispatcher for the follow-up script.
/// * `stripe_secret` - Stripe secret key passed to `make_stripe_request`.
///
/// # Returns
/// `Ok("coupon_request_dispatched")` once the task has been spawned. The outcome arrives
/// later through the `new_create_coupon_response` flow (request returned `Ok`) or the
/// `new_create_coupon_error` flow (request returned `Err`).
///
/// # Errors
/// Returns a runtime error when the calling thread is not inside a Tokio runtime.
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_coupon_async(
    coupon: &mut RhaiCoupon,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_coupon_data(coupon);

    // `tokio::spawn` panics when no runtime is active on the calling thread. Looking the
    // handle up explicitly turns that panic into an error the script can handle.
    let runtime = tokio::runtime::Handle::try_current().map_err(|err| -> Box<EvalAltResult> {
        format!("create_async requires an active Tokio runtime: {}", err).into()
    })?;

    // The task is detached: its JoinHandle is dropped and nothing awaits it.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "coupons", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_coupon_response",
                    &response
                ).await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_coupon_error",
                    &error
                ).await;
            }
        }
    });

    Ok("coupon_request_dispatched".to_string())
}
Helper Functions (Same as Before)
/// Sends a form-encoded POST to `https://api.stripe.com/v1/{endpoint}` and returns the
/// response body as text.
///
/// # Arguments
/// * `client` - HTTP client used to issue the request.
/// * `secret_key` - sent as the HTTP basic-auth user name, with no password.
/// * `endpoint` - path appended to the Stripe v1 base URL (e.g. `"products"`).
/// * `form_data` - key/value pairs sent as the form body.
///
/// # Errors
/// * `"HTTP request failed: ..."` when the request could not be sent.
/// * `"Failed to read response: ..."` when the body could not be read.
/// * The response body itself (or a status message when the body is empty) when the
///   server answered with a non-2xx status, so callers do not treat an API error as a
///   successful creation.
async fn make_stripe_request(
    client: &Client,
    secret_key: &str,
    endpoint: &str,
    form_data: &HashMap<String, String>
) -> Result<String, String> {
    let url = format!("https://api.stripe.com/v1/{}", endpoint);

    let response = client
        .post(&url)
        .basic_auth(secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {}", e))?;

    // Read the status before the body: `text()` consumes the response.
    let status = response.status();
    let body = response
        .text()
        .await
        .map_err(|e| format!("Failed to read response: {}", e))?;

    if status.is_success() {
        Ok(body)
    } else if body.is_empty() {
        Err(format!("Stripe request failed with HTTP status {}", status))
    } else {
        // Keep the body verbatim so the error flow receives the server's own payload.
        Err(body)
    }
}
async fn dispatch_response_script(
worker_id: &str,
context_id: &str,
script_name: &str,
response_data: &str
) {
let script_content = format!(
r#"
let response_json = `{}`;
let parsed_data = parse_json(response_json);
eval_file("flows/{}.rhai");
"#,
response_data.replace('`', r#"\`"#),
script_name
);
if let Ok(dispatcher) = RhaiDispatcherBuilder::new()
.caller_id("stripe")
.worker_id(worker_id)
.context_id(context_id)
.redis_url("redis://127.0.0.1/")
.build()
{
let _ = dispatcher
.new_play_request()
.script(&script_content)
.submit()
.await;
}
}
async fn dispatch_error_script(
worker_id: &str,
context_id: &str,
script_name: &str,
error_data: &str
) {
let script_content = format!(
r#"
let error_json = `{}`;
let parsed_error = parse_json(error_json);
eval_file("flows/{}.rhai");
"#,
error_data.replace('`', r#"\`"#),
script_name
);
if let Ok(dispatcher) = RhaiDispatcherBuilder::new()
.caller_id("stripe")
.worker_id(worker_id)
.context_id(context_id)
.redis_url("redis://127.0.0.1/")
.build()
{
let _ = dispatcher
.new_play_request()
.script(&script_content)
.submit()
.await;
}
}
Key Differences
Before (Blocking):
thread::spawn(move || {
let rt = Runtime::new().expect("Failed to create runtime");
rt.block_on(async { // <-- BLOCKS THE THREAD
// async code
});
});
After (Non-Blocking):
tokio::spawn(async move { // <-- DOESN'T BLOCK
// async code runs in tokio's thread pool
});
Benefits of tokio::spawn
- No Blocking - Uses tokio's async runtime, doesn't block
- Efficient - Reuses existing tokio thread pool
- Lightweight - No need to create new runtime per request
- Scalable - Can handle many concurrent requests
- Simple - Less code, cleaner implementation
Usage (Same as Before)
let payment_intent = new_payment_intent()
.amount(2000)
.currency("usd")
.customer("cus_customer123");
// This returns immediately, HTTP happens asynchronously
let result = payment_intent.create_async(
"worker-1",
"context-123",
"sk_test_..."
);
print(`Request dispatched: ${result}`);
// Script ends, but HTTP continues in background
Requirements
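tokio::spawn must be called from within a Tokio runtime context; it panics if no runtime is active on the calling thread. Make sure the thread that runs the Rhai engine (and therefore calls these functions) is inside a Tokio runtime, for example by running the engine under #[tokio::main] or after entering a runtime with Runtime::enter().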
This implementation provides true non-blocking behavior - the Rhai function returns immediately while the HTTP request and script dispatch happen asynchronously in the background.