// Source: rhailib/research/repl/examples/connect_and_play.rs
// (Listing metadata: 199 lines, 6.9 KiB, Rust)

use anyhow::Context;
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherError, RhaiTaskDetails};
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing_subscriber::EnvFilter;
use engine::create_heromodels_engine;
use heromodels::db::hero::OurDB;
use std::path::PathBuf;
use worker_lib::spawn_rhai_worker;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("connect_and_play=info".parse().unwrap())
.add_directive("rhai_dispatcher=info".parse().unwrap()),
)
.init();
let args: Vec<String> = env::args().collect();
let redis_url = args.get(1).cloned().unwrap_or_else(|| {
let default_url = "redis://127.0.0.1/".to_string();
println!("No Redis URL provided. Defaulting to: {}", default_url);
default_url
});
let worker_name = args.get(2).cloned().unwrap_or_else(|| {
let default_worker = "default_worker".to_string();
println!("No worker name provided. Defaulting to: {}", default_worker);
default_worker
});
// Define DB path for the worker
let db_path_str = format!("./temp_db_for_example_worker_{}", worker_name);
let db_path = PathBuf::from(&db_path_str);
// Create shutdown channel for the worker
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Spawn a worker in the background
let worker_redis_url = redis_url.clone();
let worker_circle_name_for_task = worker_name.clone();
let db_path_for_task = db_path_str.clone();
log::info!(
"[Main] Spawning worker for circle '{}' with DB path '{}'",
worker_circle_name_for_task,
db_path_for_task
);
let worker_join_handle = tokio::spawn(async move {
log::info!(
"[BG Worker] Starting for circle '{}' on Redis '{}'",
worker_circle_name_for_task,
worker_redis_url
);
// The `reset: true` in OurDB::new handles pre-cleanup if the directory exists.
let db = Arc::new(
OurDB::new(&db_path_for_task, true)
.expect("Failed to create temp DB for example worker"),
);
let mut engine = create_heromodels_engine(db);
engine.set_max_operations(0);
engine.set_max_expr_depths(0, 0);
engine.set_optimization_level(rhai::OptimizationLevel::Full);
if let Err(e) = spawn_rhai_worker(
1, // dummy circle_id
worker_circle_name_for_task.clone(),
engine,
worker_redis_url.clone(),
shutdown_rx, // Pass the receiver from main
false, // preserve_tasks
)
.await
{
log::error!(
"[BG Worker] Failed to spawn or worker error for circle '{}': {}",
worker_circle_name_for_task,
e
);
} else {
log::info!(
"[BG Worker] Worker for circle '{}' shut down gracefully.",
worker_circle_name_for_task
);
}
});
// Give the worker a moment to start up
tokio::time::sleep(Duration::from_secs(1)).await;
println!(
"Initializing RhaiDispatcher for Redis at {} to target worker '{}'...",
redis_url, worker_name
);
let client = RhaiDispatcher::new(&redis_url)
.with_context(|| format!("Failed to create RhaiDispatcher for Redis URL: {}", redis_url))?;
println!("RhaiDispatcher initialized.");
let script = "let a = 10; let b = 32; let message = \"Hello from example script!\"; message + \" Result: \" + (a + b)";
println!("\nSending script:\n```rhai\n{}\n```", script);
let timeout = Duration::from_secs(30);
match client
.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout)
.await
{
Ok(task_details) => {
println!("\nWorker response:");
if let Some(ref output) = task_details.output {
println!("Output: {}", output);
}
if let Some(ref error_msg) = task_details.error {
eprintln!("Error: {}", error_msg);
}
if task_details.output.is_none() && task_details.error.is_none() {
println!(
"Worker finished with no explicit output or error. Status: {}",
task_details.status
);
}
}
Err(e) => match e {
RhaiDispatcherError::Timeout(task_id) => {
eprintln!(
"\nError: Script execution timed out for task_id: {}.",
task_id
);
}
RhaiDispatcherError::RedisError(redis_err) => {
eprintln!(
"\nError: Redis communication failed: {}. Check Redis connection and server status.",
redis_err
);
}
RhaiDispatcherError::SerializationError(serde_err) => {
eprintln!(
"\nError: Failed to serialize/deserialize task data: {}.",
serde_err
);
}
RhaiDispatcherError::TaskNotFound(task_id) => {
eprintln!("\nError: Task {} not found after submission.", task_id);
} /* All RhaiDispatcherError variants are handled, so _ arm is not strictly needed
unless RhaiDispatcherError becomes non-exhaustive in the future. */
},
}
println!("\nExample client operations finished. Shutting down worker...");
// Send shutdown signal to the worker
if let Err(e) = shutdown_tx.send(()).await {
eprintln!(
"[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)",
e
);
}
// Wait for the worker to finish
log::info!("[Main] Waiting for worker task to join...");
if let Err(e) = worker_join_handle.await {
eprintln!("[Main] Error waiting for worker task to join: {:?}", e);
} else {
log::info!("[Main] Worker task joined successfully.");
}
// Clean up the database directory
log::info!(
"[Main] Cleaning up database directory: {}",
db_path.display()
);
if db_path.exists() {
if let Err(e) = std::fs::remove_dir_all(&db_path) {
eprintln!(
"[Main] Failed to remove database directory '{}': {}",
db_path.display(),
e
);
} else {
log::info!(
"[Main] Successfully removed database directory: {}",
db_path.display()
);
}
} else {
log::info!(
"[Main] Database directory '{}' not found, no cleanup needed.",
db_path.display()
);
}
println!("Example fully completed and cleaned up.");
Ok(())
}