210 lines
7.3 KiB
Rust
210 lines
7.3 KiB
Rust
//! Runner trait abstraction for job processing
|
|
|
|
use crate::{Job, JobStatus, Client};
|
|
use log::{debug, error, info};
|
|
use redis::AsyncCommands;
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
|
|
use crate::{initialize_redis_connection, BLPOP_TIMEOUT_SECONDS};
|
|
|
|
/// Settings describing a single runner instance.
///
/// Create one with [`RunnerConfig::new`] and, if needed, attach a timeout afterwards with
/// [`RunnerConfig::with_default_timeout`].
#[derive(Debug, Clone)]
pub struct RunnerConfig {
    /// Identifier of the runner.
    pub runner_id: String,
    /// Database path; stored as given, not interpreted by this type.
    pub db_path: String,
    /// Redis URL; stored as given, not interpreted by this type.
    pub redis_url: String,
    /// Optional default timeout. Only used by async runners.
    pub default_timeout: Option<Duration>,
}

impl RunnerConfig {
    /// Builds a configuration from the three mandatory values.
    ///
    /// `default_timeout` starts out as `None`.
    pub fn new(runner_id: String, db_path: String, redis_url: String) -> Self {
        let default_timeout = None;
        Self {
            default_timeout,
            redis_url,
            db_path,
            runner_id,
        }
    }

    /// Returns the configuration with `default_timeout` set to `Some(timeout)`.
    ///
    /// Consumes `self`, so it can be chained directly after [`RunnerConfig::new`]; all other
    /// fields are carried over unchanged.
    pub fn with_default_timeout(self, timeout: Duration) -> Self {
        Self {
            default_timeout: Some(timeout),
            ..self
        }
    }
}
|
|
|
|
/// Trait for job runners
|
|
pub trait Runner: Send + Sync + 'static {
|
|
/// Process a single job and return the result
|
|
fn process_job(&self, job: Job) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
|
|
|
|
/// Get runner ID
|
|
fn runner_id(&self) -> &str;
|
|
|
|
/// Get Redis URL
|
|
fn redis_url(&self) -> &str;
|
|
|
|
/// Spawn the runner loop
|
|
fn spawn(
|
|
self: Arc<Self>,
|
|
mut shutdown_rx: mpsc::Receiver<()>,
|
|
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
|
|
where
|
|
Self: Sized
|
|
{
|
|
tokio::spawn(async move {
|
|
let runner_id = self.runner_id();
|
|
let redis_url = self.redis_url();
|
|
|
|
// Create client to get the proper queue key
|
|
let client = Client::builder()
|
|
.redis_url(redis_url)
|
|
.build()
|
|
.await
|
|
.map_err(|e| format!("Failed to create client: {}", e))?;
|
|
|
|
let queue_key = client.runner_key(runner_id);
|
|
info!(
|
|
"Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
|
|
runner_id,
|
|
redis_url,
|
|
queue_key
|
|
);
|
|
|
|
let mut redis_conn = initialize_redis_connection(runner_id, redis_url).await?;
|
|
|
|
// Main runner loop: poll Redis queue for jobs and process them
|
|
// Exits on shutdown signal or Redis error
|
|
loop {
|
|
let blpop_keys = vec![queue_key.clone()];
|
|
tokio::select! {
|
|
// Listen for shutdown signal
|
|
_ = shutdown_rx.recv() => {
|
|
info!("Runner '{}': Shutdown signal received", runner_id);
|
|
break;
|
|
}
|
|
// Listen for tasks from Redis
|
|
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
|
|
let response: Option<(String, String)> = match blpop_result {
|
|
Ok(resp) => resp,
|
|
Err(e) => {
|
|
error!("Runner '{}': Redis BLPOP error: {}", runner_id, e);
|
|
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
|
|
}
|
|
};
|
|
|
|
if let Some((_queue_name_recv, job_id)) = response {
|
|
info!("Runner '{}' received job: {}", runner_id, job_id);
|
|
|
|
// Load and process job
|
|
match client.load_job_from_redis(&job_id).await {
|
|
Ok(job) => {
|
|
if job.payload.trim() == "ping" {
|
|
handle_ping_job(&client, runner_id, &job_id).await;
|
|
} else {
|
|
process_regular_job(&*self, &client, runner_id, &job_id, job).await;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!("Runner '{}': Failed to load job '{}': {}", runner_id, job_id, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("Runner '{}' has shut down", runner_id);
|
|
Ok(())
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Handle ping job - responds with "pong"
|
|
async fn handle_ping_job(client: &Client, runner_id: &str, job_id: &str) {
|
|
info!("Runner '{}': Received ping job '{}'", runner_id, job_id);
|
|
|
|
if let Err(e) = client.set_job_status(job_id, JobStatus::Started).await {
|
|
error!("Runner '{}': Failed to set ping job '{}' status to Started: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
if let Err(e) = client.set_result(job_id, "pong").await {
|
|
error!("Runner '{}': Failed to set ping job '{}' result: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
if let Err(e) = client.set_job_status(job_id, JobStatus::Finished).await {
|
|
error!("Runner '{}': Failed to set ping job '{}' status to Finished: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
info!("Runner '{}': Ping job '{}' completed", runner_id, job_id);
|
|
}
|
|
|
|
/// Process regular job - handles job execution and status updates
|
|
async fn process_regular_job(
|
|
runner: &dyn Runner,
|
|
client: &Client,
|
|
runner_id: &str,
|
|
job_id: &str,
|
|
job: Job,
|
|
) {
|
|
if let Err(e) = client.set_job_status(job_id, JobStatus::Started).await {
|
|
error!("Runner '{}': Failed to set job '{}' status to Started: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
match runner.process_job(job) {
|
|
Ok(result) => {
|
|
if let Err(e) = client.set_result(job_id, &result).await {
|
|
error!("Runner '{}': Failed to set job '{}' result: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
if let Err(e) = client.set_job_status(job_id, JobStatus::Finished).await {
|
|
error!("Runner '{}': Failed to set job '{}' status to Finished: {}", runner_id, job_id, e);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let error_str = format!("{:?}", e);
|
|
error!("Runner '{}': Job '{}' failed: {}", runner_id, job_id, error_str);
|
|
|
|
if let Err(e) = client.set_error(job_id, &error_str).await {
|
|
error!("Runner '{}': Failed to set job '{}' error: {}", runner_id, job_id, e);
|
|
}
|
|
|
|
if let Err(e) = client.set_job_status(job_id, JobStatus::Error).await {
|
|
error!("Runner '{}': Failed to set job '{}' status to Error: {}", runner_id, job_id, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Convenience function to spawn a runner with the trait-based interface
|
|
///
|
|
/// This function provides a unified interface for spawning any runner implementation
|
|
/// that implements the Runner trait.
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// * `runner` - The runner implementation to spawn
|
|
/// * `shutdown_rx` - Channel receiver for shutdown signals
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// Returns a `JoinHandle` that can be awaited to wait for runner shutdown.
|
|
pub fn spawn_runner<W: Runner>(
|
|
runner: Arc<W>,
|
|
shutdown_rx: mpsc::Receiver<()>,
|
|
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
|
|
runner.spawn(shutdown_rx)
|
|
}
|
|
|
|
|
|
|