rhailib/docs/TRUE_NON_BLOCKING_IMPLEMENTATION.md

9.9 KiB

True Non-Blocking Implementation (No rt.block_on)

Problem with Previous Approach

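The previous approach spawned a dedicated OS thread for every request and called rt.block_on() on it, which blocks that thread until the request finishes and builds a brand-new runtime each time: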

// THIS BLOCKS THE THREAD:
// Every call starts a new OS thread and constructs a fresh runtime on it.
thread::spawn(move || {
    let rt = Runtime::new().expect("Failed to create runtime");
    // block_on does not return until the future completes, so the spawned
    // thread is parked for the whole duration of the request.
    rt.block_on(async {  // <-- This blocks!
        // async code here
    });
});

Solution: Use tokio::spawn Instead

Use tokio::spawn to schedule the async work as a task on the already-running tokio runtime, so no thread is blocked waiting for it:

// THIS DOESN'T BLOCK:
// The value returned by tokio::spawn is discarded, so nothing waits for the
// task; its outcome is only observable through the dispatched flow script.
tokio::spawn(async move {
    // async code runs in tokio's thread pool
    let client = Client::new();
    // POST the form data to Stripe, then hand the outcome to the matching flow.
    match make_stripe_request(&client, &stripe_secret, "payment_intents", &form_data).await {
        Ok(response) => {
            // Success: forward the response body to the "response" flow script.
            dispatch_response_script(&worker_id, &context_id, "new_create_payment_intent_response", &response).await;
        }
        Err(error) => {
            // Failure: forward the error text to the "error" flow script.
            dispatch_error_script(&worker_id, &context_id, "new_create_payment_intent_error", &error).await;
        }
    }
});

Complete Corrected Implementation

Payment Intent Function (Corrected)

/// Starts creation of a Stripe payment intent in the background and returns
/// immediately.
///
/// The form data is built from `intent` up front; a task spawned on the current
/// tokio runtime then POSTs it to the `payment_intents` endpoint. On success the
/// response body is passed to the `new_create_payment_intent_response` flow, on
/// failure the error text is passed to `new_create_payment_intent_error`.
///
/// # Parameters
/// * `intent` - payment intent whose fields are turned into the request form.
/// * `worker_id` / `context_id` - routing identifiers for the follow-up script.
/// * `stripe_secret` - Stripe secret key used to authenticate the request.
///
/// # Returns
/// `"payment_intent_request_dispatched"` once the task has been scheduled. This
/// says nothing about whether the Stripe call itself succeeds.
///
/// # Errors
/// Returns a script error when the calling thread is not inside a tokio runtime
/// (a bare `tokio::spawn` would panic in that situation).
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_payment_intent_async(
    intent: &mut RhaiPaymentIntent,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);

    // Look the runtime up explicitly so a missing runtime surfaces as an error
    // the script can handle rather than a panic inside the Rhai engine.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!("create_async requires a running tokio runtime: {}", err).into());
        }
    };

    // Fire-and-forget: the JoinHandle is dropped, so the task runs detached.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "payment_intents", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_payment_intent_response",
                    &response,
                )
                .await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_payment_intent_error",
                    &error,
                )
                .await;
            }
        }
    });

    // Returns immediately - no blocking!
    Ok("payment_intent_request_dispatched".to_string())
}

Product Function (Corrected)

/// Starts creation of a Stripe product in the background and returns
/// immediately.
///
/// The form data is built from `product` up front; a task spawned on the current
/// tokio runtime then POSTs it to the `products` endpoint. On success the
/// response body is passed to the `new_create_product_response` flow, on failure
/// the error text is passed to `new_create_product_error`.
///
/// # Parameters
/// * `product` - product whose fields are turned into the request form.
/// * `worker_id` / `context_id` - routing identifiers for the follow-up script.
/// * `stripe_secret` - Stripe secret key used to authenticate the request.
///
/// # Returns
/// `"product_request_dispatched"` once the task has been scheduled. This says
/// nothing about whether the Stripe call itself succeeds.
///
/// # Errors
/// Returns a script error when the calling thread is not inside a tokio runtime
/// (a bare `tokio::spawn` would panic in that situation).
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_product_async(
    product: &mut RhaiProduct,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_product_data(product);

    // Look the runtime up explicitly so a missing runtime surfaces as an error
    // the script can handle rather than a panic inside the Rhai engine.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!("create_async requires a running tokio runtime: {}", err).into());
        }
    };

    // Fire-and-forget: the JoinHandle is dropped, so the task runs detached.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "products", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_product_response",
                    &response,
                )
                .await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_product_error",
                    &error,
                )
                .await;
            }
        }
    });

    Ok("product_request_dispatched".to_string())
}

Price Function (Corrected)

/// Starts creation of a Stripe price in the background and returns immediately.
///
/// The form data is built from `price` up front; a task spawned on the current
/// tokio runtime then POSTs it to the `prices` endpoint. On success the response
/// body is passed to the `new_create_price_response` flow, on failure the error
/// text is passed to `new_create_price_error`.
///
/// # Parameters
/// * `price` - price whose fields are turned into the request form.
/// * `worker_id` / `context_id` - routing identifiers for the follow-up script.
/// * `stripe_secret` - Stripe secret key used to authenticate the request.
///
/// # Returns
/// `"price_request_dispatched"` once the task has been scheduled. This says
/// nothing about whether the Stripe call itself succeeds.
///
/// # Errors
/// Returns a script error when the calling thread is not inside a tokio runtime
/// (a bare `tokio::spawn` would panic in that situation).
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_price_async(
    price: &mut RhaiPrice,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_price_data(price);

    // Look the runtime up explicitly so a missing runtime surfaces as an error
    // the script can handle rather than a panic inside the Rhai engine.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!("create_async requires a running tokio runtime: {}", err).into());
        }
    };

    // Fire-and-forget: the JoinHandle is dropped, so the task runs detached.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "prices", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_price_response",
                    &response,
                )
                .await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_price_error",
                    &error,
                )
                .await;
            }
        }
    });

    Ok("price_request_dispatched".to_string())
}

Subscription Function (Corrected)

/// Starts creation of a Stripe subscription in the background and returns
/// immediately.
///
/// The form data is built from `subscription` up front; a task spawned on the
/// current tokio runtime then POSTs it to the `subscriptions` endpoint. On
/// success the response body is passed to the `new_create_subscription_response`
/// flow, on failure the error text is passed to `new_create_subscription_error`.
///
/// # Parameters
/// * `subscription` - subscription whose fields are turned into the request form.
/// * `worker_id` / `context_id` - routing identifiers for the follow-up script.
/// * `stripe_secret` - Stripe secret key used to authenticate the request.
///
/// # Returns
/// `"subscription_request_dispatched"` once the task has been scheduled. This
/// says nothing about whether the Stripe call itself succeeds.
///
/// # Errors
/// Returns a script error when the calling thread is not inside a tokio runtime
/// (a bare `tokio::spawn` would panic in that situation).
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_subscription_async(
    subscription: &mut RhaiSubscription,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_subscription_data(subscription);

    // Look the runtime up explicitly so a missing runtime surfaces as an error
    // the script can handle rather than a panic inside the Rhai engine.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!("create_async requires a running tokio runtime: {}", err).into());
        }
    };

    // Fire-and-forget: the JoinHandle is dropped, so the task runs detached.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "subscriptions", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_subscription_response",
                    &response,
                )
                .await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_subscription_error",
                    &error,
                )
                .await;
            }
        }
    });

    Ok("subscription_request_dispatched".to_string())
}

Coupon Function (Corrected)

/// Starts creation of a Stripe coupon in the background and returns immediately.
///
/// The form data is built from `coupon` up front; a task spawned on the current
/// tokio runtime then POSTs it to the `coupons` endpoint. On success the
/// response body is passed to the `new_create_coupon_response` flow, on failure
/// the error text is passed to `new_create_coupon_error`.
///
/// # Parameters
/// * `coupon` - coupon whose fields are turned into the request form.
/// * `worker_id` / `context_id` - routing identifiers for the follow-up script.
/// * `stripe_secret` - Stripe secret key used to authenticate the request.
///
/// # Returns
/// `"coupon_request_dispatched"` once the task has been scheduled. This says
/// nothing about whether the Stripe call itself succeeds.
///
/// # Errors
/// Returns a script error when the calling thread is not inside a tokio runtime
/// (a bare `tokio::spawn` would panic in that situation).
#[rhai_fn(name = "create_async", return_raw)]
pub fn create_coupon_async(
    coupon: &mut RhaiCoupon,
    worker_id: String,
    context_id: String,
    stripe_secret: String
) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_coupon_data(coupon);

    // Look the runtime up explicitly so a missing runtime surfaces as an error
    // the script can handle rather than a panic inside the Rhai engine.
    let runtime = match tokio::runtime::Handle::try_current() {
        Ok(handle) => handle,
        Err(err) => {
            return Err(format!("create_async requires a running tokio runtime: {}", err).into());
        }
    };

    // Fire-and-forget: the JoinHandle is dropped, so the task runs detached.
    runtime.spawn(async move {
        let client = Client::new();
        match make_stripe_request(&client, &stripe_secret, "coupons", &form_data).await {
            Ok(response) => {
                dispatch_response_script(
                    &worker_id,
                    &context_id,
                    "new_create_coupon_response",
                    &response,
                )
                .await;
            }
            Err(error) => {
                dispatch_error_script(
                    &worker_id,
                    &context_id,
                    "new_create_coupon_error",
                    &error,
                )
                .await;
            }
        }
    });

    Ok("coupon_request_dispatched".to_string())
}

Helper Functions (Same as Before)

/// Sends a form-encoded POST to `https://api.stripe.com/v1/{endpoint}` and
/// returns the response body.
///
/// `secret_key` is sent as the HTTP basic-auth user name with no password.
/// `client` is presumably a `reqwest::Client` (confirm against the file's
/// imports); the calls below follow that API.
///
/// # Errors
/// * `"HTTP request failed: ..."` when the request cannot be sent.
/// * `"Failed to read response: ..."` when the body cannot be read.
/// * The response body itself when the status is not 2xx, so that Stripe's
///   error payload reaches the error path instead of being treated as a
///   success. If that body is empty, a message containing the status is
///   returned instead.
async fn make_stripe_request(
    client: &Client,
    secret_key: &str,
    endpoint: &str,
    form_data: &HashMap<String, String>,
) -> Result<String, String> {
    let url = format!("https://api.stripe.com/v1/{}", endpoint);

    let response = client
        .post(&url)
        .basic_auth(secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| format!("HTTP request failed: {}", e))?;

    // Read the status before the body: `text()` takes the response by value.
    let status = response.status();

    let response_text = response
        .text()
        .await
        .map_err(|e| format!("Failed to read response: {}", e))?;

    if status.is_success() {
        Ok(response_text)
    } else if response_text.is_empty() {
        Err(format!("Stripe API returned HTTP {}", status))
    } else {
        Err(response_text)
    }
}

async fn dispatch_response_script(
    worker_id: &str,
    context_id: &str,
    script_name: &str,
    response_data: &str
) {
    let script_content = format!(
        r#"
        let response_json = `{}`;
        let parsed_data = parse_json(response_json);
        eval_file("flows/{}.rhai");
        "#,
        response_data.replace('`', r#"\`"#),
        script_name
    );
    
    if let Ok(dispatcher) = RhaiDispatcherBuilder::new()
        .caller_id("stripe")
        .worker_id(worker_id)
        .context_id(context_id)
        .redis_url("redis://127.0.0.1/")
        .build()
    {
        let _ = dispatcher
            .new_play_request()
            .script(&script_content)
            .submit()
            .await;
    }
}

async fn dispatch_error_script(
    worker_id: &str,
    context_id: &str,
    script_name: &str,
    error_data: &str
) {
    let script_content = format!(
        r#"
        let error_json = `{}`;
        let parsed_error = parse_json(error_json);
        eval_file("flows/{}.rhai");
        "#,
        error_data.replace('`', r#"\`"#),
        script_name
    );
    
    if let Ok(dispatcher) = RhaiDispatcherBuilder::new()
        .caller_id("stripe")
        .worker_id(worker_id)
        .context_id(context_id)
        .redis_url("redis://127.0.0.1/")
        .build()
    {
        let _ = dispatcher
            .new_play_request()
            .script(&script_content)
            .submit()
            .await;
    }
}

Key Differences

Before (Blocking):

// A new OS thread and a new runtime are created for every call.
thread::spawn(move || {
    let rt = Runtime::new().expect("Failed to create runtime");
    // The spawned thread waits here until the future has completed.
    rt.block_on(async {  // <-- BLOCKS THE THREAD
        // async code
    });
});

After (Non-Blocking):

// The future is handed to the runtime as a task; the call itself returns
// without waiting for it.
tokio::spawn(async move {  // <-- DOESN'T BLOCK
    // async code runs in tokio's thread pool
});

Benefits of tokio::spawn

  1. No Blocking - Uses tokio's async runtime, doesn't block
  2. Efficient - Reuses existing tokio thread pool
  3. Lightweight - No need to create new runtime per request
  4. Scalable - Can handle many concurrent requests
  5. Simple - Less code, cleaner implementation

Usage (Same as Before)

// Build the payment intent that will be sent to Stripe.
let payment_intent = new_payment_intent()
    .amount(2000)
    .currency("usd")
    .customer("cus_customer123");

// This returns immediately, HTTP happens asynchronously
// Arguments, in order: worker id, context id, Stripe secret key.
let result = payment_intent.create_async(
    "worker-1",
    "context-123", 
    "sk_test_..."
);

// `result` is only the dispatch acknowledgement, not the Stripe response;
// the response arrives later through the dispatched flow script.
print(`Request dispatched: ${result}`);
// Script ends, but HTTP continues in background

Requirements

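These functions need an active tokio runtime: tokio::spawn panics when it is called from a thread that has not entered one. Make sure the Rhai engine is evaluated inside a tokio runtime context (for example in a #[tokio::main] application, or on a thread that has entered the runtime).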

This implementation provides true non-blocking behavior - the Rhai function returns immediately while the HTTP request and script dispatch happen asynchronously in the background.