199 lines
6.9 KiB
Rust
199 lines
6.9 KiB
Rust
use anyhow::Context;
|
|
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherError, RhaiTaskDetails};
|
|
use std::env;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tracing_subscriber::EnvFilter;
|
|
|
|
use engine::create_heromodels_engine;
|
|
use heromodels::db::hero::OurDB;
|
|
use std::path::PathBuf;
|
|
use worker_lib::spawn_rhai_worker;
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
EnvFilter::from_default_env()
|
|
.add_directive("connect_and_play=info".parse().unwrap())
|
|
.add_directive("rhai_dispatcher=info".parse().unwrap()),
|
|
)
|
|
.init();
|
|
|
|
let args: Vec<String> = env::args().collect();
|
|
let redis_url = args.get(1).cloned().unwrap_or_else(|| {
|
|
let default_url = "redis://127.0.0.1/".to_string();
|
|
println!("No Redis URL provided. Defaulting to: {}", default_url);
|
|
default_url
|
|
});
|
|
let worker_name = args.get(2).cloned().unwrap_or_else(|| {
|
|
let default_worker = "default_worker".to_string();
|
|
println!("No worker name provided. Defaulting to: {}", default_worker);
|
|
default_worker
|
|
});
|
|
|
|
// Define DB path for the worker
|
|
let db_path_str = format!("./temp_db_for_example_worker_{}", worker_name);
|
|
let db_path = PathBuf::from(&db_path_str);
|
|
|
|
// Create shutdown channel for the worker
|
|
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
|
|
|
// Spawn a worker in the background
|
|
let worker_redis_url = redis_url.clone();
|
|
let worker_circle_name_for_task = worker_name.clone();
|
|
let db_path_for_task = db_path_str.clone();
|
|
|
|
log::info!(
|
|
"[Main] Spawning worker for circle '{}' with DB path '{}'",
|
|
worker_circle_name_for_task,
|
|
db_path_for_task
|
|
);
|
|
|
|
let worker_join_handle = tokio::spawn(async move {
|
|
log::info!(
|
|
"[BG Worker] Starting for circle '{}' on Redis '{}'",
|
|
worker_circle_name_for_task,
|
|
worker_redis_url
|
|
);
|
|
// The `reset: true` in OurDB::new handles pre-cleanup if the directory exists.
|
|
let db = Arc::new(
|
|
OurDB::new(&db_path_for_task, true)
|
|
.expect("Failed to create temp DB for example worker"),
|
|
);
|
|
let mut engine = create_heromodels_engine(db);
|
|
engine.set_max_operations(0);
|
|
engine.set_max_expr_depths(0, 0);
|
|
engine.set_optimization_level(rhai::OptimizationLevel::Full);
|
|
|
|
if let Err(e) = spawn_rhai_worker(
|
|
1, // dummy circle_id
|
|
worker_circle_name_for_task.clone(),
|
|
engine,
|
|
worker_redis_url.clone(),
|
|
shutdown_rx, // Pass the receiver from main
|
|
false, // preserve_tasks
|
|
)
|
|
.await
|
|
{
|
|
log::error!(
|
|
"[BG Worker] Failed to spawn or worker error for circle '{}': {}",
|
|
worker_circle_name_for_task,
|
|
e
|
|
);
|
|
} else {
|
|
log::info!(
|
|
"[BG Worker] Worker for circle '{}' shut down gracefully.",
|
|
worker_circle_name_for_task
|
|
);
|
|
}
|
|
});
|
|
|
|
// Give the worker a moment to start up
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
println!(
|
|
"Initializing RhaiDispatcher for Redis at {} to target worker '{}'...",
|
|
redis_url, worker_name
|
|
);
|
|
let client = RhaiDispatcher::new(&redis_url)
|
|
.with_context(|| format!("Failed to create RhaiDispatcher for Redis URL: {}", redis_url))?;
|
|
println!("RhaiDispatcher initialized.");
|
|
|
|
let script = "let a = 10; let b = 32; let message = \"Hello from example script!\"; message + \" Result: \" + (a + b)";
|
|
println!("\nSending script:\n```rhai\n{}\n```", script);
|
|
|
|
let timeout = Duration::from_secs(30);
|
|
match client
|
|
.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout)
|
|
.await
|
|
{
|
|
Ok(task_details) => {
|
|
println!("\nWorker response:");
|
|
if let Some(ref output) = task_details.output {
|
|
println!("Output: {}", output);
|
|
}
|
|
if let Some(ref error_msg) = task_details.error {
|
|
eprintln!("Error: {}", error_msg);
|
|
}
|
|
if task_details.output.is_none() && task_details.error.is_none() {
|
|
println!(
|
|
"Worker finished with no explicit output or error. Status: {}",
|
|
task_details.status
|
|
);
|
|
}
|
|
}
|
|
Err(e) => match e {
|
|
RhaiDispatcherError::Timeout(task_id) => {
|
|
eprintln!(
|
|
"\nError: Script execution timed out for task_id: {}.",
|
|
task_id
|
|
);
|
|
}
|
|
RhaiDispatcherError::RedisError(redis_err) => {
|
|
eprintln!(
|
|
"\nError: Redis communication failed: {}. Check Redis connection and server status.",
|
|
redis_err
|
|
);
|
|
}
|
|
RhaiDispatcherError::SerializationError(serde_err) => {
|
|
eprintln!(
|
|
"\nError: Failed to serialize/deserialize task data: {}.",
|
|
serde_err
|
|
);
|
|
}
|
|
RhaiDispatcherError::TaskNotFound(task_id) => {
|
|
eprintln!("\nError: Task {} not found after submission.", task_id);
|
|
} /* All RhaiDispatcherError variants are handled, so _ arm is not strictly needed
|
|
unless RhaiDispatcherError becomes non-exhaustive in the future. */
|
|
},
|
|
}
|
|
|
|
println!("\nExample client operations finished. Shutting down worker...");
|
|
|
|
// Send shutdown signal to the worker
|
|
if let Err(e) = shutdown_tx.send(()).await {
|
|
eprintln!(
|
|
"[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)",
|
|
e
|
|
);
|
|
}
|
|
|
|
// Wait for the worker to finish
|
|
log::info!("[Main] Waiting for worker task to join...");
|
|
if let Err(e) = worker_join_handle.await {
|
|
eprintln!("[Main] Error waiting for worker task to join: {:?}", e);
|
|
} else {
|
|
log::info!("[Main] Worker task joined successfully.");
|
|
}
|
|
|
|
// Clean up the database directory
|
|
log::info!(
|
|
"[Main] Cleaning up database directory: {}",
|
|
db_path.display()
|
|
);
|
|
if db_path.exists() {
|
|
if let Err(e) = std::fs::remove_dir_all(&db_path) {
|
|
eprintln!(
|
|
"[Main] Failed to remove database directory '{}': {}",
|
|
db_path.display(),
|
|
e
|
|
);
|
|
} else {
|
|
log::info!(
|
|
"[Main] Successfully removed database directory: {}",
|
|
db_path.display()
|
|
);
|
|
}
|
|
} else {
|
|
log::info!(
|
|
"[Main] Database directory '{}' not found, no cleanup needed.",
|
|
db_path.display()
|
|
);
|
|
}
|
|
|
|
println!("Example fully completed and cleaned up.");
|
|
Ok(())
|
|
}
|