Compare commits


3 Commits

Author           SHA1        Message                        Date
                 11e2a79bf6  ...                            2025-08-25 07:07:18 +02:00
                 efd8ffb102  ...                            2025-08-25 07:06:53 +02:00
Maxime Van Hees  473b9ca753  remove ai references for docs  2025-08-14 14:19:40 +02:00
12 changed files with 12 additions and 2513 deletions

Cargo.lock (generated)

@@ -398,7 +398,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]

[[package]]
@@ -966,9 +966,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.15.4"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
@@ -2470,8 +2470,12 @@ dependencies = [
[[package]]
name = "reth-ipc"
version = "1.6.0"
+<<<<<<< HEAD
+source = "git+https://github.com/paradigmxyz/reth#c23e53377922d8a842df55ededa6700e2b938d64"
+=======
-source = "git+https://github.com/paradigmxyz/reth#59e4a5556fa54f1c210e45412b6a91f2351bea19"
+source = "git+https://github.com/paradigmxyz/reth#59e4a5556fa54f1c210e45412b6a91f2351bea19"
+>>>>>>> 473b9ca753b9e350a8824e527789d53928b867d4
dependencies = [
"bytes",
"futures",
@@ -2929,9 +2933,9 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "slab"
version = "0.4.10"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d"
checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
[[package]]
name = "smallvec"
@@ -3833,7 +3837,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]

[[package]]


@@ -1,23 +0,0 @@
[package]
name = "hero_logger"
version = "0.1.0"
edition = "2021"
description = "Hierarchical logging system for the Hero project with system and per-job isolation"
authors = ["Hero Team"]
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry", "fmt"] }
tracing-appender = "0.2"
tokio = { version = "1", features = ["fs", "time", "rt"] }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
anyhow = "1.0"
rhai = "1.21.0"
[dev-dependencies]
tempfile = "3.0"
tokio-test = "0.4"
tracing-test = "0.2"


@@ -1,259 +0,0 @@
# Hero Logger
A hierarchical logging system for the Hero project that provides system-level and per-job logging with complete isolation using the `tracing` ecosystem.
## Features
- **Hierarchical Organization**: Physical separation of logs by component and job
- **System Logger**: Global logging for all non-job-specific events
- **Per-Job Logger**: Isolated logging for individual job execution
- **Hourly Rotation**: Automatic log file rotation every hour
- **Rhai Integration**: Capture Rhai script `print()` and `debug()` calls
- **High Performance**: Async logging with efficient filtering
- **Structured Logging**: Rich context and metadata support
## Architecture
The logging system uses a hybrid approach with two main components:
### System Logger (Global)
- Long-lived logger initialized at application startup
- Routes logs to different files based on tracing targets
- Supports multiple components simultaneously
### Per-Job Logger (Dynamic)
- Created on-demand for each job execution
- Provides complete isolation for job-specific logs
- Automatically disposed after job completion
## Directory Structure
```
logs/
├── supervisor/ # System logs for supervisor
│ └── 2025-08-06-11.log
└── actor/
├── osis/
│ ├── 2025-08-06-11.log # General OSIS actor logs
│ ├── job-a1b2c3d4/ # Job-specific logs
│ │ └── 2025-08-06-11.log
│ └── job-9a8b7c6d/
│ └── 2025-08-06-12.log
└── sal/
├── 2025-08-06-13.log # General SAL actor logs
└── job-f1e2d3c4/
└── 2025-08-06-13.log
```
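The path helpers in the utilities module (re-exported from the crate root) map component and job names onto this layout. A minimal sketch:
```rust
use hero_logger::{get_component_log_directory, get_job_log_directory};
use std::path::PathBuf;

fn main() {
    // "supervisor" logs live directly under logs/supervisor/
    assert_eq!(
        get_component_log_directory("logs", "supervisor"),
        PathBuf::from("logs/supervisor")
    );
    // "osis_actor" is reduced to its actor type: logs/actor/osis/
    assert_eq!(
        get_component_log_directory("logs", "osis_actor"),
        PathBuf::from("logs/actor/osis")
    );
    // Job logs are nested one level deeper: logs/actor/osis/job-a1b2c3d4/
    assert_eq!(
        get_job_log_directory("logs", "osis", "a1b2c3d4"),
        PathBuf::from("logs/actor/osis/job-a1b2c3d4")
    );
}
```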
## Quick Start
### 1. Initialize System Logger
```rust
use hero_logger;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Define your system components
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
// Initialize the system logger
let _guards = hero_logger::init_system_logger("logs", &components)?;
// Now you can use tracing macros with targets
tracing::info!(target: "supervisor", "System started");
tracing::info!(target: "osis_actor", "Actor ready");
Ok(())
}
```
### 2. Per-Job Logging
```rust
use hero_logger::create_job_logger;
use tracing::subscriber::with_default;
async fn process_job(job_id: &str, actor_type: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Create job-specific logger
    let job_logger = create_job_logger("logs", actor_type, job_id)?;
    // Execute job within logging context
    with_default(job_logger, || {
        tracing::info!(target: "osis_actor", "Job {} started", job_id);
        // All tracing calls here go to the job-specific log
        tracing::debug!(target: "osis_actor", "Processing data...");
        tracing::info!(target: "osis_actor", "Job {} completed", job_id);
    });
    Ok(())
}
```
### 3. Rhai Script Integration
```rust
use hero_logger::rhai_integration::configure_rhai_logging;
use rhai::Engine;
fn setup_rhai_engine() -> Engine {
let mut engine = Engine::new();
// Configure Rhai to capture print/debug calls
configure_rhai_logging(&mut engine, "osis_actor");
engine
}
// Now Rhai scripts can use print() and debug()
let engine = setup_rhai_engine();
let script = r#"
    print("Hello from Rhai!");
    debug("Debug information");
    42
"#;
let result = engine.eval::<i64>(script).unwrap();
```
## API Reference
### Core Functions
#### `init_system_logger(logs_root, components)`
Initialize the global system logger with component-based filtering.
**Parameters:**
- `logs_root`: Root directory for log files
- `components`: List of component names for dedicated logging
**Returns:** Vector of `WorkerGuard`s that must be kept alive
#### `create_job_logger(logs_root, actor_type, job_id)`
Create a per-job logger for isolated logging.
**Parameters:**
- `logs_root`: Root directory for log files
- `actor_type`: Type of actor (e.g., "osis", "sal")
- `job_id`: Unique job identifier
**Returns:** Boxed subscriber for use with `with_default()`
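For long-running jobs, the guard-returning variant `create_job_logger_with_guard` (also exported) lets the caller control when buffered logs are flushed. A short sketch:
```rust
use hero_logger::create_job_logger_with_guard;
use tracing::subscriber::with_default;

fn run_job(job_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Keep the guard alive until after the job so buffered logs are flushed.
    let (job_logger, _guard) = create_job_logger_with_guard("logs", "osis", job_id)?;
    with_default(job_logger, || {
        tracing::info!(target: "osis_actor", "Job {} running", job_id);
    });
    Ok(())
}
```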
### Rhai Integration
#### `configure_rhai_logging(engine, target)`
Configure a Rhai engine to capture print/debug output.
#### `add_custom_logging_functions(engine, target)`
Add custom logging functions (`log_info`, `log_debug`, etc.) to Rhai.
#### `create_logging_enabled_engine(target, include_custom)`
Create a new Rhai engine with full logging integration.
### Utilities
#### `ensure_log_directories(logs_root, components)`
Ensure the log directory structure exists.
#### `extract_actor_type(component)`
Extract actor type from component name.
#### `cleanup_old_logs(directory, pattern, max_age_days)`
Clean up old log files based on age.
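A housekeeping sketch combining these utilities; the `"*.log"` pattern and the 7-day cutoff are illustrative values:
```rust
use hero_logger::{cleanup_old_logs, ensure_log_directories};

fn housekeeping() -> Result<(), Box<dyn std::error::Error>> {
    let components = vec!["supervisor".to_string(), "osis_actor".to_string()];
    // Create the directory tree before any appender writes to it.
    ensure_log_directories("logs", &components)?;
    // Remove *.log files older than 7 days; returns how many were deleted.
    let removed = cleanup_old_logs("logs/supervisor", "*.log", 7)?;
    println!("removed {} stale log files", removed);
    Ok(())
}
```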
## Configuration
### Log Levels
The system supports standard tracing log levels:
- `ERROR`: Critical errors
- `WARN`: Warning messages
- `INFO`: Informational messages
- `DEBUG`: Debug information
- `TRACE`: Detailed trace information
### Environment Variables
- `RUST_LOG`: Set log level filtering (e.g., `RUST_LOG=debug`)
### File Rotation
- **Hourly**: Default rotation every hour
- **Daily**: Optional daily rotation
- **Never**: Single file (no rotation)
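Per-job rotation can be selected through `JobLoggerConfig` and `create_job_logger_with_config` from the job logger module; a sketch assuming the crate-root re-exports:
```rust
use hero_logger::{create_job_logger_with_config, JobLoggerConfig, RotationConfig};

fn daily_job_logger() -> Result<(), Box<dyn std::error::Error>> {
    let config = JobLoggerConfig {
        rotation: RotationConfig::Daily, // instead of the hourly default
        ..Default::default()
    };
    let (logger, _guard) =
        create_job_logger_with_config("logs", "sal", "job-42", config)?;
    tracing::subscriber::with_default(logger, || {
        tracing::info!(target: "sal_actor", "daily-rotated job log");
    });
    Ok(())
}
```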
## Examples
### Basic Usage
```bash
cargo run --example logging_demo
```
### Integration with Actor System
```rust
// In your actor implementation. Note: tracing targets must be constant
// strings, so a literal target is used here instead of `&self.actor_type`.
use tracing::instrument::WithSubscriber;

async fn process_job(&self, job: &Job) -> Result<String, Box<dyn std::error::Error>> {
    let (job_logger, _guard) = hero_logger::create_job_logger_with_guard(
        "logs",
        &self.actor_type,
        &job.id,
    )?;
    let script = job.script.clone();
    let job_task = async move {
        tracing::info!(target: "osis_actor", "Job processing started");
        // Configure Rhai engine for this job
        let mut engine = Engine::new();
        hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor");
        // Execute Rhai script - print/debug calls are captured
        let result = engine.eval::<String>(&script)?;
        tracing::info!(target: "osis_actor", "Job finished: {}", result);
        Ok(result)
    };
    // Attach the job-specific subscriber to the future and run it
    job_task.with_subscriber(job_logger).await
}
```
## Performance Considerations
- **Async Logging**: All file I/O is asynchronous
- **Efficient Filtering**: Target-based filtering minimizes overhead
- **Memory Usage**: Per-job loggers are short-lived and automatically cleaned up
- **File Handles**: Automatic rotation prevents excessive file handle usage
## Troubleshooting
### Common Issues
1. **Logs not appearing**: Ensure `WorkerGuard`s are kept alive (see the sketch after this list)
2. **Permission errors**: Check write permissions on log directory
3. **Missing directories**: Use `ensure_log_directories()` before logging
4. **Rhai output not captured**: Verify `configure_rhai_logging()` is called
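For issue 1, a minimal sketch of keeping the guards alive for the life of the process:
```rust
use tracing_appender::non_blocking::WorkerGuard;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bind the guards to a named variable that lives until main returns;
    // `let _ = ...` would drop them immediately and lose buffered logs.
    let _guards: Vec<WorkerGuard> =
        hero_logger::init_system_logger("logs", &["supervisor".to_string()])?;
    tracing::info!(target: "supervisor", "logs flush when the guards drop");
    Ok(())
}
```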
### Debug Mode
Enable debug logging to see internal logger operations:
```bash
RUST_LOG=hero_logger=debug cargo run
```
## Testing
Run the test suite:
```bash
cargo test
```
Run the demo example:
```bash
cargo run --example logging_demo
```
## License
This project is part of the Hero ecosystem and follows the same licensing terms.


@@ -1,142 +0,0 @@
//! Logging System Demo
//!
//! This example demonstrates the Hero logging system functionality including:
//! - System logger initialization
//! - Per-job logger creation
//! - Rhai script integration with logging
//! - Directory structure creation
use hero_logger::{
init_system_logger, create_job_logger, rhai_integration::configure_rhai_logging,
};
use tracing::{info, debug, warn, error};
use tracing::subscriber::with_default;
use rhai::Engine;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 Hero Logging System Demo");
println!("============================");
// 1. Initialize the system logger
println!("\n📋 Step 1: Initializing system logger...");
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
let _guards = init_system_logger("demo_logs", &components)?;
println!("✅ System logger initialized with {} components", components.len());
// 2. Test system-level logging
println!("\n📝 Step 2: Testing system-level logging...");
info!(target: "supervisor", "Supervisor started successfully");
info!(target: "osis_actor", "OSIS actor is ready");
info!(target: "sal_actor", "SAL actor is ready");
warn!(target: "supervisor", "This is a warning message");
error!(target: "supervisor", "This is an error message for testing");
// Give time for async logging
sleep(Duration::from_millis(100)).await;
println!("✅ System logs written to demo_logs/supervisor/ and demo_logs/actor/*/");
// 3. Test per-job logging
println!("\n🔄 Step 3: Testing per-job logging...");
// Create job loggers for different jobs
let job1_logger = create_job_logger("demo_logs", "osis", "demo-job-001")?;
let job2_logger = create_job_logger("demo_logs", "sal", "demo-job-002")?;
// Execute logging within job contexts
with_default(job1_logger, || {
info!(target: "osis_actor", "Job demo-job-001 started");
debug!(target: "osis_actor", "Processing OSIS data");
info!(target: "osis_actor", "Job demo-job-001 completed successfully");
});
with_default(job2_logger, || {
info!(target: "sal_actor", "Job demo-job-002 started");
debug!(target: "sal_actor", "Processing SAL data");
warn!(target: "sal_actor", "Minor issue detected but continuing");
info!(target: "sal_actor", "Job demo-job-002 completed successfully");
});
sleep(Duration::from_millis(100)).await;
println!("✅ Per-job logs written to demo_logs/actor/*/job-*/");
// 4. Test Rhai integration
println!("\n🔧 Step 4: Testing Rhai script logging integration...");
let job3_logger = create_job_logger("demo_logs", "osis", "rhai-demo-003")?;
with_default(job3_logger, || {
let mut engine = Engine::new();
configure_rhai_logging(&mut engine, "osis_actor");
info!(target: "osis_actor", "Starting Rhai script execution");
// Execute a Rhai script that uses print and debug
let script = r#"
print("Hello from Rhai script!");
debug("This is a debug message from Rhai");
let result = 42 + 8;
print("Calculation result: " + result);
result
"#;
match engine.eval::<i64>(script) {
Ok(result) => {
info!(target: "osis_actor", "Rhai script completed with result: {}", result);
}
Err(e) => {
error!(target: "osis_actor", "Rhai script failed: {:?}", e);
}
}
});
sleep(Duration::from_millis(100)).await;
println!("✅ Rhai script logs captured in per-job logger");
// 5. Display directory structure
println!("\n📁 Step 5: Generated directory structure:");
display_directory_structure("demo_logs", 0)?;
println!("\n🎉 Demo completed successfully!");
println!("Check the 'demo_logs' directory to see the generated log files.");
println!("Each component and job has its own isolated log files with hourly rotation.");
Ok(())
}
/// Recursively display directory structure
fn display_directory_structure(path: &str, depth: usize) -> Result<(), Box<dyn std::error::Error>> {
let path = std::path::Path::new(path);
if !path.exists() {
return Ok(());
}
let indent = " ".repeat(depth);
if path.is_dir() {
println!("{}📁 {}/", indent, path.file_name().unwrap_or_default().to_string_lossy());
let mut entries: Vec<_> = std::fs::read_dir(path)?.collect::<Result<Vec<_>, _>>()?;
entries.sort_by_key(|entry| entry.file_name());
for entry in entries {
let entry_path = entry.path();
if entry_path.is_dir() {
display_directory_structure(&entry_path.to_string_lossy(), depth + 1)?;
} else {
println!("{}📄 {}", " ".repeat(depth + 1), entry.file_name().to_string_lossy());
}
}
}
Ok(())
}


@@ -1,285 +0,0 @@
//! Custom File Appender Implementation
//!
//! This module provides custom file appender functionality with enhanced
//! rotation and directory management capabilities.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
/// Create a custom rolling file appender with enhanced configuration
pub fn create_rolling_appender<P: AsRef<Path>>(
directory: P,
file_name_prefix: &str,
rotation: AppenderRotation,
) -> Result<RollingFileAppender> {
let directory = directory.as_ref();
// Ensure directory exists
std::fs::create_dir_all(directory)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create directory {}: {}", directory.display(), e)
))?;
let rotation = match rotation {
AppenderRotation::Hourly => Rotation::HOURLY,
AppenderRotation::Daily => Rotation::DAILY,
AppenderRotation::Never => Rotation::NEVER,
};
let appender = tracing_appender::rolling::Builder::new()
.rotation(rotation)
.filename_prefix(file_name_prefix)
.filename_suffix("log")
.build(directory)
.map_err(|e| LoggerError::Config(format!("Failed to create rolling appender: {}", e)))?;
Ok(appender)
}
/// Enhanced rotation configuration
#[derive(Debug, Clone, Copy)]
pub enum AppenderRotation {
/// Rotate files every hour
Hourly,
/// Rotate files every day
Daily,
/// Never rotate (single file)
Never,
}
/// File appender builder for more complex configurations
pub struct FileAppenderBuilder {
directory: PathBuf,
file_prefix: String,
file_suffix: String,
rotation: AppenderRotation,
max_files: Option<usize>,
}
impl FileAppenderBuilder {
/// Create a new file appender builder
pub fn new<P: AsRef<Path>>(directory: P) -> Self {
Self {
directory: directory.as_ref().to_path_buf(),
file_prefix: "log".to_string(),
file_suffix: "log".to_string(),
rotation: AppenderRotation::Hourly,
max_files: None,
}
}
/// Set the file prefix
pub fn file_prefix<S: Into<String>>(mut self, prefix: S) -> Self {
self.file_prefix = prefix.into();
self
}
/// Set the file suffix
pub fn file_suffix<S: Into<String>>(mut self, suffix: S) -> Self {
self.file_suffix = suffix.into();
self
}
/// Set the rotation policy
pub fn rotation(mut self, rotation: AppenderRotation) -> Self {
self.rotation = rotation;
self
}
/// Set maximum number of files to keep (for cleanup)
pub fn max_files(mut self, max: usize) -> Self {
self.max_files = Some(max);
self
}
/// Build the file appender
pub fn build(self) -> Result<RollingFileAppender> {
// Ensure directory exists
std::fs::create_dir_all(&self.directory)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create directory {}: {}", self.directory.display(), e)
))?;
let rotation = match self.rotation {
AppenderRotation::Hourly => Rotation::HOURLY,
AppenderRotation::Daily => Rotation::DAILY,
AppenderRotation::Never => Rotation::NEVER,
};
let appender = tracing_appender::rolling::Builder::new()
.rotation(rotation)
.filename_prefix(&self.file_prefix)
.filename_suffix(&self.file_suffix)
.build(&self.directory)
.map_err(|e| LoggerError::Config(format!("Failed to create rolling appender: {}", e)))?;
// Perform cleanup if max_files is set
if let Some(max_files) = self.max_files {
if let Err(e) = cleanup_old_files(&self.directory, &self.file_prefix, max_files) {
tracing::warn!("Failed to cleanup old log files: {}", e);
}
}
Ok(appender)
}
}
/// Clean up old log files, keeping only the most recent ones
fn cleanup_old_files<P: AsRef<Path>>(
directory: P,
file_prefix: &str,
max_files: usize,
) -> Result<()> {
let directory = directory.as_ref();
let mut log_files = Vec::new();
// Read directory and collect log files
let entries = std::fs::read_dir(directory)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
if file_name.starts_with(file_prefix) && file_name.ends_with(".log") {
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
log_files.push((path, modified));
}
}
}
}
}
// Sort by modification time (newest first)
log_files.sort_by(|a, b| b.1.cmp(&a.1));
// Remove old files if we exceed max_files
if log_files.len() > max_files {
for (old_file, _) in log_files.iter().skip(max_files) {
if let Err(e) = std::fs::remove_file(old_file) {
tracing::warn!("Failed to remove old log file {}: {}", old_file.display(), e);
} else {
tracing::debug!("Removed old log file: {}", old_file.display());
}
}
}
Ok(())
}
/// Utility function to get the current log file path for a given configuration
pub fn get_current_log_file<P: AsRef<Path>>(
directory: P,
file_prefix: &str,
rotation: AppenderRotation,
) -> PathBuf {
let directory = directory.as_ref();
match rotation {
AppenderRotation::Hourly => {
let now = chrono::Utc::now();
let timestamp = now.format("%Y-%m-%d-%H");
directory.join(format!("{}.{}.log", file_prefix, timestamp))
}
AppenderRotation::Daily => {
let now = chrono::Utc::now();
let timestamp = now.format("%Y-%m-%d");
directory.join(format!("{}.{}.log", file_prefix, timestamp))
}
AppenderRotation::Never => {
directory.join(format!("{}.log", file_prefix))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::time::Duration;
#[test]
fn test_create_rolling_appender() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path().join("logs");
let appender = create_rolling_appender(&directory, "test", AppenderRotation::Hourly).unwrap();
// Verify directory was created
assert!(directory.exists());
}
#[test]
fn test_file_appender_builder() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path().join("logs");
let appender = FileAppenderBuilder::new(&directory)
.file_prefix("custom")
.file_suffix("txt")
.rotation(AppenderRotation::Daily)
.max_files(5)
.build()
.unwrap();
assert!(directory.exists());
}
#[test]
fn test_get_current_log_file() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path();
// Test hourly rotation
let hourly_file = get_current_log_file(directory, "test", AppenderRotation::Hourly);
assert!(hourly_file.to_string_lossy().contains("test."));
assert!(hourly_file.extension().unwrap() == "log");
// Test daily rotation
let daily_file = get_current_log_file(directory, "test", AppenderRotation::Daily);
assert!(daily_file.to_string_lossy().contains("test."));
assert!(daily_file.extension().unwrap() == "log");
// Test never rotation
let never_file = get_current_log_file(directory, "test", AppenderRotation::Never);
assert_eq!(never_file, directory.join("test.log"));
}
#[test]
fn test_cleanup_old_files() {
let temp_dir = TempDir::new().unwrap();
let directory = temp_dir.path();
// Create some test log files
for i in 0..10 {
let file_path = directory.join(format!("test.{}.log", i));
std::fs::write(&file_path, "test content").unwrap();
// Sleep briefly to ensure different modification times
std::thread::sleep(Duration::from_millis(10));
}
// Cleanup, keeping only 5 files
cleanup_old_files(directory, "test", 5).unwrap();
// Count remaining files
let remaining_files: Vec<_> = std::fs::read_dir(directory)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("test.") && name.ends_with(".log") {
Some(name)
} else {
None
}
})
.collect();
assert_eq!(remaining_files.len(), 5);
}
}


@@ -1,312 +0,0 @@
//! Per-Job Logger Implementation
//!
//! This module implements the per-job logging functionality that creates
//! temporary, isolated loggers for individual job execution.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt,
layer::SubscriberExt,
util::SubscriberInitExt,
Layer, Registry,
};
use tracing_appender::{non_blocking::WorkerGuard, rolling};
/// Create a per-job logger for isolated job logging
///
/// This creates a temporary tracing subscriber that writes exclusively
/// to a job-specific directory. The subscriber is designed to be used
/// with `tracing::subscriber::with_default()` to scope all logging within a job.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a boxed subscriber that can be used with `with_default()`
/// The WorkerGuard is dropped inside this function, so buffered logs may be lost
/// for long-running jobs; prefer `create_job_logger_with_guard` when flushing matters.
pub fn create_job_logger<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<Box<dyn tracing::Subscriber + Send + Sync>> {
let (subscriber, _guard) = create_job_logger_with_guard(logs_root, actor_type, job_id)?;
// Note: The guard is intentionally dropped here because the job logger
// is meant to be short-lived. In practice, the job execution should be
// fast enough that logs are flushed before the guard is dropped.
// For longer-running jobs, use create_job_logger_with_guard instead.
Ok(subscriber)
}
/// Create a job logger that returns both the subscriber and the guard
///
/// This variant returns both the subscriber and the worker guard, giving
/// the caller control over the guard's lifetime for proper log flushing.
pub fn create_job_logger_with_guard<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Create job-specific directory: logs/actor/<type>/job-<job_id>/
let job_dir = logs_root
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id));
// Ensure the job directory exists
std::fs::create_dir_all(&job_dir)
.map_err(|e| LoggerError::DirectoryCreation(format!("Failed to create job directory {}: {}", job_dir.display(), e)))?;
// Create hourly rolling file appender for the job
let file_appender = rolling::hourly(&job_dir, "log");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create a formatted layer for the job
let layer = fmt::layer()
.with_writer(non_blocking)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_ansi(false) // No ANSI colors in log files
.with_filter(
EnvFilter::new("trace") // Capture all logs within the job context
.add_directive(LevelFilter::TRACE.into())
);
// Create a registry with the job layer
let subscriber = Registry::default()
.with(layer);
tracing::debug!(
target: "hero_logger",
"Created job logger for actor_type={}, job_id={}, log_dir={}",
actor_type,
job_id,
job_dir.display()
);
Ok((Box::new(subscriber), guard))
}
/// Create a job logger with custom configuration
///
/// This allows for more fine-grained control over the job logger configuration.
pub fn create_job_logger_with_config<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
config: JobLoggerConfig,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Create job-specific directory
let job_dir = logs_root
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id));
std::fs::create_dir_all(&job_dir)
.map_err(|e| LoggerError::DirectoryCreation(format!("Failed to create job directory {}: {}", job_dir.display(), e)))?;
// Create file appender based on config
let file_appender = match config.rotation {
RotationConfig::Hourly => rolling::hourly(&job_dir, &config.file_prefix),
RotationConfig::Daily => rolling::daily(&job_dir, &config.file_prefix),
RotationConfig::Never => rolling::never(&job_dir, format!("{}.log", config.file_prefix)),
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create layer with custom configuration
let layer = fmt::layer()
.with_writer(non_blocking)
.with_target(config.include_target)
.with_thread_ids(config.include_thread_ids)
.with_file(config.include_file_location)
.with_line_number(config.include_line_numbers)
.with_ansi(false);
// Apply level filter
let layer = layer.with_filter(
EnvFilter::new(&config.level_filter)
.add_directive(config.max_level.into())
);
let subscriber = Registry::default()
.with(layer);
Ok((Box::new(subscriber), guard))
}
/// Configuration for job logger creation
#[derive(Debug, Clone)]
pub struct JobLoggerConfig {
/// File prefix for log files
pub file_prefix: String,
/// Log rotation configuration
pub rotation: RotationConfig,
/// Maximum log level to capture
pub max_level: LevelFilter,
/// Level filter string (e.g., "debug", "info", "trace")
pub level_filter: String,
/// Include target in log output
pub include_target: bool,
/// Include thread IDs in log output
pub include_thread_ids: bool,
/// Include file location in log output
pub include_file_location: bool,
/// Include line numbers in log output
pub include_line_numbers: bool,
}
impl Default for JobLoggerConfig {
fn default() -> Self {
Self {
file_prefix: "job".to_string(),
rotation: RotationConfig::Hourly,
max_level: LevelFilter::TRACE,
level_filter: "trace".to_string(),
include_target: true,
include_thread_ids: true,
include_file_location: true,
include_line_numbers: true,
}
}
}
/// Log file rotation configuration
#[derive(Debug, Clone)]
pub enum RotationConfig {
/// Rotate logs hourly
Hourly,
/// Rotate logs daily
Daily,
/// Never rotate logs (single file)
Never,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::{info, debug, error};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_job_logger_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let job_logger = create_job_logger(logs_root, "osis", "test-job-123").unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/osis/job-test-job-123");
assert!(job_dir.exists());
// Test logging within the job context
tracing::subscriber::with_default(job_logger, || {
info!(target: "osis_actor", "Job started");
debug!(target: "osis_actor", "Processing data");
info!(target: "osis_actor", "Job completed");
});
// Give some time for async writing
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_job_logger_with_guard() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let (job_logger, _guard) = create_job_logger_with_guard(logs_root, "sal", "test-job-456").unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/sal/job-test-job-456");
assert!(job_dir.exists());
// Test logging
tracing::subscriber::with_default(job_logger, || {
error!(target: "sal_actor", "Job failed with error");
});
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_job_logger_with_custom_config() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let config = JobLoggerConfig {
file_prefix: "custom".to_string(),
rotation: RotationConfig::Never,
max_level: LevelFilter::INFO,
level_filter: "info".to_string(),
include_target: false,
include_thread_ids: false,
include_file_location: false,
include_line_numbers: false,
};
let (job_logger, _guard) = create_job_logger_with_config(
logs_root,
"python",
"custom-job",
config
).unwrap();
// Verify job directory was created
let job_dir = logs_root.join("actor/python/job-custom-job");
assert!(job_dir.exists());
// Test logging
tracing::subscriber::with_default(job_logger, || {
info!(target: "python_actor", "Custom job logging");
});
sleep(Duration::from_millis(100)).await;
}
#[tokio::test]
async fn test_multiple_job_loggers() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create multiple job loggers
let job1 = create_job_logger(logs_root, "osis", "job-1").unwrap();
let job2 = create_job_logger(logs_root, "osis", "job-2").unwrap();
let job3 = create_job_logger(logs_root, "sal", "job-3").unwrap();
// Verify all directories were created
assert!(logs_root.join("actor/osis/job-job-1").exists());
assert!(logs_root.join("actor/osis/job-job-2").exists());
assert!(logs_root.join("actor/sal/job-job-3").exists());
// Test isolated logging
tracing::subscriber::with_default(job1, || {
info!(target: "osis_actor", "Job 1 message");
});
tracing::subscriber::with_default(job2, || {
info!(target: "osis_actor", "Job 2 message");
});
tracing::subscriber::with_default(job3, || {
info!(target: "sal_actor", "Job 3 message");
});
sleep(Duration::from_millis(100)).await;
}
}


@@ -1,233 +0,0 @@
//! # Hero Logger
//!
//! A hierarchical logging system for the Hero project that provides:
//! - System-level logging with component-based filtering
//! - Per-job logging with complete isolation
//! - Hourly log rotation
//! - Integration with the tracing ecosystem
//!
//! ## Architecture
//!
//! The logging system uses a hybrid approach:
//! - **System Logger**: Long-lived, captures all non-job-specific logs
//! - **Per-Job Logger**: Short-lived, captures all logs for a single job
//!
//! ## Usage
//!
//! ```rust
//! use hero_logger;
//!
//! // Initialize system logger (once at startup)
//! let components = vec!["supervisor".to_string(), "osis_actor".to_string()];
//! hero_logger::init_system_logger("logs", &components)?;
//!
//! // Use system logging
//! tracing::info!(target: "supervisor", "System started");
//!
//! // Create per-job logger for isolated logging
//! let job_logger = hero_logger::create_job_logger("logs", "osis", "job-123")?;
//! tracing::subscriber::with_default(job_logger, || {
//! tracing::info!(target: "osis_actor", "Job processing started");
//! });
//! ```
use std::path::Path;
use tracing_appender::non_blocking::WorkerGuard;
mod system_logger;
mod job_logger;
mod file_appender;
mod utils;
pub mod rhai_integration;
pub use system_logger::*;
pub use job_logger::*;
pub use file_appender::*;
pub use utils::*;
/// Errors that can occur during logging operations
#[derive(thiserror::Error, Debug)]
pub enum LoggerError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Tracing error: {0}")]
Tracing(String),
#[error("Invalid configuration: {0}")]
Config(String),
#[error("Directory creation failed: {0}")]
DirectoryCreation(String),
}
/// Result type for logger operations
pub type Result<T> = std::result::Result<T, LoggerError>;
/// Initialize the system logger with component-based filtering
///
/// This function sets up the global tracing subscriber with multiple file appenders,
/// each filtered by component target. It should be called once at application startup.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names that will have dedicated log directories
///
/// # Returns
///
/// Returns a vector of `WorkerGuard`s that must be kept alive for the duration
/// of the application to ensure proper log flushing.
///
/// # Example
///
/// ```rust
/// let components = vec![
/// "supervisor".to_string(),
/// "osis_actor".to_string(),
/// "sal_actor".to_string(),
/// ];
/// let _guards = hero_logger::init_system_logger("logs", &components)?;
/// ```
pub fn init_system_logger<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<Vec<WorkerGuard>> {
system_logger::init_system_logger(logs_root, components)
}
/// Create a per-job logger for isolated job logging
///
/// This function creates a temporary tracing subscriber that writes exclusively
/// to a job-specific directory. The subscriber should be used with
/// `tracing::subscriber::with_default()` to scope all logging within a job.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a boxed subscriber that can be used with `with_default()`
///
/// # Example
///
/// ```rust
/// let job_logger = hero_logger::create_job_logger("logs", "osis", "job-abc123")?;
///
/// tracing::subscriber::with_default(job_logger, || {
/// tracing::info!(target: "osis_actor", "Job started");
/// // All tracing calls here go to the job-specific log
/// });
/// ```
pub fn create_job_logger<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<Box<dyn tracing::Subscriber + Send + Sync>> {
job_logger::create_job_logger(logs_root, actor_type, job_id)
}
/// Create a job logger that returns both the subscriber and the guard
///
/// This variant returns both the subscriber and the worker guard, giving
/// the caller control over the guard's lifetime.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns a tuple of (subscriber, guard) where the guard must be kept alive
/// for proper log flushing.
pub fn create_job_logger_with_guard<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> Result<(Box<dyn tracing::Subscriber + Send + Sync>, WorkerGuard)> {
job_logger::create_job_logger_with_guard(logs_root, actor_type, job_id)
}
/// Ensure the log directory structure exists
///
/// Creates the necessary directory structure for the logging system:
/// - `logs/supervisor/`
/// - `logs/actor/osis/`
/// - `logs/actor/sal/`
/// - etc.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names
pub fn ensure_log_directories<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<()> {
utils::ensure_log_directories(logs_root, components)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::info;
#[tokio::test]
async fn test_system_logger_initialization() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"test_actor".to_string(),
];
let _guards = init_system_logger(logs_root, &components).unwrap();
// Verify directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/test_actor").exists());
}
#[tokio::test]
async fn test_job_logger_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let job_logger = create_job_logger(logs_root, "test", "job-123").unwrap();
// Verify job directory was created
assert!(logs_root.join("actor/test/job-job-123").exists());
// Test that we can use the logger
tracing::subscriber::with_default(job_logger, || {
info!(target: "test_actor", "Test log message");
});
}
#[tokio::test]
async fn test_directory_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
ensure_log_directories(logs_root, &components).unwrap();
// Verify all directories exist
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis_actor").exists());
assert!(logs_root.join("actor/sal_actor").exists());
}
}


@@ -1,411 +0,0 @@
//! Rhai Engine Integration for Logging
//!
//! This module provides integration between Rhai scripts and the tracing logging system,
//! allowing Rhai print() and debug() calls to be captured in the logging infrastructure.
use rhai::{Engine, Dynamic};
use tracing::{info, debug, warn, error};
/// Configure a Rhai engine to capture print and debug output through tracing
///
/// This function sets up custom print and debug hooks that route Rhai script
/// output through the tracing system, allowing it to be captured by both
/// system and per-job loggers.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine to configure
/// * `target` - Target name for tracing (e.g., "osis_actor", "sal_actor")
///
/// # Example
///
/// ```rust
/// use rhai::Engine;
/// use hero_logger::rhai_integration::configure_rhai_logging;
///
/// let mut engine = Engine::new();
/// configure_rhai_logging(&mut engine, "osis_actor");
///
/// // Now when Rhai scripts call print() or debug(), they will be logged
/// engine.eval::<()>(r#"print("Hello from Rhai!");"#).unwrap();
/// ```
pub fn configure_rhai_logging(engine: &mut Engine, target: &str) {
// Use a macro to create the logging functions with constant targets
match target {
"supervisor" => {
engine.on_print(|text| {
info!(target: "supervisor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "supervisor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "supervisor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "supervisor", "[Rhai Debug] {}", text);
}
});
}
"osis_actor" => {
engine.on_print(|text| {
info!(target: "osis_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "osis_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "osis_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "osis_actor", "[Rhai Debug] {}", text);
}
});
}
"sal_actor" => {
engine.on_print(|text| {
info!(target: "sal_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "sal_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "sal_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "sal_actor", "[Rhai Debug] {}", text);
}
});
}
"v_actor" => {
engine.on_print(|text| {
info!(target: "v_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "v_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "v_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "v_actor", "[Rhai Debug] {}", text);
}
});
}
"python_actor" => {
engine.on_print(|text| {
info!(target: "python_actor", "[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!(target: "python_actor", "[Rhai Debug] {} (from {})", text, source);
} else {
debug!(target: "python_actor", "[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!(target: "python_actor", "[Rhai Debug] {}", text);
}
});
}
_ => {
// Default fallback
engine.on_print(|text| {
info!("[Rhai Script] {}", text);
});
engine.on_debug(|text, source, pos| {
if let Some(source) = source {
if pos.is_none() {
debug!("[Rhai Debug] {} (from {})", text, source);
} else {
debug!("[Rhai Debug] {} (from {} at {:?})", text, source, pos);
}
} else {
debug!("[Rhai Debug] {}", text);
}
});
}
}
}
/// Configure a Rhai engine with enhanced logging capabilities
///
/// This function provides more advanced logging configuration, including
/// custom log levels and structured logging support.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine to configure
/// * `config` - Configuration for Rhai logging behavior
pub fn configure_rhai_logging_advanced(engine: &mut Engine, config: RhaiLoggingConfig) {
// For now, use the basic configuration since tracing requires constant targets
configure_rhai_logging(engine, &config.target);
}
/// Configuration for Rhai logging behavior
#[derive(Debug, Clone)]
pub struct RhaiLoggingConfig {
/// Target name for tracing
pub target: String,
/// Log level for print() calls ("error", "warn", "info", "debug")
pub print_level: String,
/// Log level for debug() calls ("error", "warn", "info", "debug")
pub debug_level: String,
/// Whether to include source file and position information
pub include_source_info: bool,
/// Prefix for all Rhai log messages
pub message_prefix: Option<String>,
}
impl Default for RhaiLoggingConfig {
fn default() -> Self {
Self {
target: "rhai_script".to_string(),
print_level: "info".to_string(),
debug_level: "debug".to_string(),
include_source_info: true,
message_prefix: None,
}
}
}
impl RhaiLoggingConfig {
/// Create a new configuration with the specified target
pub fn new(target: &str) -> Self {
Self {
target: target.to_string(),
..Default::default()
}
}
/// Set the log level for print() calls
pub fn print_level(mut self, level: &str) -> Self {
self.print_level = level.to_string();
self
}
/// Set the log level for debug() calls
pub fn debug_level(mut self, level: &str) -> Self {
self.debug_level = level.to_string();
self
}
/// Set whether to include source information
pub fn include_source_info(mut self, include: bool) -> Self {
self.include_source_info = include;
self
}
/// Set a prefix for all log messages
pub fn message_prefix(mut self, prefix: &str) -> Self {
self.message_prefix = Some(prefix.to_string());
self
}
}
/// Add custom logging functions to a Rhai engine
///
/// This function adds custom logging functions (log_info, log_debug, log_warn, log_error)
/// that Rhai scripts can call directly for more granular logging control.
///
/// # Arguments
///
/// * `engine` - Mutable reference to the Rhai engine
/// * `target` - Target name for tracing
pub fn add_custom_logging_functions(engine: &mut Engine, target: &str) {
// Use match to handle different targets with constant strings
match target {
"supervisor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "supervisor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "supervisor", "[Rhai] {}", message);
});
}
"osis_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "osis_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "osis_actor", "[Rhai] {}", message);
});
}
"sal_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "sal_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "sal_actor", "[Rhai] {}", message);
});
}
"v_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "v_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "v_actor", "[Rhai] {}", message);
});
}
"python_actor" => {
engine.register_fn("log_info", |message: &str| {
info!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!(target: "python_actor", "[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!(target: "python_actor", "[Rhai] {}", message);
});
}
_ => {
// Default fallback
engine.register_fn("log_info", |message: &str| {
info!("[Rhai] {}", message);
});
engine.register_fn("log_debug", |message: &str| {
debug!("[Rhai] {}", message);
});
engine.register_fn("log_warn", |message: &str| {
warn!("[Rhai] {}", message);
});
engine.register_fn("log_error", |message: &str| {
error!("[Rhai] {}", message);
});
}
}
}
/// Create a Rhai engine with full logging integration
///
/// This is a convenience function that creates a new Rhai engine and configures
/// it with comprehensive logging support.
///
/// # Arguments
///
/// * `target` - Target name for tracing
/// * `include_custom_functions` - Whether to include custom logging functions
///
/// # Returns
///
/// Returns a configured Rhai engine ready for use with logging
pub fn create_logging_enabled_engine(target: &str, include_custom_functions: bool) -> Engine {
let mut engine = Engine::new();
// Configure basic logging
configure_rhai_logging(&mut engine, target);
// Add custom logging functions if requested
if include_custom_functions {
add_custom_logging_functions(&mut engine, target);
}
engine
}
#[cfg(test)]
mod tests {
use super::*;
use tracing_test::traced_test;
#[traced_test]
#[test]
fn test_configure_rhai_logging() {
let mut engine = Engine::new();
configure_rhai_logging(&mut engine, "test_actor");
// Test print output
engine.eval::<()>(r#"print("Hello from Rhai!");"#).unwrap();
// Verify that the log was captured (tracing_test will capture it)
// In a real test, you would check the captured logs
}
#[traced_test]
#[test]
fn test_configure_rhai_logging_advanced() {
let mut engine = Engine::new();
let config = RhaiLoggingConfig::new("test_actor")
.print_level("warn")
.debug_level("info")
.include_source_info(false);
configure_rhai_logging_advanced(&mut engine, config);
// Test print and debug output
engine.eval::<()>(r#"
print("This is a print message");
debug("This is a debug message");
"#).unwrap();
}
#[traced_test]
#[test]
fn test_add_custom_logging_functions() {
let mut engine = Engine::new();
add_custom_logging_functions(&mut engine, "test_actor");
// Test custom logging functions
engine.eval::<()>(r#"
log_info("Info message");
log_debug("Debug message");
log_warn("Warning message");
log_error("Error message");
"#).unwrap();
}
#[test]
fn test_create_logging_enabled_engine() {
let engine = create_logging_enabled_engine("test_actor", true);
// Verify engine was created successfully
// In a real test, you would verify the logging configuration
assert!(engine.eval::<i64>("1 + 1").unwrap() == 2);
}
#[test]
fn test_rhai_logging_config() {
let config = RhaiLoggingConfig::new("test")
.print_level("error")
.debug_level("warn")
.include_source_info(false)
.message_prefix("TEST");
assert_eq!(config.target, "test");
assert_eq!(config.print_level, "error");
assert_eq!(config.debug_level, "warn");
assert!(!config.include_source_info);
assert_eq!(config.message_prefix, Some("TEST".to_string()));
}
}


@@ -1,173 +0,0 @@
//! System Logger Implementation
//!
//! This module implements the system-wide logging functionality that captures
//! all non-job-specific logs from every component with target-based filtering.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt,
layer::SubscriberExt,
util::SubscriberInitExt,
Layer,
};
use tracing_appender::{non_blocking::WorkerGuard, rolling};
/// Initialize the system logger with component-based filtering
///
/// This creates multiple file appenders, each filtered by a specific tracing target:
/// - `tracing::info!(target: "supervisor", ...)` -> `logs/supervisor/`
/// - `tracing::info!(target: "osis_actor", ...)` -> `logs/actor/osis/`
/// - etc.
pub fn init_system_logger<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<Vec<WorkerGuard>> {
let logs_root = logs_root.as_ref();
// Ensure log directories exist
crate::utils::ensure_log_directories(logs_root, components)?;
let mut guards = Vec::new();
let mut layers = Vec::new();
// Create a layer for each component
for component in components {
let (layer, guard) = create_component_layer(logs_root, component)?;
layers.push(layer);
guards.push(guard);
}
// Create the registry with all layers
let registry = tracing_subscriber::registry();
// Add all component layers to the registry (a Vec of layers is itself a Layer)
let registry = registry.with(layers);
// Add console output for development
let console_layer = fmt::layer()
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_filter(EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into()));
// Set as global default
registry.with(console_layer).init();
tracing::info!(target: "hero_logger", "System logger initialized with {} components", components.len());
Ok(guards)
}
/// Create a filtered layer for a specific component
fn create_component_layer<P: AsRef<Path>>(
logs_root: P,
component: &str,
) -> Result<(Box<dyn Layer<tracing_subscriber::Registry> + Send + Sync>, WorkerGuard)> {
let logs_root = logs_root.as_ref();
// Determine the log directory based on component type
let log_dir = if component == "supervisor" {
logs_root.join("supervisor")
} else {
// Extract actor type from component name (e.g., "osis_actor" -> "osis")
let actor_type = component.strip_suffix("_actor").unwrap_or(component);
logs_root.join("actor").join(actor_type)
};
// Create hourly rolling file appender
let file_appender = rolling::hourly(&log_dir, "log");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
// Create a formatted layer with target filtering
let layer = fmt::layer()
.with_writer(non_blocking)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.with_ansi(false) // No ANSI colors in log files
.with_filter(
EnvFilter::new(format!("{}=trace", component))
.add_directive(LevelFilter::INFO.into())
);
Ok((layer.boxed(), guard))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tracing::{info, warn};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_system_logger_initialization() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
let _guards = init_system_logger(logs_root, &components).unwrap();
// Test logging to different targets
info!(target: "supervisor", "Supervisor started");
info!(target: "osis_actor", "OSIS actor ready");
info!(target: "sal_actor", "SAL actor ready");
// Give some time for async writing
sleep(Duration::from_millis(100)).await;
// Verify directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
}
#[tokio::test]
async fn test_component_layer_creation() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create supervisor layer
let (supervisor_layer, _guard1) = create_component_layer(logs_root, "supervisor").unwrap();
assert!(logs_root.join("supervisor").exists());
// Create actor layer
let (actor_layer, _guard2) = create_component_layer(logs_root, "osis_actor").unwrap();
assert!(logs_root.join("actor/osis").exists());
}
#[tokio::test]
async fn test_multiple_components() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
"v_actor".to_string(),
"python_actor".to_string(),
];
let guards = init_system_logger(logs_root, &components).unwrap();
assert_eq!(guards.len(), components.len());
// Test that all directories were created
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
assert!(logs_root.join("actor/v").exists());
assert!(logs_root.join("actor/python").exists());
}
}


@@ -1,468 +0,0 @@
//! Utility functions for the Hero Logger
//!
//! This module provides common utility functions used throughout the logging system.
use crate::{LoggerError, Result};
use std::path::{Path, PathBuf};
/// Ensure the log directory structure exists
///
/// Creates the necessary directory structure for the logging system:
/// - `logs/supervisor/`
/// - `logs/actor/osis/`
/// - `logs/actor/sal/`
/// - etc.
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `components` - List of component names
pub fn ensure_log_directories<P: AsRef<Path>>(
logs_root: P,
components: &[String],
) -> Result<()> {
let logs_root = logs_root.as_ref();
// Create the root logs directory
std::fs::create_dir_all(logs_root)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create logs root directory {}: {}", logs_root.display(), e)
))?;
// Create directories for each component
for component in components {
let component_dir = get_component_log_directory(logs_root, component);
std::fs::create_dir_all(&component_dir)
.map_err(|e| LoggerError::DirectoryCreation(
format!("Failed to create component directory {}: {}", component_dir.display(), e)
))?;
tracing::debug!(
target: "hero_logger",
"Created log directory for component '{}': {}",
component,
component_dir.display()
);
}
tracing::info!(
target: "hero_logger",
"Log directory structure created at: {}",
logs_root.display()
);
Ok(())
}
/// Get the log directory path for a specific component
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `component` - Component name (e.g., "supervisor", "osis_actor")
///
/// # Returns
///
/// Returns the appropriate directory path:
/// - "supervisor" -> `logs/supervisor/`
/// - "osis_actor" -> `logs/actor/osis/`
/// - etc.
pub fn get_component_log_directory<P: AsRef<Path>>(
logs_root: P,
component: &str,
) -> PathBuf {
let logs_root = logs_root.as_ref();
if component == "supervisor" {
logs_root.join("supervisor")
} else {
// Extract actor type from component name (e.g., "osis_actor" -> "osis")
let actor_type = component.strip_suffix("_actor").unwrap_or(component);
logs_root.join("actor").join(actor_type)
}
}
/// Get the job log directory path for a specific job
///
/// # Arguments
///
/// * `logs_root` - Root directory for all log files
/// * `actor_type` - Type of actor (e.g., "osis", "sal")
/// * `job_id` - Unique job identifier
///
/// # Returns
///
/// Returns the job-specific directory path: `logs/actor/<type>/job-<job_id>/`
pub fn get_job_log_directory<P: AsRef<Path>>(
logs_root: P,
actor_type: &str,
job_id: &str,
) -> PathBuf {
logs_root
.as_ref()
.join("actor")
.join(actor_type)
.join(format!("job-{}", job_id))
}
/// Extract actor type from component name
///
/// # Arguments
///
/// * `component` - Component name (e.g., "osis_actor_1", "sal_actor")
///
/// # Returns
///
/// Returns the actor type (e.g., "osis", "sal")
pub fn extract_actor_type(component: &str) -> &str {
    // Handle patterns like "osis_actor" -> "osis"
    if let Some(actor_part) = component.strip_suffix("_actor") {
        return actor_part;
    }
    // Handle patterns like "osis_actor_1" -> "osis"
    if let Some(pos) = component.find("_actor_") {
        return &component[..pos];
    }
    component
}
/// Generate a timestamp string for log file naming
///
/// # Arguments
///
/// * `format` - Timestamp format ("hourly", "daily", or custom format string)
///
/// # Returns
///
/// Returns a formatted timestamp string
pub fn generate_timestamp(format: &str) -> String {
let now = chrono::Utc::now();
match format {
"hourly" => now.format("%Y-%m-%d-%H").to_string(),
"daily" => now.format("%Y-%m-%d").to_string(),
custom => now.format(custom).to_string(),
}
}
/// Clean up old log files in a directory
///
/// # Arguments
///
/// * `directory` - Directory to clean up
/// * `file_pattern` - Pattern to match files (e.g., "*.log")
/// * `max_age_days` - Maximum age in days for files to keep
pub fn cleanup_old_logs<P: AsRef<Path>>(
directory: P,
file_pattern: &str,
max_age_days: u64,
) -> Result<usize> {
let directory = directory.as_ref();
if !directory.exists() {
return Ok(0);
}
let cutoff_time = std::time::SystemTime::now()
.checked_sub(std::time::Duration::from_secs(max_age_days * 24 * 60 * 60))
.ok_or_else(|| LoggerError::Config("Invalid max_age_days value".to_string()))?;
let mut removed_count = 0;
let entries = std::fs::read_dir(directory)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if path.is_file() {
if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
                // Simple suffix/substring matching (could be enhanced with regex)
                let matches_pattern = if file_pattern == "*" {
                    true
                } else if file_pattern.starts_with("*.") {
                    // Keep the dot so "*.log" matches "app.log" but not "catalog"
                    let suffix = &file_pattern[1..];
                    file_name.ends_with(suffix)
                } else {
                    file_name.contains(file_pattern)
                };
if matches_pattern {
if let Ok(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
if modified < cutoff_time {
if let Err(e) = std::fs::remove_file(&path) {
tracing::warn!(
target: "hero_logger",
"Failed to remove old log file {}: {}",
path.display(),
e
);
} else {
tracing::debug!(
target: "hero_logger",
"Removed old log file: {}",
path.display()
);
removed_count += 1;
}
}
}
}
}
}
}
}
if removed_count > 0 {
tracing::info!(
target: "hero_logger",
"Cleaned up {} old log files from {}",
removed_count,
directory.display()
);
}
Ok(removed_count)
}
/// Get disk usage information for the logs directory
pub fn get_logs_disk_usage<P: AsRef<Path>>(logs_root: P) -> Result<LogsDiskUsage> {
let logs_root = logs_root.as_ref();
if !logs_root.exists() {
return Ok(LogsDiskUsage {
total_size_bytes: 0,
file_count: 0,
directories: Vec::new(),
});
}
let mut total_size = 0u64;
let mut file_count = 0usize;
let mut directories = Vec::new();
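    // Recursive helper: adds every file to the global totals via the &mut
    // references while returning usage for the directory it was called on.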
fn scan_directory(
dir: &Path,
total_size: &mut u64,
file_count: &mut usize,
) -> Result<DirectoryUsage> {
let mut dir_size = 0u64;
let mut dir_file_count = 0usize;
let entries = std::fs::read_dir(dir)
.map_err(|e| LoggerError::Io(e))?;
for entry in entries {
let entry = entry.map_err(|e| LoggerError::Io(e))?;
let path = entry.path();
if path.is_file() {
if let Ok(metadata) = entry.metadata() {
let size = metadata.len();
dir_size += size;
*total_size += size;
dir_file_count += 1;
*file_count += 1;
}
} else if path.is_dir() {
let sub_usage = scan_directory(&path, total_size, file_count)?;
dir_size += sub_usage.size_bytes;
dir_file_count += sub_usage.file_count;
}
}
Ok(DirectoryUsage {
path: dir.to_path_buf(),
size_bytes: dir_size,
file_count: dir_file_count,
})
}
let root_usage = scan_directory(logs_root, &mut total_size, &mut file_count)?;
directories.push(root_usage);
Ok(LogsDiskUsage {
total_size_bytes: total_size,
file_count,
directories,
})
}
/// Information about disk usage of logs
#[derive(Debug, Clone)]
pub struct LogsDiskUsage {
pub total_size_bytes: u64,
pub file_count: usize,
pub directories: Vec<DirectoryUsage>,
}
/// Information about disk usage of a specific directory
#[derive(Debug, Clone)]
pub struct DirectoryUsage {
pub path: PathBuf,
pub size_bytes: u64,
pub file_count: usize,
}
impl LogsDiskUsage {
/// Get total size in human-readable format
pub fn total_size_human(&self) -> String {
format_bytes(self.total_size_bytes)
}
}
impl DirectoryUsage {
/// Get size in human-readable format
pub fn size_human(&self) -> String {
format_bytes(self.size_bytes)
}
}
/// Format bytes in human-readable format
fn format_bytes(bytes: u64) -> String {
const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
let mut size = bytes as f64;
let mut unit_index = 0;
while size >= 1024.0 && unit_index < UNITS.len() - 1 {
size /= 1024.0;
unit_index += 1;
}
if unit_index == 0 {
format!("{} {}", bytes, UNITS[unit_index])
} else {
format!("{:.2} {}", size, UNITS[unit_index])
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::time::Duration;
#[test]
fn test_ensure_log_directories() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
let components = vec![
"supervisor".to_string(),
"osis_actor".to_string(),
"sal_actor".to_string(),
];
ensure_log_directories(logs_root, &components).unwrap();
assert!(logs_root.join("supervisor").exists());
assert!(logs_root.join("actor/osis").exists());
assert!(logs_root.join("actor/sal").exists());
}
#[test]
fn test_get_component_log_directory() {
let logs_root = Path::new("/logs");
assert_eq!(
get_component_log_directory(logs_root, "supervisor"),
logs_root.join("supervisor")
);
assert_eq!(
get_component_log_directory(logs_root, "osis_actor"),
logs_root.join("actor/osis")
);
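        // Numbered instances are not normalized here; "sal_actor_1" keeps its
        // own directory (contrast with extract_actor_type below)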
assert_eq!(
get_component_log_directory(logs_root, "sal_actor_1"),
logs_root.join("actor/sal_actor_1")
);
}
#[test]
fn test_get_job_log_directory() {
let logs_root = Path::new("/logs");
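        // get_job_log_directory always prefixes "job-", so an id that already
        // starts with "job-" yields a doubled "job-job-..." directory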
assert_eq!(
get_job_log_directory(logs_root, "osis", "job-123"),
logs_root.join("actor/osis/job-job-123")
);
}
#[test]
fn test_extract_actor_type() {
assert_eq!(extract_actor_type("osis_actor"), "osis");
assert_eq!(extract_actor_type("sal_actor_1"), "sal");
assert_eq!(extract_actor_type("python_actor"), "python");
assert_eq!(extract_actor_type("supervisor"), "supervisor");
assert_eq!(extract_actor_type("custom"), "custom");
}
#[test]
fn test_generate_timestamp() {
let hourly = generate_timestamp("hourly");
let daily = generate_timestamp("daily");
// Basic format validation
assert!(hourly.len() >= 13); // YYYY-MM-DD-HH
assert!(daily.len() >= 10); // YYYY-MM-DD
// Custom format
let custom = generate_timestamp("%Y%m%d");
assert!(custom.len() == 8); // YYYYMMDD
}
#[test]
fn test_cleanup_old_logs() {
let temp_dir = TempDir::new().unwrap();
let logs_dir = temp_dir.path();
// Create some test log files
for i in 0..5 {
let file_path = logs_dir.join(format!("test{}.log", i));
std::fs::write(&file_path, "test content").unwrap();
}
// Create a non-log file
std::fs::write(logs_dir.join("not_a_log.txt"), "content").unwrap();
// Cleanup with 0 days (should remove all files)
let removed = cleanup_old_logs(logs_dir, "*.log", 0).unwrap();
assert_eq!(removed, 5);
// Verify non-log file still exists
assert!(logs_dir.join("not_a_log.txt").exists());
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(0), "0 B");
assert_eq!(format_bytes(1023), "1023 B");
assert_eq!(format_bytes(1024), "1.00 KB");
assert_eq!(format_bytes(1024 * 1024), "1.00 MB");
assert_eq!(format_bytes(1024 * 1024 * 1024), "1.00 GB");
}
#[test]
fn test_get_logs_disk_usage() {
let temp_dir = TempDir::new().unwrap();
let logs_root = temp_dir.path();
// Create some test files
std::fs::create_dir_all(logs_root.join("supervisor")).unwrap();
std::fs::write(logs_root.join("supervisor/test.log"), "test content").unwrap();
let usage = get_logs_disk_usage(logs_root).unwrap();
assert!(usage.total_size_bytes > 0);
assert!(usage.file_count > 0);
assert!(!usage.directories.is_empty());
}
}

View File

@@ -1,199 +0,0 @@
# Redis Queues Guide: Who Pushes Where, When, and How to Inspect
This guide documents the canonical queues used in the project, explains which component pushes to which queue at each step, and provides redis-cli commands to inspect state during development.
Canonical keys (mirrored in a Rust sketch after this list)
- Job hash (immutable key shape):
- hero:job:{job_id}
- Builder: [rust.keys::job_hash()](core/job/src/lib.rs:396)
- Work queues (push here to dispatch work):
- Type queue: hero:q:work:type:{script_type}
- Builders:
- [rust.keys::work_type()](core/job/src/lib.rs:405)
- [rust.keys::work_group()](core/job/src/lib.rs:411)
- [rust.keys::work_instance()](core/job/src/lib.rs:420)
- Reply queue (optional, for actors that send explicit replies):
- hero:q:reply:{job_id}
- Builder: [rust.keys::reply()](core/job/src/lib.rs:401)
- Control queue (optional stop/control per-type):
- hero:q:ctl:type:{script_type}
- Builder: [rust.keys::stop_type()](core/job/src/lib.rs:429)
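For orientation, a hypothetical Rust mirror of these builders; the key shapes come from this guide, but the exact signatures in core/job/src/lib.rs are assumptions:

```rust
// Hypothetical stand-ins for the keys:: builders referenced above.
pub fn job_hash(job_id: &str) -> String {
    format!("hero:job:{job_id}")
}
pub fn work_type(script_type: &str) -> String {
    format!("hero:q:work:type:{script_type}")
}
pub fn reply(job_id: &str) -> String {
    format!("hero:q:reply:{job_id}")
}
pub fn stop_type(script_type: &str) -> String {
    format!("hero:q:ctl:type:{script_type}")
}
```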
1) Who pushes where (a dispatch sketch in Rust follows this section)
A. Supervisor: creating, starting, and running jobs
- Create job (stores job hash):
- [rust.Supervisor::create_job()](core/supervisor/src/lib.rs:660)
- Persists hero:job:{job_id} via [rust.Job::store_in_redis()](core/job/src/lib.rs:147)
- Start job (dispatch to worker queue):
- [rust.Supervisor::start_job()](core/supervisor/src/lib.rs:675) → [rust.Supervisor::start_job_using_connection()](core/supervisor/src/lib.rs:599)
- LPUSH hero:q:work:type:{script_type} using [rust.keys::work_type()](core/job/src/lib.rs:405)
- Run-and-wait (one-shot):
- [rust.Supervisor::run_job_and_await_result()](core/supervisor/src/lib.rs:689)
- Stores hero:job:{job_id}, LPUSH hero:q:work:type:{script_type} (same as start)
- Waits on hero:q:reply:{job_id} (via [rust.keys::reply()](core/job/src/lib.rs:401)) and also polls hero:job:{job_id} for output to support hash-only actors
B. Terminal UI: quick dispatch from the actor TUI
- Stores job using Job::store_in_redis, then pushes to type queue:
- Dispatch code: [core/actor/src/terminal_ui.rs](core/actor/src/terminal_ui.rs:460)
- LPUSH hero:q:work:type:{script_type} using [rust.keys::work_type()](core/job/src/lib.rs:405)
C. Actors: consuming and completing work
- Consume jobs:
- Standalone Rhai actor: [rust.spawn_rhai_actor()](core/actor/src/lib.rs:211)
- BLPOP hero:q:work:type:{script_type} (queue selection computed via [rust.derive_script_type_from_actor_id()](core/actor/src/lib.rs:262), then [rust.keys::work_type()](core/job/src/lib.rs:405))
- Trait-based actor loop: [rust.Actor::spawn()](core/actor/src/actor_trait.rs:119)
- BLPOP hero:q:work:type:{script_type} using [rust.keys::work_type()](core/job/src/lib.rs:405)
- Write results:
- Hash-only (current default): [rust.Job::set_result()](core/job/src/lib.rs:322) updates hero:job:{job_id} with output and status=finished
- Optional reply queue model: actor may LPUSH hero:q:reply:{job_id} (if implemented)
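Every dispatcher above ends in the same LPUSH. A minimal sketch of that step, assuming the redis crate (job storage via Job::store_in_redis is elided):

```rust
use redis::Commands;

/// Push a stored job's id onto its type queue, as the supervisor and TUI do.
fn dispatch(conn: &mut redis::Connection, job_id: &str, script_type: &str) -> redis::RedisResult<()> {
    // LPUSH hero:q:work:type:{script_type} {job_id}
    let queue = format!("hero:q:work:type:{script_type}");
    conn.lpush(queue, job_id)
}
```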
2) End-to-end flows and the queues involved
Flow A: Two-step (create + start) with Supervisor
- Code path:
- [rust.Supervisor::create_job()](core/supervisor/src/lib.rs:660)
- [rust.Supervisor::start_job()](core/supervisor/src/lib.rs:675)
- Keys touched:
- hero:job:{job_id} (created)
- hero:q:work:type:{script_type} (LPUSH job_id)
- Expected actor behavior:
- BLPOP hero:q:work:type:{script_type}
- Execute script, then [rust.Job::set_result()](core/job/src/lib.rs:322)
- How to inspect with redis-cli:
- FLUSHALL (fresh dev) then run create and start
- Verify job hash:
- HGETALL hero:job:{job_id}
- Verify queue length before consumption:
- LLEN hero:q:work:type:osis
- See pending items:
- LRANGE hero:q:work:type:osis 0 -1
- After actor runs, verify result in job hash:
- HGET hero:job:{job_id} status
- HGET hero:job:{job_id} output
Flow B: One-shot (run and await result) with Supervisor (polling sketch below)
- Code path:
- [rust.Supervisor::run_job_and_await_result()](core/supervisor/src/lib.rs:689)
- Uses [rust.keys::reply()](core/job/src/lib.rs:401) and polls the hash for output
- Keys touched:
- hero:job:{job_id}
- hero:q:work:type:{script_type}
- hero:q:reply:{job_id} (only if an actor uses reply queues)
- How to inspect with redis-cli:
- While waiting:
- LLEN hero:q:work:type:osis
- HGET hero:job:{job_id} status
- If an actor uses reply queues (optional):
- LLEN hero:q:reply:{job_id}
- LRANGE hero:q:reply:{job_id} 0 -1
- After completion:
- HGET hero:job:{job_id} output
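A sketch of the hash-polling fallback described above, assuming the redis crate; the real supervisor also watches the reply queue, which is omitted here:

```rust
use redis::Commands;
use std::{thread, time::Duration};

/// Poll hero:job:{job_id} until an output field appears or the timeout elapses.
fn await_output(conn: &mut redis::Connection, job_id: &str, timeout_secs: u64) -> Option<String> {
    let key = format!("hero:job:{job_id}");
    for _ in 0..timeout_secs {
        if let Ok(Some(output)) = conn.hget::<_, _, Option<String>>(&key, "output") {
            return Some(output);
        }
        thread::sleep(Duration::from_secs(1));
    }
    None
}
```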
Flow C: Dispatch from the Actor TUI (manual testing)
- Code path:
- [core/actor/src/terminal_ui.rs](core/actor/src/terminal_ui.rs:460) stores job and LPUSH to [rust.keys::work_type()](core/job/src/lib.rs:405)
- Keys touched:
- hero:job:{job_id}
- hero:q:work:type:{script_type}
- How to inspect with redis-cli:
- List all work queues:
- KEYS hero:q:work:type:*
- Show items in a specific type queue:
- LRANGE hero:q:work:type:osis 0 -1
- Read one pending job:
- HGETALL hero:job:{job_id}
- After actor runs:
- HGET hero:job:{job_id} status
- HGET hero:job:{job_id} output
3) Example redis-cli sequences
A. Basic OSIS job lifecycle (two-step)
- Prepare
- FLUSHALL
- Create and start (via code or supervisor-cli)
- Inspect queue and job
- KEYS hero:q:work:type:*
- LLEN hero:q:work:type:osis
- LRANGE hero:q:work:type:osis 0 -1
- HGETALL hero:job:{job_id}
- After actor consumes the job:
- HGET hero:job:{job_id} status → finished
- HGET hero:job:{job_id} output → script result
- LLEN hero:q:work:type:osis → likely 0 if all consumed
B. One-shot run-and-wait (hash-only actor)
- Prepare
- FLUSHALL
- Submit via run_job_and_await_result()
- While supervisor waits:
- HGET hero:job:{job_id} status → started/finished
- (Optional) LLEN hero:q:reply:{job_id} → typically 0 if actor doesn't use reply queues
- When done:
- HGET hero:job:{job_id} output → result
C. Listing and cleanup helpers
- List jobs
- KEYS hero:job:*
- Show a specific job
- HGETALL hero:job:{job_id}
- Clear all keys (dev only)
- FLUSHALL
4) Where the queue names are computed in code
- Builders for canonical keys:
- [rust.keys::job_hash()](core/job/src/lib.rs:396)
- [rust.keys::reply()](core/job/src/lib.rs:401)
- [rust.keys::work_type()](core/job/src/lib.rs:405)
- [rust.keys::work_group()](core/job/src/lib.rs:411)
- [rust.keys::work_instance()](core/job/src/lib.rs:420)
- Supervisor routing and waiting:
- Type queue selection: [rust.Supervisor::get_actor_queue_key()](core/supervisor/src/lib.rs:410)
- LPUSH to type queue: [rust.Supervisor::start_job_using_connection()](core/supervisor/src/lib.rs:599)
- One-shot run and wait: [rust.Supervisor::run_job_and_await_result()](core/supervisor/src/lib.rs:689)
- Actor consumption:
- Standalone Rhai actor: [rust.spawn_rhai_actor()](core/actor/src/lib.rs:211)
- Type queue computed via [rust.derive_script_type_from_actor_id()](core/actor/src/lib.rs:262) + [rust.keys::work_type()](core/job/src/lib.rs:405)
- Trait-based actor loop: [rust.Actor::spawn()](core/actor/src/actor_trait.rs:119)
- BLPOP type queue via [rust.keys::work_type()](core/job/src/lib.rs:405)
5) Quick checklist for debugging
- Nothing consumes from the type queue
- Is at least one actor process running that BLPOPs hero:q:work:type:{script_type}?
- LLEN hero:q:work:type:{script_type} > 0 means there is an unconsumed backlog
- Job “Dispatched” but never “Finished”
- HGET hero:job:{job_id} status
- Actor logs: check for script errors and verify it is connected to the same Redis
- “run-and-wait” timeout
- Hash-only actors don't push to reply queues; the supervisor will still return once it sees hero:job:{job_id}.output set by [rust.Job::set_result()](core/job/src/lib.rs:322)
- Mixed types:
- Verify you targeted the correct type queue (e.g., osis vs sal): LLEN hero:q:work:type:osis, hero:q:work:type:sal
6) Canonical patterns to remember (consolidated in the sketch after this list)
- To dispatch a job:
- LPUSH hero:q:work:type:{script_type} {job_id}
- To read job data:
- HGETALL hero:job:{job_id}
- To wait for output (optional reply model):
- BLPOP hero:q:reply:{job_id} {timeout_secs}
- To verify system state:
- KEYS hero:q:*
- KEYS hero:job:*
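Pulled together as one hedged sketch of the consuming side; blpop's float timeout assumes a recent redis crate, and run_script stands in for real script execution:

```rust
use redis::Commands;

/// BLPOP the type queue, execute the job, then complete it on the job hash
/// (hash-only model; a reply-queue actor would additionally LPUSH hero:q:reply:{job_id}).
fn actor_loop(conn: &mut redis::Connection, script_type: &str) -> redis::RedisResult<()> {
    let queue = format!("hero:q:work:type:{script_type}");
    loop {
        // Timeout 0.0 blocks until a job id arrives
        let (_queue, job_id): (String, String) = conn.blpop(&queue, 0.0)?;
        let job_key = format!("hero:job:{job_id}");
        let output = run_script(&job_id); // placeholder for script execution
        let _: () = conn.hset(&job_key, "output", output)?;
        let _: () = conn.hset(&job_key, "status", "finished")?;
    }
}

fn run_script(job_id: &str) -> String {
    format!("result for {job_id}") // stand-in
}
```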
This guide reflects the canonical scheme implemented in:
- [rust.Supervisor](core/supervisor/src/lib.rs:1)
- [rust.keys](core/job/src/lib.rs:392)
- [core/actor/src/lib.rs](core/actor/src/lib.rs:1)
- [core/actor/src/actor_trait.rs](core/actor/src/actor_trait.rs:1)
- [core/actor/src/terminal_ui.rs](core/actor/src/terminal_ui.rs:1)

View File

@@ -1,4 +1,4 @@
use heromodels::models::heroledger::rhai::register_heroledger_rhai_modules;
// use heromodels::models::heroledger::rhai::register_heroledger_rhai_modules;
use rhai::Engine;
use rhailib_dsl;
use std::sync::{Arc, OnceLock};
@@ -123,7 +123,7 @@ pub fn register_dsl_modules(engine: &mut Engine) {
// Register basic object functionality directly
register_object_functions(engine);
heromodels::heroledger::rhai::register_heroledger_rhai_modules(&mut engine);
//heromodels::heroledger::rhai::register_heroledger_rhai_modules(&mut engine);
println!("Rhailib Domain Specific Language modules registered successfully.");