Compare commits


1 commit
main...logger

Author: Maxime Van Hees · SHA1: 9c4fa1a78b · Message: logger · Date: 2025-08-06 14:34:56 +02:00
18 changed files with 2641 additions and 33 deletions

Cargo.lock generated (213 changed lines)

@@ -402,6 +402,28 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.88"
@@ -470,13 +492,12 @@ dependencies = [
"async-trait",
"chrono",
"clap",
"env_logger",
"hero_job",
"hero_logger",
"hero_supervisor",
"heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"log",
"redis 0.25.4",
"rhai",
"serde",
@@ -484,6 +505,8 @@ dependencies = [
"thiserror 1.0.69",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
"uuid",
]
@@ -1639,16 +1662,16 @@ dependencies = [
"anyhow",
"chrono",
"criterion",
"env_logger",
"hero_logger",
"hero_supervisor",
"hero_websocket_server",
"log",
"redis 0.25.4",
"rhai",
"serde",
"serde_json",
"tempfile",
"tokio",
"tracing",
"uuid",
]
@@ -1712,6 +1735,25 @@ dependencies = [
"uuid",
]
[[package]]
name = "hero_logger"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"rhai",
"serde",
"serde_json",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tokio-test",
"tracing",
"tracing-appender",
"tracing-subscriber",
"tracing-test",
]
[[package]]
name = "hero_supervisor"
version = "0.1.0"
@@ -1721,9 +1763,7 @@ dependencies = [
"clap",
"colored",
"crossterm",
"env_logger",
"hero_job",
"log",
"ratatui",
"redis 0.25.4",
"rhai",
@@ -1731,6 +1771,8 @@ dependencies = [
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
"uuid",
"zinit-client",
]
@@ -2737,6 +2779,15 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "md-5"
version = "0.10.6"
@@ -2831,6 +2882,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-conv"
version = "0.1.0"
@@ -2950,6 +3011,12 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -3392,8 +3459,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
@@ -3404,7 +3480,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.8.5",
]
[[package]]
@@ -3413,6 +3489,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
@@ -3899,6 +3981,15 @@ dependencies = [
"keccak",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -4206,6 +4297,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.41"
@@ -4370,6 +4470,19 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "tokio-test"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.19.0"
@@ -4505,6 +4618,18 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror 1.0.69",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.30"
@@ -4523,6 +4648,70 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn",
]
[[package]]
@@ -4722,6 +4911,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"

Cargo.toml (workspace root)

@@ -6,14 +6,14 @@ edition = "2024"
[dependencies]
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
env_logger = "0.10"
hero_logger = { path = "core/logger" }
hero_supervisor = { path = "core/supervisor" }
hero_websocket_server = { path = "interfaces/websocket/server" }
log = "0.4"
redis = { version = "0.25.0", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync", "signal"] }
tracing = "0.1"
rhai = "1.21.0"
[dev-dependencies]
@@ -48,6 +48,9 @@ serde_json = "1.0"
sha3 = "0.10"
thiserror = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync", "signal"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry", "fmt"] }
tracing-appender = "0.2"
url = "2.5"
uuid = { version = "1.6", features = ["v4", "serde"] }
@@ -59,7 +62,9 @@ members = [
"interfaces/websocket/server",
"core/supervisor",
"core/actor",
"core/job", "interfaces/websocket/examples",
"core/job",
"core/logger",
"interfaces/websocket/examples",
"proxies/http",
]
resolver = "2" # Recommended for new workspaces


@@ -4,17 +4,11 @@ use std::time::Duration;
use hero_supervisor::{SupervisorBuilder, SupervisorError};
use hero_websocket_server::ServerBuilder;
use tokio::signal;
use log::{info, error};
use env_logger::Builder;
use tracing::{info, error};
/// The main entry point of the Hero Supervisor.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
info!("Hero Supervisor starting up...");
// Get config path from command line arguments or use default
@@ -41,6 +35,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let actor_configs = supervisor.get_actor_configs()?;
info!("Loaded {} actor configurations from TOML", actor_configs.len());
// Initialize the system logger with all components
let mut system_components = vec!["supervisor".to_string()];
for config in &actor_configs {
system_components.push(config.name.clone()); // e.g., "osis_actor_1"
}
// Initialize the logger for all system components
let _logger_guards = hero_logger::init_system_logger("logs", &system_components)?;
info!(target: "supervisor", "System logger initialized with {} components", system_components.len());
// Spawn the background lifecycle manager with 5-minute health check interval
let health_check_interval = Duration::from_secs(5 * 60); // 5 minutes
let mut lifecycle_handle = supervisor.clone().spawn_lifecycle_manager(actor_configs, health_check_interval);

core/actor/Cargo.toml

@@ -15,8 +15,8 @@ rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
log = "0.4"
env_logger = "0.10"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.4", features = ["derive"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
chrono = { version = "0.4", features = ["serde"] }
@@ -25,6 +25,7 @@ thiserror = "1.0"
async-trait = "0.1"
hero_supervisor = { path = "../supervisor" }
hero_job = { path = "../job" }
hero_logger = { path = "../logger" }
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }


@@ -28,7 +28,7 @@
//! ```
use hero_job::Job;
use log::{debug, error, info};
use tracing::{debug, error, info};
use redis::AsyncCommands;
use rhai::Engine;
use std::sync::Arc;


@@ -1,9 +1,10 @@
use hero_job::{Job, JobStatus};
use log::{debug, error, info};
use tracing::{debug, error, info};
use redis::AsyncCommands;
use rhai::{Dynamic, Engine};
use tokio::sync::mpsc; // For shutdown signal
use tokio::task::JoinHandle;
use tracing::subscriber::with_default;
/// Actor trait abstraction for unified actor interface
pub mod actor_trait;
@@ -52,13 +53,109 @@ pub(crate) async fn load_job_from_redis(
}
}
/// Execute the Rhai script and update job status in Redis
/// Execute the Rhai script and update job status in Redis with per-job logging
async fn execute_script_and_update_status(
redis_conn: &mut redis::aio::MultiplexedConnection,
engine: &mut Engine,
job: &Job,
db_path: &str,
actor_type: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create per-job logger for isolated logging
let job_logger = match hero_logger::create_job_logger("logs", actor_type, &job.id) {
Ok(logger) => logger,
Err(e) => {
error!("Failed to create job logger for job {}: {}", job.id, e);
// Continue without per-job logging
return execute_script_without_job_logging(redis_conn, engine, job, db_path, actor_type).await;
}
};
// Execute the job within the per-job logging context
let job_id = job.id.clone();
let context_id = job.context_id.clone();
let script = job.script.clone();
let caller_id = job.caller_id.clone();
let db_path = db_path.to_string();
let actor_target = format!("{}_actor", actor_type);
let result = with_default(job_logger, || {
// Configure Rhai engine for logging within the job context
hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
// Set up Rhai engine configuration
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.into());
db_config.insert("CALLER_ID".into(), caller_id.into());
db_config.insert("CONTEXT_ID".into(), context_id.clone().into());
engine.set_default_tag(Dynamic::from(db_config));
// tracing targets must be string literals, so the actor name is recorded as a field
info!(actor = %actor_target, "Job {} processing started for context {}", job_id, context_id);
debug!(actor = %actor_target, "Evaluating script with Rhai engine");
// Execute the script (print/debug calls will now be captured in job logs)
match engine.eval::<rhai::Dynamic>(&script) {
Ok(result) => {
let output_str = if result.is::<String>() {
result.into_string().unwrap()
} else {
result.to_string()
};
info!(actor = %actor_target, "Job {} completed successfully. Output: {}", job_id, output_str);
Ok(output_str)
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!(actor = %actor_target, "Job {} script evaluation failed: {}", job_id, error_str);
Err(error_str)
}
}
});
// Update job status based on execution result
match result {
Ok(output_str) => {
Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
.map_err(|e| {
error!("Failed to update job {} status to finished: {}", job.id, e);
e
})?;
Job::set_result(redis_conn, &job.id, &output_str).await
.map_err(|e| {
error!("Failed to set job {} result: {}", job.id, e);
e
})?;
}
Err(error_str) => {
Job::update_status(redis_conn, &job.id, JobStatus::Error).await
.map_err(|e| {
error!("Failed to update job {} status to error: {}", job.id, e);
e
})?;
Job::set_error(redis_conn, &job.id, &error_str).await
.map_err(|e| {
error!("Failed to set job {} error: {}", job.id, e);
e
})?;
}
}
Ok(())
}
/// Fallback function for script execution without per-job logging
async fn execute_script_without_job_logging(
redis_conn: &mut redis::aio::MultiplexedConnection,
engine: &mut Engine,
job: &Job,
db_path: &str,
actor_type: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Configure Rhai logging to use system logger
let actor_target = format!("{}_actor", actor_type);
hero_logger::rhai_integration::configure_rhai_logging(engine, &actor_target);
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
@@ -76,7 +173,6 @@ async fn execute_script_and_update_status(
};
info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
// Update job status to finished and set result
Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
.map_err(|e| {
error!("Failed to update job {} status to finished: {}", job.id, e);
@@ -95,7 +191,6 @@ let error_str = format!("{:?}", *e);
let error_str = format!("{:?}", *e);
error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
// Update job status to error and set error message
Job::update_status(redis_conn, &job.id, JobStatus::Error).await
.map_err(|e| {
error!("Failed to update job {} status to error: {}", job.id, e);
@@ -140,6 +235,8 @@ async fn process_job(
engine: &mut Engine,
preserve_tasks: bool,
) {
// Extract actor type from actor_id (e.g., "osis_actor_1" -> "osis")
let actor_type = hero_logger::extract_actor_type(actor_id);
debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
// Load job from Redis
@@ -155,8 +252,8 @@
debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
}
// Execute the script and update status
if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
// Execute the script and update status with per-job logging
if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path, actor_type).await {
error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
// Ensure job status is set to error if execution failed

core/logger/Cargo.toml Normal file (+23)

@@ -0,0 +1,23 @@
[package]
name = "hero_logger"
version = "0.1.0"
edition = "2021"
description = "Hierarchical logging system for the Hero project with system and per-job isolation"
authors = ["Hero Team"]
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry", "fmt"] }
tracing-appender = "0.2"
tokio = { version = "1", features = ["fs", "time", "rt"] }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
anyhow = "1.0"
rhai = "1.21.0"
[dev-dependencies]
tempfile = "3.0"
tokio-test = "0.4"
tracing-test = "0.2"

core/logger/README.md Normal file (+259)

@@ -0,0 +1,259 @@
# Hero Logger
A hierarchical logging system for the Hero project that provides system-level and per-job logging with complete isolation using the `tracing` ecosystem.
## Features
- **Hierarchical Organization**: Physical separation of logs by component and job
- **System Logger**: Global logging for all non-job-specific events
- **Per-Job Logger**: Isolated logging for individual job execution
- **Hourly Rotation**: Automatic log file rotation every hour
- **Rhai Integration**: Capture Rhai script `print()` and `debug()` calls
- **High Performance**: Async logging with efficient filtering
- **Structured Logging**: Rich context and metadata support
## Architecture
The logging system uses a hybrid approach with two main components:
### System Logger (Global)
- Long-lived logger initialized at application startup
- Routes logs to different files based on tracing targets
- Supports multiple components simultaneously
### Per-Job Logger (Dynamic)
- Created on-demand for each job execution
- Provides complete isolation for job-specific logs
- Automatically disposed after job completion
## Directory Structure
```
logs/
├── supervisor/ # System logs for supervisor
│ └── 2025-08-06-11.log
└── actor/
├── osis/
│ ├── 2025-08-06-11.log # General OSIS actor logs
│ ├── job-a1b2c3d4/ # Job-specific logs
│ │ └── 2025-08-06-11.log
│ └── job-9a8b7c6d/
│ └── 2025-08-06-12.log
└── sal/
├── 2025-08-06-13.log # General SAL actor logs
└── job-f1e2d3c4/
└── 2025-08-06-13.log
```
## Quick Start
### 1. Initialize System Logger
```rust
use hero_logger;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Define your system components
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
// Initialize the system logger
let _guards = hero_logger::init_system_logger("logs", &components)?;
// Now you can use tracing macros with targets
tracing::info!(target: "supervisor", "System started");
tracing::info!(target: "osis_actor", "Actor ready");
Ok(())
}
```
### 2. Per-Job Logging
```rust
use hero_logger::create_job_logger;
use tracing::subscriber::with_default;
async fn process_job(job_id: &str, actor_type: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Create job-specific logger
    let job_logger = create_job_logger("logs", actor_type, job_id)?;

    // Execute job within logging context (`with_default` takes a sync closure)
    with_default(job_logger, || {
        tracing::info!(target: "osis_actor", "Job {} started", job_id);
        // All tracing calls here go to the job-specific log
        tracing::debug!(target: "osis_actor", "Processing data...");
        tracing::info!(target: "osis_actor", "Job {} completed", job_id);
    });

    Ok(())
}
```
### 3. Rhai Script Integration
```rust
use hero_logger::rhai_integration::configure_rhai_logging;
use rhai::Engine;
fn setup_rhai_engine() -> Engine {
let mut engine = Engine::new();
// Configure Rhai to capture print/debug calls
configure_rhai_logging(&mut engine, "osis_actor");
engine
}
// Now Rhai scripts can use print() and debug()
let script = r#"
print("Hello from Rhai!");
debug("Debug information");
42
"#;
let result = engine.eval::<i64>(script)?;
```
## API Reference
### Core Functions
#### `init_system_logger(logs_root, components)`
Initialize the global system logger with component-based filtering.
**Parameters:**
- `logs_root`: Root directory for log files
- `components`: List of component names for dedicated logging
**Returns:** Vector of `WorkerGuard`s that must be kept alive
#### `create_job_logger(logs_root, actor_type, job_id)`
Create a per-job logger for isolated logging.
**Parameters:**
- `logs_root`: Root directory for log files
- `actor_type`: Type of actor (e.g., "osis", "sal")
- `job_id`: Unique job identifier
**Returns:** Boxed subscriber for use with `with_default()`
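For longer-running jobs, prefer `create_job_logger_with_guard` (also exported by this crate), which hands you the `WorkerGuard` so you control when buffered logs are flushed. A minimal sketch:
```rust
use hero_logger::create_job_logger_with_guard;
use tracing::subscriber::with_default;

fn run_job(job_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Keep `_guard` alive until the job's logs are flushed
    let (job_logger, _guard) = create_job_logger_with_guard("logs", "osis", job_id)?;

    with_default(job_logger, || {
        tracing::info!(target: "osis_actor", "Job {} running", job_id);
    });

    Ok(()) // `_guard` drops here, flushing any remaining logs
}
```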
### Rhai Integration
#### `configure_rhai_logging(engine, target)`
Configure a Rhai engine to capture print/debug output.
#### `add_custom_logging_functions(engine, target)`
Add custom logging functions (`log_info`, `log_debug`, etc.) to Rhai.
#### `create_logging_enabled_engine(target, include_custom)`
Create a new Rhai engine with full logging integration.
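For example, the custom functions can be registered alongside the print/debug hooks and called directly from a script (a sketch using the functions above):
```rust
use rhai::Engine;
use hero_logger::rhai_integration::{add_custom_logging_functions, configure_rhai_logging};

let mut engine = Engine::new();
configure_rhai_logging(&mut engine, "osis_actor");
add_custom_logging_functions(&mut engine, "osis_actor");

// Scripts can now mix print() with the explicit log_* helpers
engine.eval::<()>(r#"
    print("step 1 done");
    log_warn("disk space low");
    log_info("continuing anyway");
"#)?;
```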
### Utilities
#### `ensure_log_directories(logs_root, components)`
Ensure the log directory structure exists.
#### `extract_actor_type(component)`
Extract actor type from component name.
#### `cleanup_old_logs(directory, pattern, max_age_days)`
Clean up old log files based on age.
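A combined sketch of these utilities. The exact parameter types are assumed from the signatures listed above (`max_age_days` as an integer, `extract_actor_type` returning the type as a string); the `cleanup_old_logs` arguments below are illustrative:
```rust
use hero_logger::{ensure_log_directories, extract_actor_type, cleanup_old_logs};

let components = vec!["supervisor".to_string(), "osis_actor_1".to_string()];

// Create logs/supervisor/ and logs/actor/... before any logging happens
ensure_log_directories("logs", &components)?;

// "osis_actor_1" -> "osis"
let actor_type = extract_actor_type("osis_actor_1");

// Hypothetical cleanup call: prune job logs older than 7 days
cleanup_old_logs("logs/actor/osis", "job-", 7)?;
```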
## Configuration
### Log Levels
The system supports standard tracing log levels:
- `ERROR`: Critical errors
- `WARN`: Warning messages
- `INFO`: Informational messages
- `DEBUG`: Debug information
- `TRACE`: Detailed trace information
### Environment Variables
- `RUST_LOG`: Set log level filtering (e.g., `RUST_LOG=debug`)
### File Rotation
- **Hourly**: Default rotation every hour
- **Daily**: Optional daily rotation
- **Never**: Single file (no rotation)
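The per-job default (hourly) can be overridden through `JobLoggerConfig`, which `create_job_logger_with_config` accepts (both are defined in `core/logger/src/job_logger.rs` and re-exported at the crate root):
```rust
use hero_logger::{create_job_logger_with_config, JobLoggerConfig, RotationConfig};
use tracing::subscriber::with_default;

let config = JobLoggerConfig {
    rotation: RotationConfig::Daily, // instead of the hourly default
    ..Default::default()
};

let (job_logger, _guard) =
    create_job_logger_with_config("logs", "osis", "job-123", config)?;

with_default(job_logger, || {
    tracing::info!(target: "osis_actor", "daily-rotated job log");
});
```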
## Examples
### Basic Usage
```bash
cargo run --example logging_demo
```
### Integration with Actor System
```rust
// In your actor implementation. Note: `with_default` takes a synchronous
// closure, and tracing targets must be string literals, so the script is
// evaluated inside the closure with a constant target.
async fn process_job(&self, job: &Job) -> Result<String, Box<dyn std::error::Error>> {
    let job_logger = hero_logger::create_job_logger(
        "logs",
        &self.actor_type,
        &job.id,
    )?;

    // Execute with job-specific logging
    tracing::subscriber::with_default(job_logger, || {
        tracing::info!(target: "osis_actor", "Job processing started");

        // Configure Rhai engine for this job
        let mut engine = Engine::new();
        hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor");

        // Execute Rhai script - print/debug calls are captured
        let result = engine.eval::<String>(&job.script)?;
        tracing::info!(target: "osis_actor", "Job finished: {}", result);
        Ok(result)
    })
}
```
## Performance Considerations
- **Async Logging**: All file I/O is asynchronous
- **Efficient Filtering**: Target-based filtering minimizes overhead
- **Memory Usage**: Per-job loggers are short-lived and automatically cleaned up
- **File Handles**: Automatic rotation prevents excessive file handle usage
## Troubleshooting
### Common Issues
1. **Logs not appearing**: Ensure `WorkerGuard`s are kept alive
2. **Permission errors**: Check write permissions on log directory
3. **Missing directories**: Use `ensure_log_directories()` before logging
4. **Rhai output not captured**: Verify `configure_rhai_logging()` is called
### Debug Mode
Enable debug logging to see internal logger operations:
```bash
RUST_LOG=hero_logger=debug cargo run
```
## Testing
Run the test suite:
```bash
cargo test
```
Run the demo example:
```bash
cargo run --example logging_demo
```
## License
This project is part of the Hero ecosystem and follows the same licensing terms.

core/logger/examples/logging_demo.rs Normal file

@@ -0,0 +1,142 @@
//! Logging System Demo
//!
//! This example demonstrates the Hero logging system functionality including:
//! - System logger initialization
//! - Per-job logger creation
//! - Rhai script integration with logging
//! - Directory structure creation
use hero_logger::{
init_system_logger, create_job_logger, rhai_integration::configure_rhai_logging,
};
use tracing::{info, debug, warn, error};
use tracing::subscriber::with_default;
use rhai::Engine;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 Hero Logging System Demo");
println!("============================");
// 1. Initialize the system logger
println!("\n📋 Step 1: Initializing system logger...");
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
let _guards = init_system_logger("demo_logs", &components)?;
println!("✅ System logger initialized with {} components", components.len());
// 2. Test system-level logging
println!("\n📝 Step 2: Testing system-level logging...");
info!(target: "supervisor", "Supervisor started successfully");
info!(target: "osis_actor", "OSIS actor is ready");
info!(target: "sal_actor", "SAL actor is ready");
warn!(target: "supervisor", "This is a warning message");
error!(target: "supervisor", "This is an error message for testing");
// Give time for async logging
sleep(Duration::from_millis(100)).await;
println!("✅ System logs written to demo_logs/supervisor/ and demo_logs/actor/*/");
// 3. Test per-job logging
println!("\n🔄 Step 3: Testing per-job logging...");
// Create job loggers for different jobs
let job1_logger = create_job_logger("demo_logs", "osis", "demo-job-001")?;
let job2_logger = create_job_logger("demo_logs", "sal", "demo-job-002")?;
// Execute logging within job contexts
with_default(job1_logger, || {
info!(target: "osis_actor", "Job demo-job-001 started");
debug!(target: "osis_actor", "Processing OSIS data");
info!(target: "osis_actor", "Job demo-job-001 completed successfully");
});
with_default(job2_logger, || {
info!(target: "sal_actor", "Job demo-job-002 started");
debug!(target: "sal_actor", "Processing SAL data");
warn!(target: "sal_actor", "Minor issue detected but continuing");
info!(target: "sal_actor", "Job demo-job-002 completed successfully");
});
sleep(Duration::from_millis(100)).await;
println!("✅ Per-job logs written to demo_logs/actor/*/job-*/");
// 4. Test Rhai integration
println!("\n🔧 Step 4: Testing Rhai script logging integration...");
let job3_logger = create_job_logger("demo_logs", "osis", "rhai-demo-003")?;
with_default(job3_logger, || {
let mut engine = Engine::new();
configure_rhai_logging(&mut engine, "osis_actor");
info!(target: "osis_actor", "Starting Rhai script execution");
// Execute a Rhai script that uses print and debug
let script = r#"
print("Hello from Rhai script!");
debug("This is a debug message from Rhai");
let result = 42 + 8;
print("Calculation result: " + result);
result
"#;
match engine.eval::<i64>(script) {
Ok(result) => {
info!(target: "osis_actor", "Rhai script completed with result: {}", result);
}
Err(e) => {
error!(target: "osis_actor", "Rhai script failed: {:?}", e);
}
}
});
sleep(Duration::from_millis(100)).await;
println!("✅ Rhai script logs captured in per-job logger");
// 5. Display directory structure
println!("\n📁 Step 5: Generated directory structure:");
display_directory_structure("demo_logs", 0)?;
println!("\n🎉 Demo completed successfully!");
println!("Check the 'demo_logs' directory to see the generated log files.");
println!("Each component and job has its own isolated log files with hourly rotation.");
Ok(())
}
/// Recursively display directory structure
fn display_directory_structure(path: &str, depth: usize) -> Result<(), Box<dyn std::error::Error>> {
let path = std::path::Path::new(path);
if !path.exists() {
return Ok(());
}
let indent = " ".repeat(depth);
if path.is_dir() {
println!("{}📁 {}/", indent, path.file_name().unwrap_or_default().to_string_lossy());
let mut entries: Vec<_> = std::fs::read_dir(path)?.collect::<Result<Vec<_>, _>>()?;
entries.sort_by_key(|entry| entry.file_name());
for entry in entries {
let entry_path = entry.path();
if entry_path.is_dir() {
display_directory_structure(&entry_path.to_string_lossy(), depth + 1)?;
} else {
println!("{}📄 {}", " ".repeat(depth + 1), entry.file_name().to_string_lossy());
}
}
}
Ok(())
}

core/logger/src/file_appender.rs Normal file

@@ -0,0 +1,285 @@
//! Custom File Appender Implementation
//!
//! This module provides custom file appender functionality with enhanced
//! rotation and directory management capabilities.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
/// Create a custom rolling file appender with enhanced configuration
pub fn create_rolling_appender<P: AsRef<Path>>(
directory: P,
file_name_prefix: &str,
rotation: AppenderRotation,
) -> Result<RollingFileAppender> {
let directory = directory.as_ref();
// Ensure directory exists
std::fs::create_dir_all(directory)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create directory {}: {}", directory.display(), e)
))?;
let rotation = match rotation {
AppenderRotation::Hourly => Rotation::HOURLY,
AppenderRotation::Daily => Rotation::DAILY,
AppenderRotation::Never => Rotation::NEVER,
};
let appender = tracing_appender::rolling::Builder::new()
.rotation(rotation)
.filename_prefix(file_name_prefix)
.filename_suffix("log")
.build(directory)
.map_err(|e| LoggerError::Config(format!("Failed to create rolling appender: {}", e)))?;
Ok(appender)
}
/// Enhanced rotation configuration
#[derive(Debug, Clone, Copy)]
pub enum AppenderRotation {
/// Rotate files every hour
Hourly,
/// Rotate files every day
Daily,
/// Never rotate (single file)
Never,
}
/// File appender builder for more complex configurations
pub struct FileAppenderBuilder {
directory: PathBuf,
file_prefix: String,
file_suffix: String,
rotation: AppenderRotation,
max_files: Option<usize>,
}
impl FileAppenderBuilder {
/// Create a new file appender builder
pub fn new<P: AsRef<Path>>(directory: P) -> Self {
Self {
directory: directory.as_ref().to_path_buf(),
file_prefix: "log".to_string(),
file_suffix: "log".to_string(),
rotation: AppenderRotation::Hourly,
max_files: None,
}
}
/// Set the file prefix
pub fn file_prefix<S: Into<String>>(mut self, prefix: S) -> Self {
self.file_prefix = prefix.into();
self
}
/// Set the file suffix
pub fn file_suffix<S: Into<String>>(mut self, suffix: S) -> Self {
self.file_suffix = suffix.into();
self
}
/// Set the rotation policy
pub fn rotation(mut self, rotation: AppenderRotation) -> Self {
self.rotation = rotation;
self
}
/// Set maximum number of files to keep (for cleanup)
pub fn max_files(mut self, max: usize) -> Self {
self.max_files = Some(max);
self
}
/// Build the file appender
pub fn build(self) -> Result<RollingFileAppender> {
// Ensure directory exists
std::fs::create_dir_all(&self.directory)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create directory {}: {}", self.directory.display(), e)
))?;
let rotation = match self.rotation {
AppenderRotation::Hourly => Rotation::HOURLY,
AppenderRotation::Daily => Rotation::DAILY,
AppenderRotation::Never => Rotation::NEVER,
};
let appender = tracing_appender::rolling::Builder::new()
.rotation(rotation)
.filename_prefix(&self.file_prefix)
.filename_suffix(&self.file_suffix)
.build(&self.directory)
.map_err(|e| LoggerError::Config(format!("Failed to create rolling appender: {}", e)))?;
// Perform cleanup if max_files is set
if let Some(max_files) = self.max_files {
if let Err(e) = cleanup_old_files(&self.directory, &self.file_prefix, max_files) {
tracing::warn!("Failed to cleanup old log files: {}", e);
}
}
Ok(appender)
}
}
/// Clean up old log files, keeping only the most recent ones
fn cleanup_old_files<P: AsRef<Path>>(
directory: P,
file_prefix: &str,
max_files: usize,
) -> Result<()> {
let directory = directory.as_ref();
let mut log_files = Vec::new();
// Read directory and collect log files
let entries = std::fs::read_dir(directory)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
if file_name.starts_with(file_prefix) && file_name.ends_with(".log") {
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
log_files.push((path, modified));
}
}
}
}
}
// Sort by modification time (newest first)
log_files.sort_by(|a, b| b.1.cmp(&a.1));
// Remove old files if we exceed max_files
if log_files.len() > max_files {
for (old_file, _) in log_files.iter().skip(max_files) {
if let Err(e) = std::fs::remove_file(old_file) {
tracing::warn!("Failed to remove old log file {}: {}", old_file.display(), e);
} else {
tracing::debug!("Removed old log file: {}", old_file.display());
}
}
}
Ok(())
}
/// Utility function to get the current log file path for a given configuration
pub fn get_current_log_file<P: AsRef<Path>>(
directory: P,
file_prefix: &str,
rotation: AppenderRotation,
) -> PathBuf {
let directory = directory.as_ref();
match rotation {
AppenderRotation::Hourly => {
let now = chrono::Utc::now();
let timestamp = now.format("%Y-%m-%d-%H");
directory.join(format!("{}.{}.log", file_prefix, timestamp))
}
AppenderRotation::Daily => {
let now = chrono::Utc::now();
let timestamp = now.format("%Y-%m-%d");
directory.join(format!("{}.{}.log", file_prefix, timestamp))
}
AppenderRotation::Never => {
directory.join(format!("{}.log", file_prefix))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::time::Duration;
#[test]
fn test_create_rolling_appender() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path().join("logs");
let appender = create_rolling_appender(&directory, "test", AppenderRotation::Hourly).unwrap();
// Verify directory was created
assert!(directory.exists());
}
#[test]
fn test_file_appender_builder() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path().join("logs");
let appender = FileAppenderBuilder::new(&directory)
.file_prefix("custom")
.file_suffix("txt")
.rotation(AppenderRotation::Daily)
.max_files(5)
.build()
.unwrap();
assert!(directory.exists());
}
#[test]
fn test_get_current_log_file() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path();
// Test hourly rotation
let hourly_file = get_current_log_file(directory, "test", AppenderRotation::Hourly);
assert!(hourly_file.to_string_lossy().contains("test."));
assert!(hourly_file.extension().unwrap() == "log");
// Test daily rotation
let daily_file = get_current_log_file(directory, "test", AppenderRotation::Daily);
assert!(daily_file.to_string_lossy().contains("test."));
assert!(daily_file.extension().unwrap() == "log");
// Test never rotation
let never_file = get_current_log_file(directory, "test", AppenderRotation::Never);
assert_eq!(never_file, directory.join("test.log"));
}
#[test]
fn test_cleanup_old_files() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path();
// Create some test log files
for i in 0..10 {
let file_path = directory.join(format!("test.{}.log", i));
std::fs::write(&file_path, "test content").unwrap();
// Sleep briefly to ensure different modification times
std::thread::sleep(Duration::from_millis(10));
}
// Cleanup, keeping only 5 files
cleanup_old_files(directory, "test", 5).unwrap();
// Count remaining files
let remaining_files: Vec<_> = std::fs::read_dir(directory)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("test.") && name.ends_with(".log") {
Some(name)
} else {
None
}
})
.collect();
assert_eq!(remaining_files.len(), 5);
}
}

core/logger/src/job_logger.rs Normal file

@@ -0,0 +1,312 @@
//! Per-Job Logger Implementation
//!
//! This module implements the per-job logging functionality that creates
//! temporary, isolated loggers for individual job execution.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt,
layer::SubscriberExt,
util::SubscriberInitExt,
Layer, Registry,
};
use tracing_appender::{non_blocking::WorkerGuard, rolling};
/// Create a per-job logger for isolated job logging
///
/// This creates a temporary tracing subscriber that writes exclusively
/// to a job-specific directory. The subscriber is designed to be used
/// with `tracing::subscriber::with_default()` to scope all logging within a job.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a boxed subscriber that can be used with `with_default()`
/// The WorkerGuard is managed internally and will be dropped when the subscriber is dropped.
pub fn create_job_logger<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<Box<dyn tracing::Subscriber + Send + Sync>> {
let (subscriber, _guard) = create_job_logger_with_guard(logs_root, actor_type, job_id)?;
// Note: The guard is intentionally dropped here because the job logger
// is meant to be short-lived. In practice, the job execution should be
// fast enough that logs are flushed before the guard is dropped.
// For longer-running jobs, use create_job_logger_with_guard instead.
Ok(subscriber)
}
/// Create a job logger that returns both the subscriber and the guard
///
/// This variant returns both the subscriber and the worker guard, giving
/// the caller control over the guard's lifetime for proper log flushing.
pub fn create_job_logger_with_guard<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Create job-specific directory: logs/actor/<type>/job-<job_id>/
let job_dir = logs_root
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id));
// Ensure the job directory exists
std::fs::create_dir_all(&job_dir)
.map_err(|e| LoggerError::DirectoryCreation(format!("Failed to create job directory {}: {}", job_dir.display(), e)))?;
// Create hourly rolling file appender for the job
let file_appender = rolling::hourly(&job_dir, "log");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create a formatted layer for the job
let layer = fmt::layer()
.with_writer(non_blocking)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_ansi(false) // No ANSI colors in log files
.with_filter(
EnvFilter::new("trace") // Capture all logs within the job context
.add_directive(LevelFilter::TRACE.into())
);
// Create a registry with the job layer
let subscriber = Registry::default()
.with(layer);
tracing::debug!(
target: "hero_logger",
"Created job logger for actor_type={}, job_id={}, log_dir={}",
actor_type,
job_id,
job_dir.display()
);
Ok((Box::new(subscriber), guard))
}
/// Create a job logger with custom configuration
///
/// This allows for more fine-grained control over the job logger configuration.
pub fn create_job_logger_with_config<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
config: JobLoggerConfig,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Create job-specific directory
let job_dir = logs_root
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id));
std::fs::create_dir_all(&job_dir)
.map_err(|e| LoggerError::DirectoryCreation(format!("Failed to create job directory {}: {}", job_dir.display(), e)))?;
// Create file appender based on config
let file_appender = match config.rotation {
RotationConfig::Hourly => rolling::hourly(&job_dir, &config.file_prefix),
RotationConfig::Daily => rolling::daily(&job_dir, &config.file_prefix),
RotationConfig::Never => rolling::never(&job_dir, format!("{}.log", config.file_prefix)),
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create layer with custom configuration
let mut layer = fmt::layer()
.with_writer(non_blocking)
.with_target(config.include_target)
.with_thread_ids(config.include_thread_ids)
.with_file(config.include_file_location)
.with_line_number(config.include_line_numbers)
.with_ansi(false);
// Apply level filter
let layer = layer.with_filter(
EnvFilter::new(&config.level_filter)
.add_directive(config.max_level.into())
);
let subscriber = Registry::default()
.with(layer);
Ok((Box::new(subscriber), guard))
}
/// Configuration for job logger creation
#[derive(Debug, Clone)]
pub struct JobLoggerConfig {
/// File prefix for log files
pub file_prefix: String,
/// Log rotation configuration
pub rotation: RotationConfig,
/// Maximum log level to capture
pub max_level: LevelFilter,
/// Level filter string (e.g., "debug", "info", "trace")
pub level_filter: String,
/// Include target in log output
pub include_target: bool,
/// Include thread IDs in log output
pub include_thread_ids: bool,
/// Include file location in log output
pub include_file_location: bool,
/// Include line numbers in log output
pub include_line_numbers: bool,
}
impl Default for JobLoggerConfig {
fn default() -> Self {
Self {
file_prefix: "job".to_string(),
rotation: RotationConfig::Hourly,
max_level: LevelFilter::TRACE,
level_filter: "trace".to_string(),
include_target: true,
include_thread_ids: true,
include_file_location: true,
include_line_numbers: true,
}
}
}
/// Log file rotation configuration
#[derive(Debug, Clone)]
pub enum RotationConfig {
/// Rotate logs hourly
Hourly,
/// Rotate logs daily
Daily,
/// Never rotate logs (single file)
Never,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::{info, debug, error};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_job_logger_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let job_logger = create_job_logger(logs_root, "osis", "test-job-123").unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/osis/job-test-job-123");
assert!(job_dir.exists());
// Test logging within the job context
tracing::subscriber::with_default(job_logger, || {
info!(target: "osis_actor", "Job started");
debug!(target: "osis_actor", "Processing data");
info!(target: "osis_actor", "Job completed");
});
// Give some time for async writing
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_job_logger_with_guard() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let (job_logger, _guard) = create_job_logger_with_guard(logs_root, "sal", "test-job-456").unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/sal/job-test-job-456");
assert!(job_dir.exists());
// Test logging
tracing::subscriber::with_default(job_logger, || {
error!(target: "sal_actor", "Job failed with error");
});
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_job_logger_with_custom_config() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let config = JobLoggerConfig {
file_prefix: "custom".to_string(),
rotation: RotationConfig::Never,
max_level: LevelFilter::INFO,
level_filter: "info".to_string(),
include_target: false,
include_thread_ids: false,
include_file_location: false,
include_line_numbers: false,
};
let (job_logger, _guard) = create_job_logger_with_config(
logs_root,
"python",
"custom-job",
config
).unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/python/job-custom-job");
assert!(job_dir.exists());
// Test logging
tracing::subscriber::with_default(job_logger, || {
info!(target: "python_actor", "Custom job logging");
});
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_multiple_job_loggers() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create multiple job loggers
let job1 = create_job_logger(logs_root, "osis", "job-1").unwrap();
let job2 = create_job_logger(logs_root, "osis", "job-2").unwrap();
let job3 = create_job_logger(logs_root, "sal", "job-3").unwrap();
// Verify all directories were created
assert!(logs_root.join("actor/osis/job-job-1").exists());
assert!(logs_root.join("actor/osis/job-job-2").exists());
assert!(logs_root.join("actor/sal/job-job-3").exists());
// Test isolated logging
tracing::subscriber::with_default(job1, || {
info!(target: "osis_actor", "Job 1 message");
});
tracing::subscriber::with_default(job2, || {
info!(target: "osis_actor", "Job 2 message");
});
tracing::subscriber::with_default(job3, || {
info!(target: "sal_actor", "Job 3 message");
});
sleep(Duration::from_millis(100)).await;
}
}

core/logger/src/lib.rs Normal file (+233)

@@ -0,0 +1,233 @@
//! # Hero Logger
//!
//! A hierarchical logging system for the Hero project that provides:
//! - System-level logging with component-based filtering
//! - Per-job logging with complete isolation
//! - Hourly log rotation
//! - Integration with the tracing ecosystem
//!
//! ## Architecture
//!
//! The logging system uses a hybrid approach:
//! - **System Logger**: Long-lived, captures all non-job-specific logs
//! - **Per-Job Logger**: Short-lived, captures all logs for a single job
//!
//! ## Usage
//!
//! ```rust
//! use hero_logger;
//!
//! // Initialize system logger (once at startup)
//! let components = vec!["supervisor".to_string(), "osis_actor".to_string()];
//! hero_logger::init_system_logger("logs", &components)?;
//!
//! // Use system logging
//! tracing::info!(target: "supervisor", "System started");
//!
//! // Create per-job logger for isolated logging
//! let job_logger = hero_logger::create_job_logger("logs", "osis", "job-123")?;
//! tracing::subscriber::with_default(job_logger, || {
//! tracing::info!(target: "osis_actor", "Job processing started");
//! });
//! ```
use std::path::{Path, PathBuf};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
use tracing_appender::non_blocking::WorkerGuard;
mod system_logger;
mod job_logger;
mod file_appender;
mod utils;
pub mod rhai_integration;
pub use system_logger::*;
pub use job_logger::*;
pub use file_appender::*;
pub use utils::*;
/// Errors that can occur during logging operations
#[derive(thiserror::Error, Debug)]
pub enum LoggerError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Tracing error: {0}")]
Tracing(String),
#[error("Invalid configuration: {0}")]
Config(String),
#[error("Directory creation failed: {0}")]
DirectoryCreation(String),
}
/// Result type for logger operations
pub type Result<T> = std::result::Result<T, LoggerError>;
/// Initialize the system logger with component-based filtering
///
/// This function sets up the global tracing subscriber with multiple file appenders,
/// each filtered by component target. It should be called once at application startup.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names that will have dedicated log directories
///
/// # Returns
///
/// Returns a vector of `WorkerGuard`s that must be kept alive for the duration
/// of the application to ensure proper log flushing.
///
/// # Example
///
/// ```rust
/// let components = vec![
/// "supervisor".to_string(),
/// "osis_actor".to_string(),
/// "sal_actor".to_string(),
/// ];
/// let _guards = hero_logger::init_system_logger("logs", &components)?;
/// ```
pub fn init_system_logger<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<Vec<WorkerGuard>> {
system_logger::init_system_logger(logs_root, components)
}
/// Create a per-job logger for isolated job logging
///
/// This function creates a temporary tracing subscriber that writes exclusively
/// to a job-specific directory. The subscriber should be used with
/// `tracing::subscriber::with_default()` to scope all logging within a job.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a boxed subscriber that can be used with `with_default()`
///
/// # Example
///
/// ```rust
/// let job_logger = hero_logger::create_job_logger("logs", "osis", "job-abc123")?;
///
/// tracing::subscriber::with_default(job_logger, || {
/// tracing::info!(target: "osis_actor", "Job started");
/// // All tracing calls here go to the job-specific log
/// });
/// ```
pub fn create_job_logger<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<Box<dyn tracing::Subscriber + Send + Sync>> {
job_logger::create_job_logger(logs_root, actor_type, job_id)
}
/// Create a job logger that returns both the subscriber and the guard
///
/// This variant returns both the subscriber and the worker guard, giving
/// the caller control over the guard's lifetime.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a tuple of (subscriber, guard) where the guard must be kept alive
/// for proper log flushing.
pub fn create_job_logger_with_guard<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
job_logger::create_job_logger_with_guard(logs_root, actor_type, job_id)
}
/// Ensure the log directory structure exists
///
/// Creates the necessary directory structure for the logging system:
/// - `logs/supervisor/`
/// - `logs/actor/osis/`
/// - `logs/actor/sal/`
/// - etc.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names
pub fn ensure_log_directories<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<()> {
utils::ensure_log_directories(logs_root, components)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::info;
#[tokio::test]
async fn test_system_logger_initialization() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"test_actor".to_string(),
];
let _guards = init_system_logger(logs_root, &components).unwrap();
// Verify directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/test_actor").exists());
}
#[tokio::test]
async fn test_job_logger_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let job_logger = create_job_logger(logs_root, "test", "job-123").unwrap();
// Verify job directory was created
assert!(logs_root.join("actor/test/job-job-123").exists());
// Test that we can use the logger
tracing::subscriber::with_default(job_logger, || {
info!(target: "test_actor", "Test log message");
});
}
#[tokio::test]
async fn test_directory_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
ensure_log_directories(logs_root, &components).unwrap();
// Verify all directories exist
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis_actor").exists());
assert!(logs_root.join("actor/sal_actor").exists());
}
}

core/logger/src/rhai_integration.rs Normal file

@@ -0,0 +1,411 @@
//! Rhai Engine Integration for Logging
//!
//! This module provides integration between Rhai scripts and the tracing logging system,
//! allowing Rhai print() and debug() calls to be captured in the logging infrastructure.
use rhai::{Engine, Dynamic};
use tracing::{info, debug, warn, error};
/// Configure a Rhai engine to capture print and debug output through tracing
///
/// This function sets up custom print and debug hooks that route Rhai script
/// output through the tracing system, allowing it to be captured by both
/// system and per-job loggers.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine to configure
/// * `target` - Target name for tracing (e.g., "osis_actor", "sal_actor")
///
/// # Example
///
/// ```rust
/// use rhai::Engine;
/// use hero_logger::rhai_integration::configure_rhai_logging;
///
/// let mut engine = Engine::new();
/// configure_rhai_logging(&mut engine, "osis_actor");
///
/// // Now when Rhai scripts call print() or debug(), they will be logged
/// engine.eval::<()>(r#"print("Hello from Rhai!");"#).unwrap();
/// ```
pub fn configure_rhai_logging(engine: &mut Engine, target: &str) {
// tracing macros require a constant target, so match on the known targets
match target {
"supervisor" => {
engine.on_print(|text| {
info!(target: "supervisor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "supervisor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "supervisor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "supervisor", "[Rhai Debug] {}", text);
}
});
}
"osis_actor" => {
engine.on_print(|text| {
info!(target: "osis_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "osis_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "osis_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "osis_actor", "[Rhai Debug] {}", text);
}
});
}
"sal_actor" => {
engine.on_print(|text| {
info!(target: "sal_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "sal_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "sal_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "sal_actor", "[Rhai Debug] {}", text);
}
});
}
"v_actor" => {
engine.on_print(|text| {
info!(target: "v_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "v_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "v_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "v_actor", "[Rhai Debug] {}", text);
}
});
}
"python_actor" => {
engine.on_print(|text| {
info!(target: "python_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "python_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "python_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "python_actor", "[Rhai Debug] {}", text);
}
});
}
_ => {
// Default fallback
engine.on_print(|text| {
info!("[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!("[Rhai Debug] {} (from {})", text, source);
} else {
debug!("[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!("[Rhai Debug] {}", text);
}
});
}
}
}
/// Configure a Rhai engine with enhanced logging capabilities
///
/// This function provides more advanced logging configuration, including
/// custom log levels and structured logging support.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine to configure
/// * `config` - Configuration for Rhai logging behavior
pub fn configure_rhai_logging_advanced(engine: &mut Engine, config: RhaiLoggingConfig) {
// For now, use the basic configuration since tracing requires constant targets
configure_rhai_logging(engine, &config.target);
}
/// Configuration for Rhai logging behavior
#[derive(Debug, Clone)]
pub struct RhaiLoggingConfig {
/// Target name for tracing
pub target: String,
/// Log level for print() calls ("error", "warn", "info", "debug")
pub print_level: String,
/// Log level for debug() calls ("error", "warn", "info", "debug")
pub debug_level: String,
/// Whether to include source file and position information
pub include_source_info: bool,
/// Prefix for all Rhai log messages
pub message_prefix: Option<String>,
}
impl Default for RhaiLoggingConfig {
fn default() -> Self {
Self {
target: "rhai_script".to_string(),
print_level: "info".to_string(),
debug_level: "debug".to_string(),
include_source_info: true,
message_prefix: None,
}
}
}
impl RhaiLoggingConfig {
/// Create a new configuration with the specified target
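///
/// # Example
///
/// A sketch of the fluent builder (the `test_rhai_logging_config` test
/// below exercises the same chain):
///
/// ```ignore
/// let config = RhaiLoggingConfig::new("sal_actor")
///     .print_level("warn")
///     .debug_level("info")
///     .message_prefix("SAL");
/// ```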
pub fn new(target: &str) -> Self {
Self {
target: target.to_string(),
..Default::default()
}
}
/// Set the log level for print() calls
pub fn print_level(mut self, level: &str) -> Self {
self.print_level = level.to_string();
self
}
/// Set the log level for debug() calls
pub fn debug_level(mut self, level: &str) -> Self {
self.debug_level = level.to_string();
self
}
/// Set whether to include source information
pub fn include_source_info(mut self, include: bool) -> Self {
self.include_source_info = include;
self
}
/// Set a prefix for all log messages
pub fn message_prefix(mut self, prefix: &str) -> Self {
self.message_prefix = Some(prefix.to_string());
self
}
}
/// Add custom logging functions to a Rhai engine
///
/// This function adds custom logging functions (log_info, log_debug, log_warn, log_error)
/// that Rhai scripts can call directly for more granular logging control.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine
/// * `target` - Target name for tracing
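///
/// # Example
///
/// A minimal sketch (not compiled here; the target must be one of the known
/// component names for target-tagged output):
///
/// ```ignore
/// let mut engine = rhai::Engine::new();
/// add_custom_logging_functions(&mut engine, "sal_actor");
/// engine.eval::<()>(r#"log_warn("disk space low")"#).unwrap();
/// ```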
pub fn add_custom_logging_functions(engine: &mut Engine, target: &str) {
// Use match to handle different targets with constant strings
match target {
"supervisor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "supervisor", "[Rhai] {}", message);
});
}
"osis_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "osis_actor", "[Rhai] {}", message);
});
}
"sal_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "sal_actor", "[Rhai] {}", message);
});
}
"v_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "v_actor", "[Rhai] {}", message);
});
}
"python_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "python_actor", "[Rhai] {}", message);
});
}
_ => {
// Default fallback
engine.register_fn("log_info", |message: &str| {
info!("[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!("[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!("[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!("[Rhai] {}", message);
});
}
}
}
/// Create a Rhai engine with full logging integration
///
/// This is a convenience function that creates a new Rhai engine and configures
/// it with comprehensive logging support.
///
/// # Arguments
///
/// * `target` - Target name for tracing
/// * `include_custom_functions` - Whether to include custom logging functions
///
/// # Returns
///
/// Returns a configured Rhai engine ready for use with logging
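///
/// # Example
///
/// A minimal sketch (not compiled here):
///
/// ```ignore
/// let engine = create_logging_enabled_engine("python_actor", true);
/// engine.eval::<()>(r#"
///     print("captured at info level");
///     log_error("captured at error level");
/// "#).unwrap();
/// ```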
pub fn create_logging_enabled_engine(target: &str, include_custom_functions: bool) -> Engine {
let mut engine = Engine::new();
// Configure basic logging
configure_rhai_logging(&mut engine, target);
// Add custom logging functions if requested
if include_custom_functions {
add_custom_logging_functions(&mut engine, target);
}
engine
}
#[cfg(test)]
mod tests {
use super::*;
use tracing_test::traced_test;
#[traced_test]
#[test]
fn test_configure_rhai_logging() {
let mut engine = Engine::new();
configure_rhai_logging(&mut engine, "test_actor");
// Test print output
engine.eval::<()>(r#"print("Hello from Rhai!");"#).unwrap();
// `#[traced_test]` captures emitted logs, so assert on the output directly
assert!(logs_contain("Hello from Rhai!"));
}
#[traced_test]
#[test]
fn test_configure_rhai_logging_advanced() {
let mut engine = Engine::new();
let config = RhaiLoggingConfig::new("test_actor")
.print_level("warn")
.debug_level("info")
.include_source_info(false);
configure_rhai_logging_advanced(&mut engine, config);
// Test print and debug output
engine.eval::<()>(r#"
print("This is a print message");
debug("This is a debug message");
"#).unwrap();
}
#[traced_test]
#[test]
fn test_add_custom_logging_functions() {
let mut engine = Engine::new();
add_custom_logging_functions(&mut engine, "test_actor");
// Test custom logging functions
engine.eval::<()>(r#"
log_info("Info message");
log_debug("Debug message");
log_warn("Warning message");
log_error("Error message");
"#).unwrap();
}
#[test]
fn test_create_logging_enabled_engine() {
let engine = create_logging_enabled_engine("test_actor", true);
// Verify the engine still evaluates scripts after logging is wired up
assert_eq!(engine.eval::<i64>("1 + 1").unwrap(), 2);
}
#[test]
fn test_rhai_logging_config() {
let config = RhaiLoggingConfig::new("test")
.print_level("error")
.debug_level("warn")
.include_source_info(false)
.message_prefix("TEST");
assert_eq!(config.target, "test");
assert_eq!(config.print_level, "error");
assert_eq!(config.debug_level, "warn");
assert!(!config.include_source_info);
assert_eq!(config.message_prefix, Some("TEST".to_string()));
}
}


@ -0,0 +1,173 @@
//! System Logger Implementation
//!
//! This module implements the system-wide logging functionality that captures
//! all non-job-specific logs from every component with target-based filtering.
use crate::{LoggerError, Result};
use std::path::Path;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt,
layer::SubscriberExt,
util::SubscriberInitExt,
Layer,
};
use tracing_appender::{non_blocking::WorkerGuard, rolling};
/// Initialize the system logger with component-based filtering
///
/// This creates multiple file appenders, each filtered by a specific tracing target:
/// - `tracing::info!(target: "supervisor", ...)` -> `logs/supervisor/`
/// - `tracing::info!(target: "osis_actor", ...)` -> `logs/actor/osis/`
/// - etc.
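///
/// # Example
///
/// A minimal sketch (not compiled here). Call this once per process, since it
/// installs the global subscriber, and keep the returned guards alive for the
/// lifetime of the process or buffered log lines may be lost on exit:
///
/// ```ignore
/// let components = vec!["supervisor".to_string(), "osis_actor".to_string()];
/// let _guards = init_system_logger("logs", &components).unwrap();
/// tracing::info!(target: "supervisor", "supervisor online");
/// ```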
pub fn init_system_logger<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<Vec<WorkerGuard>> {
let logs_root = logs_root.as_ref();
// Ensure log directories exist
crate::utils::ensure_log_directories(logs_root, components)?;
let mut guards = Vec::new();
let mut layers = Vec::new();
// Create a layer for each component
for component in components {
let (layer, guard) = create_component_layer(logs_root, component)?;
layers.push(layer);
guards.push(guard);
}
// Build the registry with all component layers; a Vec of boxed layers
// itself implements `Layer`, so no intermediate re-collect is needed
let registry = tracing_subscriber::registry().with(layers);
// Add console output for development
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()));
// Install as the global default; return an error instead of panicking if
// a subscriber was already set (e.g. if this is called twice in one process)
registry
.with(console_layer)
.try_init()
.map_err(|e| LoggerError::Config(format!("failed to install global subscriber: {}", e)))?;
tracing::info!(target: "hero_logger", "System logger initialized with {} components", components.len());
Ok(guards)
}
/// Create a filtered layer for a specific component
fn create_component_layer<P: AsRef<Path>>(
logs_root: P,
component: &str,
) -> Result<(Box<dyn Layer<tracing_subscriber::Registry> + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Resolve the log directory via the shared helper so the mapping stays
// consistent with the rest of the crate ("supervisor" -> logs/supervisor,
// "osis_actor" -> logs/actor/osis, ...)
let log_dir = crate::utils::get_component_log_directory(logs_root, component);
// Create hourly rolling file appender
let file_appender = rolling::hourly(&log_dir, "log");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create a formatted layer restricted to this component's target. Note:
// appending a blanket INFO directive here would let every component's
// INFO+ events into this file, defeating the per-target separation.
let layer = fmt::layer()
.with_writer(non_blocking)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_ansi(false) // No ANSI colors in log files
.with_filter(EnvFilter::new(format!("{}=trace", component)));
Ok((layer.boxed(), guard))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::{info, warn};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_system_logger_initialization() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
let _guards = init_system_logger(logs_root, &components).unwrap();
// Test logging to different targets
info!(target: "supervisor", "Supervisor started");
info!(target: "osis_actor", "OSIS actor ready");
info!(target: "sal_actor", "SAL actor ready");
// Give some time for async writing
sleep(Duration::from_millis(100)).await;
// Verify directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
}
#[tokio::test]
async fn test_component_layer_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create supervisor layer
let (_supervisor_layer, _guard1) = create_component_layer(logs_root, "supervisor").unwrap();
assert!(logs_root.join("supervisor").exists());
// Create actor layer
let (_actor_layer, _guard2) = create_component_layer(logs_root, "osis_actor").unwrap();
assert!(logs_root.join("actor/osis").exists());
}
#[tokio::test]
async fn test_multiple_components() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
"v_actor".to_string(),
"python_actor".to_string(),
];
let guards = init_system_logger(logs_root, &components).unwrap();
assert_eq!(guards.len(), components.len());
// Test that all directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
assert!(logs_root.join("actor/v").exists());
assert!(logs_root.join("actor/python").exists());
}
}

core/logger/src/utils.rs Normal file

@ -0,0 +1,468 @@
//! Utility functions for the Hero Logger
//!
//! This module provides common utility functions used throughout the logging system.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
/// Ensure the log directory structure exists
///
/// Creates the necessary directory structure for the logging system:
/// - `logs/supervisor/`
/// - `logs/actor/osis/`
/// - `logs/actor/sal/`
/// - etc.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names
pub fn ensure_log_directories<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<()> {
let logs_root = logs_root.as_ref();
// Create the root logs directory
std::fs::create_dir_all(logs_root)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create logs root directory {}: {}", logs_root.display(), e)
))?;
// Create directories for each component
for component in components {
let component_dir = get_component_log_directory(logs_root, component);
std::fs::create_dir_all(&component_dir)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create component directory {}: {}", component_dir.display(), e)
))?;
tracing::debug!(
target: "hero_logger",
"Created log directory for component '{}': {}",
component,
component_dir.display()
);
}
tracing::info!(
target: "hero_logger",
"Log directory structure created at: {}",
logs_root.display()
);
Ok(())
}
/// Get the log directory path for a specific component
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `component` - Component name (e.g., "supervisor", "osis_actor")
///
/// # Returns
///
/// Returns the appropriate directory path:
/// - "supervisor" -> `logs/supervisor/`
/// - "osis_actor" -> `logs/actor/osis/`
/// - etc.
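///
/// # Example
///
/// A sketch of the mapping (not compiled here):
///
/// ```ignore
/// use std::path::Path;
/// assert_eq!(
///     get_component_log_directory("/logs", "osis_actor"),
///     Path::new("/logs/actor/osis")
/// );
/// assert_eq!(
///     get_component_log_directory("/logs", "supervisor"),
///     Path::new("/logs/supervisor")
/// );
/// ```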
pub fn get_component_log_directory<P: AsRef<Path>>(
logs_root: P,
component: &str,
) -> PathBuf {
let logs_root = logs_root.as_ref();
if component == "supervisor" {
logs_root.join("supervisor")
} else {
// Extract actor type from component name (e.g., "osis_actor" -> "osis")
let actor_type = component.strip_suffix("_actor").unwrap_or(component);
logs_root.join("actor").join(actor_type)
}
}
/// Get the job log directory path for a specific job
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns the job-specific directory path: `logs/actor/<type>/job-<job_id>/`
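///
/// # Example
///
/// Note that the `job-` prefix is added by this function, so pass the bare id
/// (`"123"`), not `"job-123"`, unless a doubled prefix is intended:
///
/// ```ignore
/// let dir = get_job_log_directory("/logs", "osis", "123");
/// assert!(dir.ends_with("actor/osis/job-123"));
/// ```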
pub fn get_job_log_directory<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> PathBuf {
logs_root
.as_ref()
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id))
}
/// Extract actor type from component name
///
/// # Arguments
///
/// * `component` - Component name (e.g., "osis_actor_1", "sal_actor")
///
/// # Returns
///
/// Returns the actor type (e.g., "osis", "sal")
pub fn extract_actor_type(component: &str) -> &str {
// Patterns like "osis_actor" -> "osis"
if let Some(actor_part) = component.strip_suffix("_actor") {
return actor_part;
}
// Patterns like "osis_actor_1" -> "osis"
if let Some(pos) = component.find("_actor_") {
return &component[..pos];
}
// Anything else (e.g. "supervisor") passes through unchanged
component
}
/// Generate a timestamp string for log file naming
///
/// # Arguments
///
/// * `format` - Timestamp format ("hourly", "daily", or custom format string)
///
/// # Returns
///
/// Returns a formatted timestamp string
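///
/// # Example
///
/// A sketch of the three accepted forms (not compiled here):
///
/// ```ignore
/// let hour = generate_timestamp("hourly");   // e.g. "2025-08-06-14"
/// let day = generate_timestamp("daily");     // e.g. "2025-08-06"
/// let custom = generate_timestamp("%Y%m%d"); // e.g. "20250806"
/// ```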
pub fn generate_timestamp(format: &str) -> String {
let now = chrono::Utc::now();
match format {
"hourly" => now.format("%Y-%m-%d-%H").to_string(),
"daily" => now.format("%Y-%m-%d").to_string(),
custom => now.format(custom).to_string(),
}
}
/// Clean up old log files in a directory
///
/// # Arguments
///
/// * `directory` - Directory to clean up
/// * `file_pattern` - Pattern to match files (e.g., "*.log")
/// * `max_age_days` - Maximum age in days for files to keep
pub fn cleanup_old_logs<P: AsRef<Path>>(
directory: P,
file_pattern: &str,
max_age_days: u64,
) -> Result<usize> {
let directory = directory.as_ref();
if !directory.exists() {
return Ok(0);
}
let cutoff_time = std::time::SystemTime::now()
.checked_sub(std::time::Duration::from_secs(max_age_days * 24 * 60 * 60))
.ok_or_else(|| LoggerError::Config("Invalid max_age_days value".to_string()))?;
let mut removed_count = 0;
let entries = std::fs::read_dir(directory)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if path.is_file() {
if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
// Simple pattern matching (could be enhanced with regex)
let matches_pattern = if file_pattern == "*" {
true
} else if let Some(suffix) = file_pattern.strip_prefix('*') {
// Keep the dot from "*.log" so e.g. "catalog" does not match
file_name.ends_with(suffix)
} else {
file_name.contains(file_pattern)
};
if matches_pattern {
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
if modified < cutoff_time {
if let Err(e) = std::fs::remove_file(&path) {
tracing::warn!(
target: "hero_logger",
"Failed to remove old log file {}: {}",
path.display(),
e
);
} else {
tracing::debug!(
target: "hero_logger",
"Removed old log file: {}",
path.display()
);
removed_count += 1;
}
}
}
}
}
}
}
}
if removed_count > 0 {
tracing::info!(
target: "hero_logger",
"Cleaned up {} old log files from {}",
removed_count,
directory.display()
);
}
Ok(removed_count)
}
/// Get disk usage information for the logs directory
pub fn get_logs_disk_usage<P: AsRef<Path>>(logs_root: P) -> Result<LogsDiskUsage> {
let logs_root = logs_root.as_ref();
if !logs_root.exists() {
return Ok(LogsDiskUsage {
total_size_bytes: 0,
file_count: 0,
directories: Vec::new(),
});
}
let mut total_size = 0u64;
let mut file_count = 0usize;
let mut directories = Vec::new();
fn scan_directory(
dir: &Path,
total_size: &mut u64,
file_count: &mut usize,
) -> Result<DirectoryUsage> {
let mut dir_size = 0u64;
let mut dir_file_count = 0usize;
let entries = std::fs::read_dir(dir)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if path.is_file() {
if let Ok(metadata) = entry.metadata() {
let size = metadata.len();
dir_size += size;
*total_size += size;
dir_file_count += 1;
*file_count += 1;
}
} else if path.is_dir() {
let sub_usage = scan_directory(&path, total_size, file_count)?;
dir_size += sub_usage.size_bytes;
dir_file_count += sub_usage.file_count;
}
}
Ok(DirectoryUsage {
path: dir.to_path_buf(),
size_bytes: dir_size,
file_count: dir_file_count,
})
}
let root_usage = scan_directory(logs_root, &mut total_size, &mut file_count)?;
directories.push(root_usage);
Ok(LogsDiskUsage {
total_size_bytes: total_size,
file_count,
directories,
})
}
/// Information about disk usage of logs
#[derive(Debug, Clone)]
pub struct LogsDiskUsage {
pub total_size_bytes: u64,
pub file_count: usize,
pub directories: Vec<DirectoryUsage>,
}
/// Information about disk usage of a specific directory
#[derive(Debug, Clone)]
pub struct DirectoryUsage {
pub path: PathBuf,
pub size_bytes: u64,
pub file_count: usize,
}
impl LogsDiskUsage {
/// Get total size in human-readable format
pub fn total_size_human(&self) -> String {
format_bytes(self.total_size_bytes)
}
}
impl DirectoryUsage {
/// Get size in human-readable format
pub fn size_human(&self) -> String {
format_bytes(self.size_bytes)
}
}
/// Format bytes in human-readable format
fn format_bytes(bytes: u64) -> String {
const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
let mut size = bytes as f64;
let mut unit_index = 0;
while size >= 1024.0 && unit_index < UNITS.len() - 1 {
size /= 1024.0;
unit_index += 1;
}
if unit_index == 0 {
format!("{} {}", bytes, UNITS[unit_index])
} else {
format!("{:.2} {}", size, UNITS[unit_index])
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_ensure_log_directories() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
ensure_log_directories(logs_root, &components).unwrap();
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
}
#[test]
fn test_get_component_log_directory() {
let logs_root = Path::new("/logs");
assert_eq!(
get_component_log_directory(logs_root, "supervisor"),
logs_root.join("supervisor")
);
assert_eq!(
get_component_log_directory(logs_root, "osis_actor"),
logs_root.join("actor/osis")
);
assert_eq!(
get_component_log_directory(logs_root, "sal_actor_1"),
logs_root.join("actor/sal_actor_1")
);
}
#[test]
fn test_get_job_log_directory() {
let logs_root = Path::new("/logs");
assert_eq!(
get_job_log_directory(logs_root, "osis", "job-123"),
logs_root.join("actor/osis/job-job-123")
);
}
#[test]
fn test_extract_actor_type() {
assert_eq!(extract_actor_type("osis_actor"), "osis");
assert_eq!(extract_actor_type("sal_actor_1"), "sal");
assert_eq!(extract_actor_type("python_actor"), "python");
assert_eq!(extract_actor_type("supervisor"), "supervisor");
assert_eq!(extract_actor_type("custom"), "custom");
}
#[test]
fn test_generate_timestamp() {
let hourly = generate_timestamp("hourly");
let daily = generate_timestamp("daily");
// Basic format validation
assert!(hourly.len() >= 13); // YYYY-MM-DD-HH
assert!(daily.len() >= 10); // YYYY-MM-DD
// Custom format
let custom = generate_timestamp("%Y%m%d");
assert_eq!(custom.len(), 8); // YYYYMMDD
}
#[test]
fn test_cleanup_old_logs() {
let temp_dir = TempDir::new().unwrap();
let logs_dir = temp_dir.path();
// Create some test log files
for i in 0..5 {
let file_path = logs_dir.join(format!("test{}.log", i));
std::fs::write(&file_path, "test content").unwrap();
}
// Create a non-log file
std::fs::write(logs_dir.join("not_a_log.txt"), "content").unwrap();
// Cleanup with 0 days (should remove all files)
let removed = cleanup_old_logs(logs_dir, "*.log", 0).unwrap();
assert_eq!(removed, 5);
// Verify non-log file still exists
assert!(logs_dir.join("not_a_log.txt").exists());
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(0), "0 B");
assert_eq!(format_bytes(1023), "1023 B");
assert_eq!(format_bytes(1024), "1.00 KB");
assert_eq!(format_bytes(1024 * 1024), "1.00 MB");
assert_eq!(format_bytes(1024 * 1024 * 1024), "1.00 GB");
}
#[test]
fn test_get_logs_disk_usage() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create some test files
std::fs::create_dir_all(logs_root.join("supervisor")).unwrap();
std::fs::write(logs_root.join("supervisor/test.log"), "test content").unwrap();
let usage = get_logs_disk_usage(logs_root).unwrap();
assert!(usage.total_size_bytes > 0);
assert!(usage.file_count > 0);
assert!(!usage.directories.is_empty());
}
}


@ -5,14 +5,13 @@ edition = "2021"
[dependencies]
clap = { version = "4.4", features = ["derive"] }
env_logger = "0.10"
redis = { version = "0.25.0", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.8"
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
tracing = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async
colored = "2.0"
hero_job = { path = "../job" }
@ -22,5 +21,5 @@ crossterm = "0.28"
anyhow = "1.0"
[dev-dependencies] # For examples later
env_logger = "0.10"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rhai = "1.18.0" # For examples that might need to show engine setup


@ -1,4 +1,4 @@
-use log::{debug, error, info, warn};
+use tracing::{debug, error, info, warn};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;


@ -3,7 +3,7 @@
//! This module provides actor process lifecycle management using Zinit as the process manager.
//! All functionality is implemented as methods on the Supervisor struct for a clean API.
-use log::{debug, error, info, warn};
+use tracing::{debug, error, info, warn};
use serde_json::json;
use std::collections::HashMap;
use std::path::PathBuf;