267 lines
9.1 KiB
Rust
267 lines
9.1 KiB
Rust
use crate::Job;
|
|
use log::{debug, error, info};
|
|
use rhai::{Engine, packages::Package};
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
use tokio::task::JoinHandle;
|
|
|
|
use crate::runner_trait::Runner;
|
|
|
|
/// Represents a running job with its handle and metadata
|
|
struct RunningJob {
|
|
job_id: String,
|
|
handle: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>,
|
|
started_at: std::time::Instant,
|
|
}
|
|
|
|
/// Builder for AsyncRunner
|
|
#[derive(Default)]
|
|
pub struct AsyncRunnerBuilder {
|
|
runner_id: Option<String>,
|
|
db_path: Option<String>,
|
|
redis_url: Option<String>,
|
|
default_timeout: Option<Duration>,
|
|
engine: Option<Arc<dyn Fn() -> Engine + Send + Sync>>,
|
|
}
|
|
|
|
impl AsyncRunnerBuilder {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
|
|
pub fn runner_id<S: Into<String>>(mut self, runner_id: S) -> Self {
|
|
self.runner_id = Some(runner_id.into());
|
|
self
|
|
}
|
|
|
|
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
|
|
self.db_path = Some(db_path.into());
|
|
self
|
|
}
|
|
|
|
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
|
|
self.redis_url = Some(redis_url.into());
|
|
self
|
|
}
|
|
|
|
pub fn default_timeout(mut self, timeout: Duration) -> Self {
|
|
self.default_timeout = Some(timeout);
|
|
self
|
|
}
|
|
|
|
pub fn engine_factory<F>(mut self, factory: F) -> Self
|
|
where
|
|
F: Fn() -> Engine + Send + Sync + 'static,
|
|
{
|
|
self.engine = Some(Arc::new(factory));
|
|
self
|
|
}
|
|
|
|
pub fn build(self) -> Result<AsyncRunner, String> {
|
|
Ok(AsyncRunner {
|
|
runner_id: self.runner_id.ok_or("runner_id is required")?,
|
|
db_path: self.db_path.ok_or("db_path is required")?,
|
|
redis_url: self.redis_url.ok_or("redis_url is required")?,
|
|
default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)),
|
|
engine_factory: self.engine.ok_or("engine factory is required")?,
|
|
running_jobs: Arc::new(Mutex::new(HashMap::new())),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Asynchronous runner that processes jobs concurrently
|
|
pub struct AsyncRunner {
|
|
pub runner_id: String,
|
|
pub db_path: String,
|
|
pub redis_url: String,
|
|
pub default_timeout: Duration,
|
|
pub engine_factory: Arc<dyn Fn() -> Engine + Send + Sync>,
|
|
running_jobs: Arc<Mutex<HashMap<String, RunningJob>>>,
|
|
}
|
|
|
|
impl AsyncRunner {
|
|
/// Create a new AsyncRunnerBuilder
|
|
pub fn builder() -> AsyncRunnerBuilder {
|
|
AsyncRunnerBuilder::new()
|
|
}
|
|
|
|
/// Add a running job to the tracking map
|
|
async fn add_running_job(&self, job_id: String, handle: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>) {
|
|
let running_job = RunningJob {
|
|
job_id: job_id.clone(),
|
|
handle,
|
|
started_at: std::time::Instant::now(),
|
|
};
|
|
|
|
let mut jobs = self.running_jobs.lock().await;
|
|
jobs.insert(job_id.clone(), running_job);
|
|
debug!("Async Runner: Added running job '{}'. Total running: {}",
|
|
job_id, jobs.len());
|
|
}
|
|
|
|
/// Remove a completed job from the tracking map
|
|
async fn remove_running_job(&self, job_id: &str) {
|
|
let mut jobs = self.running_jobs.lock().await;
|
|
if let Some(job) = jobs.remove(job_id) {
|
|
let duration = job.started_at.elapsed();
|
|
debug!("Async Runner: Removed completed job '{}' after {:?}. Remaining: {}",
|
|
job_id, duration, jobs.len());
|
|
}
|
|
}
|
|
|
|
/// Get the count of currently running jobs
|
|
pub async fn running_job_count(&self) -> usize {
|
|
let jobs = self.running_jobs.lock().await;
|
|
jobs.len()
|
|
}
|
|
|
|
/// Cleanup any finished jobs from the running jobs map
|
|
async fn cleanup_finished_jobs(&self) {
|
|
let mut jobs = self.running_jobs.lock().await;
|
|
let mut to_remove = Vec::new();
|
|
|
|
for (job_id, running_job) in jobs.iter() {
|
|
if running_job.handle.is_finished() {
|
|
to_remove.push(job_id.clone());
|
|
}
|
|
}
|
|
|
|
for job_id in to_remove {
|
|
if let Some(job) = jobs.remove(&job_id) {
|
|
let duration = job.started_at.elapsed();
|
|
debug!("Async Runner: Cleaned up finished job '{}' after {:?}",
|
|
job_id, duration);
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
|
|
impl Runner for AsyncRunner {
|
|
fn process_job(&self, job: Job) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
|
let job_id = job.id.clone();
|
|
let runner_id = &self.runner_id;
|
|
|
|
// Determine timeout (use job-specific timeout if available, otherwise default)
|
|
let job_timeout = if job.timeout > 0 {
|
|
Duration::from_secs(job.timeout)
|
|
} else {
|
|
self.default_timeout
|
|
};
|
|
|
|
info!("Async Runner '{}', Job {}: Spawning job execution task with timeout {:?}",
|
|
runner_id, job_id, job_timeout);
|
|
|
|
// Clone necessary data for the spawned task
|
|
let job_id_clone = job_id.clone();
|
|
let runner_id_clone = runner_id.clone();
|
|
let runner_id_debug = runner_id.clone();
|
|
let job_id_debug = job_id.clone();
|
|
let _redis_url_clone = self.redis_url.clone();
|
|
let running_jobs_clone = Arc::clone(&self.running_jobs);
|
|
let engine_factory = Arc::clone(&self.engine_factory);
|
|
let db_path_clone = self.db_path.clone();
|
|
|
|
// Spawn the job execution task
|
|
let job_handle = tokio::spawn(async move {
|
|
// Create a new engine instance (cheap with factory pattern)
|
|
let mut engine = engine_factory();
|
|
let mut db_config = rhai::Map::new();
|
|
db_config.insert("DB_PATH".into(), db_path_clone.into());
|
|
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
|
|
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
|
|
engine.set_default_tag(rhai::Dynamic::from(db_config));
|
|
|
|
// Execute the Rhai script
|
|
let result = match engine.eval::<rhai::Dynamic>(&job.payload) {
|
|
Ok(result) => {
|
|
let result_str = if result.is::<String>() {
|
|
result.into_string().unwrap()
|
|
} else {
|
|
result.to_string()
|
|
};
|
|
info!("Async Runner '{}', Job {}: Script executed successfully. Result: {}",
|
|
runner_id_clone, job_id_clone, result_str);
|
|
Ok(result_str)
|
|
}
|
|
Err(e) => {
|
|
let error_msg = format!("Script execution error: {}", e);
|
|
error!("Async Runner '{}', Job {}: {}", runner_id_clone, job_id_clone, error_msg);
|
|
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
|
|
}
|
|
};
|
|
|
|
// Remove this job from the running jobs map when it completes
|
|
let mut jobs = running_jobs_clone.lock().await;
|
|
if let Some(running_job) = jobs.remove(&job_id_clone) {
|
|
let duration = running_job.started_at.elapsed();
|
|
debug!("Async Runner '{}': Removed completed job '{}' after {:?}",
|
|
runner_id_debug, job_id_debug, duration);
|
|
}
|
|
|
|
result
|
|
});
|
|
|
|
// Add the job to the running jobs map
|
|
let running_job = RunningJob {
|
|
job_id: job_id.clone(),
|
|
handle: job_handle,
|
|
started_at: std::time::Instant::now(),
|
|
};
|
|
|
|
let running_jobs_clone = Arc::clone(&self.running_jobs);
|
|
let job_id_for_map = job_id.clone();
|
|
tokio::spawn(async move {
|
|
let mut jobs = running_jobs_clone.lock().await;
|
|
jobs.insert(job_id_for_map, running_job);
|
|
debug!("Async Runner: Added running job '{}'. Total running: {}",
|
|
job_id, jobs.len());
|
|
});
|
|
|
|
// For async runners, we return immediately with a placeholder
|
|
// The actual result will be handled by the spawned task
|
|
Ok("Job spawned for async processing".to_string())
|
|
}
|
|
|
|
fn runner_id(&self) -> &str {
|
|
&self.runner_id
|
|
}
|
|
|
|
fn redis_url(&self) -> &str {
|
|
&self.redis_url
|
|
}
|
|
}
|
|
|
|
/// Convenience function to spawn an asynchronous runner using the trait interface
|
|
///
|
|
/// This function provides a clean interface for the new async runner implementation
|
|
/// with timeout support.
|
|
pub fn spawn_async_runner<F>(
|
|
runner_id: String,
|
|
db_path: String,
|
|
redis_url: String,
|
|
shutdown_rx: mpsc::Receiver<()>,
|
|
default_timeout: std::time::Duration,
|
|
engine_factory: F,
|
|
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
|
|
where
|
|
F: Fn() -> Engine + Send + Sync + 'static,
|
|
{
|
|
use std::sync::Arc;
|
|
|
|
let runner = Arc::new(
|
|
AsyncRunner::builder()
|
|
.runner_id(runner_id)
|
|
.db_path(db_path)
|
|
.redis_url(redis_url)
|
|
.default_timeout(default_timeout)
|
|
.engine_factory(engine_factory)
|
|
.build()
|
|
.expect("Failed to build AsyncRunner")
|
|
);
|
|
crate::runner_trait::spawn_runner(runner, shutdown_rx)
|
|
}
|