logging update
This commit is contained in:
parent
9c4fa1a78b
commit
78da9da539
@ -1,10 +1,9 @@
|
||||
use hero_job::{Job, JobStatus};
|
||||
use tracing::{debug, error, info};
|
||||
use tracing::log::{debug, error, info};
|
||||
use redis::AsyncCommands;
|
||||
use rhai::{Dynamic, Engine};
|
||||
use tokio::sync::mpsc; // For shutdown signal
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::subscriber::with_default;
|
||||
|
||||
/// Actor trait abstraction for unified actor interface
|
||||
pub mod actor_trait;
|
||||
@ -53,109 +52,13 @@ pub(crate) async fn load_job_from_redis(
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the Rhai script and update job status in Redis with per-job logging
|
||||
/// Execute the Rhai script and update job status in Redis
|
||||
async fn execute_script_and_update_status(
|
||||
redis_conn: &mut redis::aio::MultiplexedConnection,
|
||||
engine: &mut Engine,
|
||||
job: &Job,
|
||||
db_path: &str,
|
||||
actor_type: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Create per-job logger for isolated logging
|
||||
let job_logger = match hero_logger::create_job_logger("logs", actor_type, &job.id) {
|
||||
Ok(logger) => logger,
|
||||
Err(e) => {
|
||||
error!("Failed to create job logger for job {}: {}", job.id, e);
|
||||
// Continue without per-job logging
|
||||
return execute_script_without_job_logging(redis_conn, engine, job, db_path, actor_type).await;
|
||||
}
|
||||
};
|
||||
|
||||
// Execute the job within the per-job logging context
|
||||
let job_id = job.id.clone();
|
||||
let context_id = job.context_id.clone();
|
||||
let script = job.script.clone();
|
||||
let caller_id = job.caller_id.clone();
|
||||
let db_path = db_path.to_string();
|
||||
let actor_target = format!("{}_actor", actor_type);
|
||||
|
||||
let result = with_default(job_logger, || {
|
||||
// Configure Rhai engine for logging within the job context
|
||||
hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
|
||||
|
||||
// Set up Rhai engine configuration
|
||||
let mut db_config = rhai::Map::new();
|
||||
db_config.insert("DB_PATH".into(), db_path.into());
|
||||
db_config.insert("CALLER_ID".into(), caller_id.into());
|
||||
db_config.insert("CONTEXT_ID".into(), context_id.clone().into());
|
||||
engine.set_default_tag(Dynamic::from(db_config));
|
||||
|
||||
info!(target: &actor_target, "Job {} processing started for context {}", job_id, context_id);
|
||||
debug!(target: &actor_target, "Evaluating script with Rhai engine");
|
||||
|
||||
// Execute the script (print/debug calls will now be captured in job logs)
|
||||
match engine.eval::<rhai::Dynamic>(&script) {
|
||||
Ok(result) => {
|
||||
let output_str = if result.is::<String>() {
|
||||
result.into_string().unwrap()
|
||||
} else {
|
||||
result.to_string()
|
||||
};
|
||||
info!(target: &actor_target, "Job {} completed successfully. Output: {}", job_id, output_str);
|
||||
Ok(output_str)
|
||||
}
|
||||
Err(e) => {
|
||||
let error_str = format!("{:?}", *e);
|
||||
error!(target: &actor_target, "Job {} script evaluation failed: {}", job_id, error_str);
|
||||
Err(error_str)
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Update job status based on execution result
|
||||
match result {
|
||||
Ok(output_str) => {
|
||||
Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update job {} status to finished: {}", job.id, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
Job::set_result(redis_conn, &job.id, &output_str).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to set job {} result: {}", job.id, e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
Err(error_str) => {
|
||||
Job::update_status(redis_conn, &job.id, JobStatus::Error).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update job {} status to error: {}", job.id, e);
|
||||
e
|
||||
})?;
|
||||
|
||||
Job::set_error(redis_conn, &job.id, &error_str).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to set job {} error: {}", job.id, e);
|
||||
e
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fallback function for script execution without per-job logging
|
||||
async fn execute_script_without_job_logging(
|
||||
redis_conn: &mut redis::aio::MultiplexedConnection,
|
||||
engine: &mut Engine,
|
||||
job: &Job,
|
||||
db_path: &str,
|
||||
actor_type: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Configure Rhai logging to use system logger
|
||||
let actor_target = format!("{}_actor", actor_type);
|
||||
hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
|
||||
let mut db_config = rhai::Map::new();
|
||||
db_config.insert("DB_PATH".into(), db_path.to_string().into());
|
||||
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
|
||||
@ -173,6 +76,7 @@ async fn execute_script_without_job_logging(
|
||||
};
|
||||
info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
|
||||
|
||||
// Update job status to finished and set result
|
||||
Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update job {} status to finished: {}", job.id, e);
|
||||
@ -191,6 +95,7 @@ async fn execute_script_without_job_logging(
|
||||
let error_str = format!("{:?}", *e);
|
||||
error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
|
||||
|
||||
// Update job status to error and set error message
|
||||
Job::update_status(redis_conn, &job.id, JobStatus::Error).await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update job {} status to error: {}", job.id, e);
|
||||
@ -235,8 +140,6 @@ async fn process_job(
|
||||
engine: &mut Engine,
|
||||
preserve_tasks: bool,
|
||||
) {
|
||||
// Extract actor type from actor_id (e.g., "osis_actor_1" -> "osis")
|
||||
let actor_type = hero_logger::extract_actor_type(actor_id);
|
||||
debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
|
||||
|
||||
// Load job from Redis
|
||||
@ -252,8 +155,8 @@ async fn process_job(
|
||||
debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
|
||||
}
|
||||
|
||||
// Execute the script and update status with per-job logging
|
||||
if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path, actor_type).await {
|
||||
// Execute the script and update status
|
||||
if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
|
||||
error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
|
||||
|
||||
// Ensure job status is set to error if execution failed
|
||||
|
Loading…
Reference in New Issue
Block a user