use std::time::Duration; use tokio::time::timeout; use crate::{JobBuilder, JobStatus, Client}; use log::{info, error}; use tokio::sync::mpsc; use std::sync::Arc; use crate::async_runner::AsyncRunner; use crate::runner_trait::{Runner, RunnerConfig}; /// Execute a script in single-job mode /// Creates a job, submits it, waits for completion, and returns the result pub async fn execute_script_mode( script_content: &str, runner_id: &str, redis_url: String, job_timeout: Duration, engine_factory: F, ) -> Result> where F: Fn() -> rhai::Engine + Send + Sync + 'static, { info!("Executing script in single-job mode"); // Create job client let job_client = Client::builder() .redis_url(&redis_url) .build() .await?; // Create the job using JobBuilder let job = JobBuilder::new() .caller_id("script_mode") .payload(script_content) .runner(runner_id) .timeout(job_timeout.as_secs()) .build()?; let job_id = job.id.clone(); info!("Created job with ID: {}", job_id); // Submit the job job_client.store_job_in_redis(&job).await?; info!("Job stored in Redis"); // Dispatch the job to the runner's queue job_client.job_run(&job_id, runner_id).await?; info!("Job dispatched to runner queue: {}", runner_id); // Create and spawn a temporary runner to process the job let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); let config = RunnerConfig { runner_id: runner_id.to_string(), db_path: "/tmp".to_string(), // Temporary path for script mode redis_url: redis_url.clone(), default_timeout: Some(job_timeout), }; let runner = Arc::new( AsyncRunner::builder() .runner_id(&config.runner_id) .db_path(&config.db_path) .redis_url(&config.redis_url) .default_timeout(config.default_timeout.unwrap_or(job_timeout)) .engine_factory(engine_factory) .build() .map_err(|e| format!("Failed to build runner: {}", e))? ); let runner_handle = runner.spawn(shutdown_rx); info!("Temporary runner spawned for job processing"); // Wait for job completion with timeout let result = timeout(job_timeout, wait_for_job_completion(&job_client, &job_id)).await; // Shutdown the temporary runner let _ = shutdown_tx.send(()).await; let _ = runner_handle.await; match result { Ok(job_result) => { match job_result { Ok(job_status) => { match job_status { JobStatus::Finished => { info!("Job completed successfully"); // Get the job result from Redis match job_client.get_result(&job_id).await { Ok(Some(result)) => Ok(result), Ok(None) => Ok("Job completed with no result".to_string()), Err(e) => { error!("Failed to get job result: {}", e); Ok("Job completed but result unavailable".to_string()) } } } JobStatus::Error => { // Get the job error from Redis - for now just return a generic error error!("Job failed with status: Error"); return Err("Job execution failed".into()); /*match job_client.get_job_error(&job_id).await { Ok(Some(error_msg)) => { error!("Job failed: {}", error_msg); Err(format!("Job failed: {}", error_msg).into()) } Ok(None) => { error!("Job failed with no error message"); Err("Job failed with no error message".into()) } Err(e) => { error!("Failed to get job error: {}", e); Err("Job failed but error details unavailable".into()) } }*/ } _ => { error!("Job ended in unexpected status: {:?}", job_status); Err(format!("Job ended in unexpected status: {:?}", job_status).into()) } } } Err(e) => { error!("Error waiting for job completion: {}", e); Err(e) } } } Err(_) => { error!("Job execution timed out after {:?}", job_timeout); // Try to cancel the job let _ = job_client.set_job_status(&job_id, JobStatus::Error).await; Err("Job execution timed out".into()) } } } /// Wait for job completion by polling Redis async fn wait_for_job_completion( job_client: &Client, job_id: &str, ) -> Result> { let poll_interval = Duration::from_millis(500); loop { match job_client.get_status(job_id).await { Ok(status) => { match status { JobStatus::Finished | JobStatus::Error => { return Ok(status); } JobStatus::Created | JobStatus::Dispatched | JobStatus::WaitingForPrerequisites | JobStatus::Started => { // Continue polling tokio::time::sleep(poll_interval).await; } JobStatus::Stopping => { // Job is being stopped, wait a bit more tokio::time::sleep(poll_interval).await; } } } Err(e) => { error!("Error polling job status: {}", e); tokio::time::sleep(poll_interval).await; } } } }