refactor wip

Timur Gordon
2025-08-05 12:19:38 +02:00
parent 8ed40ce99c
commit 7a652c9c3c
51 changed files with 6183 additions and 840 deletions

core/worker/Cargo.toml

@@ -11,6 +11,26 @@ path = "src/lib.rs"
name = "worker"
path = "cmd/worker.rs"
[[bin]]
name = "osis"
path = "cmd/osis.rs"
[[bin]]
name = "system"
path = "cmd/system.rs"
[[example]]
name = "trait_based_worker_demo"
path = "examples/trait_based_worker_demo.rs"
[[example]]
name = "osis_worker_demo"
path = "examples/osis_worker_demo.rs"
[[example]]
name = "system_worker_demo"
path = "examples/system_worker_demo.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
@@ -24,6 +44,9 @@ env_logger = "0.10"
clap = { version = "4.4", features = ["derive"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
chrono = { version = "0.4", features = ["serde"] }
toml = "0.8"
thiserror = "1.0"
async-trait = "0.1"
hero_supervisor = { path = "../supervisor" }
hero_job = { path = "../job" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }

core/worker/cmd/osis.rs (new file, 233 lines)

@@ -0,0 +1,233 @@
//! OSIS Worker Binary - Synchronous worker for system-level operations
use clap::Parser;
use log::{error, info};
use rhailib_worker::config::WorkerConfig;
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::sync_worker::SyncWorker;
use rhailib_worker::worker_trait::spawn_worker;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(
name = "osis",
version = "0.1.0",
about = "OSIS (Operating System Integration Service) - Synchronous Worker",
long_about = "A synchronous worker for Hero framework that processes jobs sequentially. \
Ideal for system-level operations that require careful resource management."
)]
struct Args {
/// Path to TOML configuration file
#[arg(short, long, help = "Path to TOML configuration file")]
config: PathBuf,
/// Override worker ID from config
#[arg(long, help = "Override worker ID from configuration file")]
worker_id: Option<String>,
/// Override Redis URL from config
#[arg(long, help = "Override Redis URL from configuration file")]
redis_url: Option<String>,
/// Override database path from config
#[arg(long, help = "Override database path from configuration file")]
db_path: Option<String>,
/// Enable verbose logging (debug level)
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Load configuration from TOML file
let mut config = match WorkerConfig::from_file(&args.config) {
Ok(config) => config,
Err(e) => {
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
std::process::exit(1);
}
};
// Validate that this is a sync worker configuration
if !config.is_sync() {
eprintln!("Error: OSIS worker requires a sync worker configuration");
eprintln!("Expected: [worker_type] type = \"sync\"");
eprintln!("Found: {:?}", config.worker_type);
std::process::exit(1);
}
// Apply command line overrides
if let Some(worker_id) = args.worker_id {
config.worker_id = worker_id;
}
if let Some(redis_url) = args.redis_url {
config.redis_url = redis_url;
}
if let Some(db_path) = args.db_path {
config.db_path = db_path;
}
// Configure logging
setup_logging(&config, args.verbose, args.no_timestamp)?;
info!("🚀 OSIS Worker starting...");
info!("Worker ID: {}", config.worker_id);
info!("Redis URL: {}", config.redis_url);
info!("Database Path: {}", config.db_path);
info!("Preserve Tasks: {}", config.preserve_tasks);
// Create Rhai engine
let engine = create_heromodels_engine();
info!("✅ Rhai engine initialized");
// Build the sync worker from the loaded configuration
let worker = Arc::new(
SyncWorker::builder()
.worker_id(config.worker_id.clone())
.db_path(config.db_path.clone())
.redis_url(config.redis_url.clone())
.preserve_tasks(config.preserve_tasks)
.build()
.expect("Failed to build SyncWorker"),
);
info!("✅ Sync worker instance created");
// Setup shutdown signal handling
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Spawn shutdown signal handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("Failed to listen for shutdown signal: {}", e);
return;
}
info!("🛑 Shutdown signal received");
if let Err(e) = shutdown_tx_clone.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
});
// Spawn the worker
info!("🔄 Starting worker loop...");
let worker_handle = spawn_worker(worker, engine, shutdown_rx);
// Wait for the worker to complete
match worker_handle.await {
Ok(Ok(())) => {
info!("✅ OSIS Worker shut down gracefully");
}
Ok(Err(e)) => {
error!("❌ OSIS Worker encountered an error: {}", e);
std::process::exit(1);
}
Err(e) => {
error!("❌ Failed to join worker task: {}", e);
std::process::exit(1);
}
}
Ok(())
}
/// Setup logging based on configuration and command line arguments
fn setup_logging(
config: &WorkerConfig,
verbose: bool,
no_timestamp: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut builder = env_logger::Builder::new();
// Determine log level
let log_level = if verbose {
"debug"
} else {
&config.logging.level
};
// Set log level
builder.filter_level(match log_level.to_lowercase().as_str() {
"trace" => log::LevelFilter::Trace,
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => {
eprintln!("Invalid log level: {}. Using 'info'", log_level);
log::LevelFilter::Info
}
});
// Configure timestamps
let show_timestamps = !no_timestamp && config.logging.timestamps;
if !show_timestamps {
builder.format_timestamp(None);
}
builder.init();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_config_validation() {
let config_toml = r#"
worker_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(config.is_sync());
assert!(!config.is_async());
assert_eq!(config.worker_id, "test_osis");
}
#[test]
fn test_async_config_rejection() {
let config_toml = r#"
worker_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 300
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(!config.is_sync());
assert!(config.is_async());
// This would be rejected in main() function
}
}

core/worker/cmd/system.rs (new file, 302 lines)

@@ -0,0 +1,302 @@
//! System Worker Binary - Asynchronous worker for high-throughput concurrent processing
use clap::Parser;
use log::{error, info};
use rhailib_worker::async_worker_impl::AsyncWorker;
use rhailib_worker::config::WorkerConfig;
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::worker_trait::spawn_worker;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(
name = "system",
version = "0.1.0",
about = "System Worker - Asynchronous Worker with Concurrent Job Processing",
long_about = "An asynchronous worker for Hero framework that processes multiple jobs \
concurrently with timeout support. Ideal for high-throughput scenarios \
where jobs can be executed in parallel."
)]
struct Args {
/// Path to TOML configuration file
#[arg(short, long, help = "Path to TOML configuration file")]
config: PathBuf,
/// Override worker ID from config
#[arg(long, help = "Override worker ID from configuration file")]
worker_id: Option<String>,
/// Override Redis URL from config
#[arg(long, help = "Override Redis URL from configuration file")]
redis_url: Option<String>,
/// Override database path from config
#[arg(long, help = "Override database path from configuration file")]
db_path: Option<String>,
/// Override default timeout in seconds
#[arg(long, help = "Override default job timeout in seconds")]
timeout: Option<u64>,
/// Enable verbose logging (debug level)
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
/// Show worker statistics periodically
#[arg(long, help = "Show periodic worker statistics")]
show_stats: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Load configuration from TOML file
let mut config = match WorkerConfig::from_file(&args.config) {
Ok(config) => config,
Err(e) => {
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
std::process::exit(1);
}
};
// Validate that this is an async worker configuration
if !config.is_async() {
eprintln!("Error: System worker requires an async worker configuration");
eprintln!("Expected: [worker_type] type = \"async\"");
eprintln!("Found: {:?}", config.worker_type);
std::process::exit(1);
}
// Apply command line overrides
if let Some(worker_id) = args.worker_id {
config.worker_id = worker_id;
}
if let Some(redis_url) = args.redis_url {
config.redis_url = redis_url;
}
if let Some(db_path) = args.db_path {
config.db_path = db_path;
}
// Override timeout if specified
if let Some(timeout_secs) = args.timeout {
if let rhailib_worker::config::WorkerType::Async { ref mut default_timeout_seconds } = config.worker_type {
*default_timeout_seconds = timeout_secs;
}
}
// Configure logging
setup_logging(&config, args.verbose, args.no_timestamp)?;
info!("🚀 System Worker starting...");
info!("Worker ID: {}", config.worker_id);
info!("Redis URL: {}", config.redis_url);
info!("Database Path: {}", config.db_path);
info!("Preserve Tasks: {}", config.preserve_tasks);
if let Some(timeout) = config.get_default_timeout() {
info!("Default Timeout: {:?}", timeout);
}
// Create Rhai engine
let engine = create_heromodels_engine();
info!("✅ Rhai engine initialized");
// Build the async worker from the loaded configuration
let mut worker_builder = AsyncWorker::builder()
.worker_id(config.worker_id.clone())
.db_path(config.db_path.clone())
.redis_url(config.redis_url.clone());
// Apply the configured default timeout for async jobs
if let Some(timeout) = config.get_default_timeout() {
worker_builder = worker_builder.default_timeout(timeout);
}
let worker = Arc::new(worker_builder.build().expect("Failed to build AsyncWorker"));
info!("✅ Async worker instance created");
// Setup shutdown signal handling
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Spawn shutdown signal handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("Failed to listen for shutdown signal: {}", e);
return;
}
info!("🛑 Shutdown signal received");
if let Err(e) = shutdown_tx_clone.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
});
// Spawn statistics reporter if requested
if args.show_stats {
let worker_stats = Arc::clone(&worker);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let running_count = worker_stats.running_job_count().await;
if running_count > 0 {
info!("📊 Worker Stats: {} jobs currently running", running_count);
} else {
info!("📊 Worker Stats: No jobs currently running");
}
}
});
}
// Spawn the worker
info!("🔄 Starting worker loop...");
let worker_handle = spawn_worker(worker, engine, shutdown_rx);
// Wait for the worker to complete
match worker_handle.await {
Ok(Ok(())) => {
info!("✅ System Worker shut down gracefully");
}
Ok(Err(e)) => {
error!("❌ System Worker encountered an error: {}", e);
std::process::exit(1);
}
Err(e) => {
error!("❌ Failed to join worker task: {}", e);
std::process::exit(1);
}
}
Ok(())
}
/// Setup logging based on configuration and command line arguments
fn setup_logging(
config: &WorkerConfig,
verbose: bool,
no_timestamp: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut builder = env_logger::Builder::new();
// Determine log level
let log_level = if verbose {
"debug"
} else {
&config.logging.level
};
// Set log level
builder.filter_level(match log_level.to_lowercase().as_str() {
"trace" => log::LevelFilter::Trace,
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => {
warn!("Invalid log level: {}. Using 'info'", log_level);
log::LevelFilter::Info
}
});
// Configure timestamps
let show_timestamps = !no_timestamp && config.logging.timestamps;
if !show_timestamps {
builder.format_timestamp(None);
}
builder.init();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_config_validation() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 600
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(!config.is_sync());
assert!(config.is_async());
assert_eq!(config.worker_id, "test_system");
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
#[test]
fn test_sync_config_rejection() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(config.is_sync());
assert!(!config.is_async());
// This would be rejected in main() function
}
#[test]
fn test_timeout_override() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 300
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let mut config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(300)));
// Test timeout override
if let rhailib_worker::config::WorkerType::Async { ref mut default_timeout_seconds } = config.worker_type {
*default_timeout_seconds = 600;
}
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
}

core/worker/examples/README.md (new file)

@@ -0,0 +1,197 @@
# Worker Examples
This directory contains example configurations and test scripts for both OSIS and System worker binaries.
## Overview
Both examples demonstrate the ping/pong functionality built into the Hero workers:
- Workers automatically detect jobs with script content "ping"
- They respond immediately with "pong" without executing the Rhai engine
- This provides a fast health check and connectivity test mechanism (sketched in Rust below)
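A rough sketch of that fast path, assuming the trait's `spawn` loop checks the script content before handing the job to the Rhai engine (the helper name `try_handle_ping` is illustrative; `Job::set_result` and `Job::update_status` are the same helpers the workers use for regular jobs):
```rust
use hero_job::{Job, JobStatus};
use log::error;

/// Sketch only: answer "ping" jobs immediately, skipping the Rhai engine.
async fn try_handle_ping(
    job: &Job,
    redis_conn: &mut redis::aio::MultiplexedConnection,
) -> bool {
    if job.script.trim() != "ping" {
        return false; // not a health check; execute the script normally
    }
    if let Err(e) = Job::set_result(redis_conn, &job.id, "pong").await {
        error!("Failed to write pong result for job {}: {}", job.id, e);
    }
    if let Err(e) = Job::update_status(redis_conn, &job.id, JobStatus::Finished).await {
        error!("Failed to mark job {} as finished: {}", job.id, e);
    }
    true
}
```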
## Prerequisites
1. **Redis Server**: Both examples require a running Redis server
```bash
# Install Redis (macOS)
brew install redis
# Start Redis server
redis-server
```
2. **Rust Environment**: Make sure you can build the worker binaries
```bash
cd /path/to/herocode/hero/core/worker
cargo build --bin osis --bin system
```
## OSIS Worker Example
**Location**: `examples/osis/`
The OSIS (Operating System Integration Service) worker processes jobs synchronously, one at a time.
### Files
- `config.toml` - Configuration for the OSIS worker
- `example.sh` - Test script that demonstrates ping/pong functionality
### Usage
```bash
cd examples/osis
./example.sh
```
### What the script does:
1. Checks Redis connectivity
2. Cleans up any existing jobs
3. Starts the OSIS worker in the background
4. Sends 3 ping jobs sequentially
5. Verifies each job receives a "pong" response
6. Reports success/failure statistics
7. Cleans up worker and Redis data
### Expected Output
```
=== OSIS Worker Example ===
✅ Redis is running
✅ OSIS worker started (PID: 12345)
📤 Sending ping job: ping_job_1_1234567890
✅ Job ping_job_1_1234567890 completed successfully with result: pong
...
🎉 All tests passed! OSIS worker is working correctly.
```
## System Worker Example
**Location**: `examples/system/`
The System worker processes jobs asynchronously, handling multiple jobs concurrently.
### Files
- `config.toml` - Configuration for the System worker (includes async settings)
- `example.sh` - Test script that demonstrates concurrent ping/pong functionality
### Usage
```bash
cd examples/system
./example.sh
```
### What the script does:
1. Checks Redis connectivity
2. Cleans up any existing jobs
3. Starts the System worker with stats reporting
4. Sends 5 concurrent ping jobs
5. Sends 10 rapid-fire ping jobs to test async capabilities
6. Verifies all jobs receive "pong" responses
7. Reports comprehensive success/failure statistics
8. Cleans up worker and Redis data
### Expected Output
```
=== System Worker Example ===
✅ Redis is running
✅ System worker started (PID: 12345)
📤 Sending ping job: ping_job_1_1234567890123
✅ Job ping_job_1_1234567890123 completed successfully with result: pong
...
🎉 All tests passed! System worker is handling concurrent jobs correctly.
Overall success rate: 15/15
```
## Configuration Details
### OSIS Configuration (`examples/osis/config.toml`)
```toml
worker_id = "osis_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_example_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"
```
### System Configuration (`examples/system/config.toml`)
```toml
worker_id = "system_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_example_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 30
[logging]
timestamps = true
level = "info"
```
## Key Differences
| Feature | OSIS Worker | System Worker |
|---------|-------------|---------------|
| **Processing** | Sequential (one job at a time) | Concurrent (multiple jobs simultaneously) |
| **Use Case** | System-level operations requiring resource management | High-throughput job processing |
| **Timeout** | No timeout configuration | Configurable job timeouts |
| **Stats** | Basic logging | Optional statistics reporting (`--show-stats`) |
| **Job Handling** | Blocking job execution | Non-blocking async job execution |
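The **Timeout** row comes down to the async worker wrapping each job's execution in `tokio::time::timeout`; a condensed sketch of that pattern, following `execute_job_with_timeout` in `async_worker_impl.rs`:
```rust
use std::time::Duration;
use tokio::time::timeout;

/// Sketch only: run a job's script task with a deadline and report a
/// timeout as an error message, as the async worker does per job.
async fn run_with_timeout<F>(job_timeout: Duration, script_task: F) -> Result<(), String>
where
    F: std::future::Future<Output = ()>,
{
    match timeout(job_timeout, script_task).await {
        Ok(()) => Ok(()), // completed within the deadline
        Err(_) => Err(format!("Job timed out after {:?}", job_timeout)),
    }
}
```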
## Troubleshooting
### Redis Connection Issues
```bash
# Check if Redis is running
redis-cli ping
# Check Redis logs
redis-server --loglevel verbose
```
### Worker Compilation Issues
```bash
# Clean and rebuild
cargo clean
cargo build --bin osis --bin system
```
### Job Processing Issues
- Check Redis for stuck jobs: `redis-cli keys "hero:*"`
- Clear all Hero jobs: `redis-cli eval "return redis.call('del', unpack(redis.call('keys', 'hero:*')))" 0`
- Check worker logs for detailed error messages
## Extending the Examples
### Adding Custom Jobs
To test with custom Rhai scripts instead of ping jobs:
1. Modify the job creation in the shell scripts:
```bash
# Replace "ping" with your Rhai script
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
script "your_rhai_script_here"
```
2. Update result verification to expect your script's output instead of "pong" (see the Rust sketch below for a programmatic alternative)
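If you prefer driving custom jobs from Rust rather than `redis-cli`, here is a hedged sketch based on `create_demo_job`/`dispatch_job` from `examples/trait_based_worker_demo.rs`. The caller/context ids and the `hero:jobs:<worker_id>` queue key are assumptions taken from the examples in this directory:
```rust
use hero_job::{Job, ScriptType};
use redis::AsyncCommands;

/// Sketch only: create a custom Rhai job and push it onto a worker's queue.
async fn dispatch_custom_job(
    redis_conn: &mut redis::aio::MultiplexedConnection,
    worker_id: &str,
    script: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let job = Job::new(
        "custom_caller".to_string(),  // caller_id (placeholder)
        "custom_context".to_string(), // context_id (placeholder)
        script.to_string(),
        ScriptType::OSIS,
    );
    job.store_in_redis(redis_conn).await?;               // hero:job:<id> hash
    let queue_key = format!("hero:jobs:{}", worker_id);  // worker's queue
    let _: () = redis_conn.rpush(&queue_key, &job.id).await?;
    Ok(job.id)
}
```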
### Testing Different Configurations
- Modify `config.toml` files to test different Redis URLs, database paths, or logging levels
- Test with `preserve_tasks = true` to inspect job details after completion
- Adjust timeout values in the System worker configuration
## Architecture Notes
Both examples demonstrate the unified Worker trait architecture:
- **Common Interface**: Both workers implement the same `Worker` trait
- **Ping/Pong Handling**: Built into the trait's `spawn` method before job delegation
- **Redis Integration**: Uses the shared Job struct from `hero_job` crate
- **Configuration**: TOML-based configuration with CLI overrides
- **Graceful Shutdown**: Both workers handle SIGTERM/SIGINT properly
This architecture allows for easy extension with new worker types while maintaining consistent behavior and configuration patterns.
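For reference, the unified entry point that both binaries build on looks roughly like this, condensed from `examples/trait_based_worker_demo.rs` (the values shown are placeholders, not required settings):
```rust
use std::sync::Arc;
use std::time::Duration;
use rhailib_worker::AsyncWorker;
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::worker_trait::spawn_worker;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A SyncWorker is built and spawned the same way; only the builder differs.
    let worker = Arc::new(
        AsyncWorker::builder()
            .worker_id("example_async_worker")
            .db_path("/tmp")
            .redis_url("redis://localhost:6379")
            .default_timeout(Duration::from_secs(300))
            .build()
            .expect("Failed to build AsyncWorker"),
    );
    let engine = create_heromodels_engine();
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

    // Same spawn_worker call for sync and async workers.
    let handle = spawn_worker(worker, engine, shutdown_rx);

    // ... dispatch jobs to hero:jobs:<worker_id>, then shut down gracefully.
    let _ = shutdown_tx.send(()).await;
    if let Ok(Ok(())) = handle.await {
        println!("Worker shut down gracefully");
    }
    Ok(())
}
```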

core/worker/examples/osis/config.toml (new file)

@@ -0,0 +1,11 @@
worker_id = "osis_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_example_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"

core/worker/examples/osis/example.sh (new file)

@@ -0,0 +1,138 @@
#!/bin/bash
# OSIS Worker Example Script
# This script demonstrates the OSIS worker by:
# 1. Starting the worker with the config.toml
# 2. Sending ping jobs to Redis
# 3. Verifying pong responses
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="$SCRIPT_DIR/config.toml"
WORKER_ID="osis_example_worker"
REDIS_URL="redis://localhost:6379"
echo "=== OSIS Worker Example ==="
echo "Script directory: $SCRIPT_DIR"
echo "Config file: $CONFIG_FILE"
echo "Worker ID: $WORKER_ID"
echo "Redis URL: $REDIS_URL"
echo
# Check if Redis is running
echo "Checking Redis connection..."
if ! redis-cli -u "$REDIS_URL" ping > /dev/null 2>&1; then
echo "❌ Error: Redis is not running or not accessible at $REDIS_URL"
echo "Please start Redis server first: redis-server"
exit 1
fi
echo "✅ Redis is running"
echo
# Clean up any existing jobs in the queue
echo "Cleaning up existing jobs in Redis..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis queues cleaned"
echo
# Start the OSIS worker in the background
echo "Starting OSIS worker..."
cd "$SCRIPT_DIR/../.."
cargo run --bin osis -- --config "$CONFIG_FILE" &
WORKER_PID=$!
echo "✅ OSIS worker started (PID: $WORKER_PID)"
echo
# Wait a moment for the worker to initialize
echo "Waiting for worker to initialize..."
sleep 3
# Function to send a ping job and check for pong response
send_ping_job() {
local job_num=$1
local job_id="ping_job_${job_num}_$(date +%s)"
echo "📤 Sending ping job: $job_id"
# Create job in Redis
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
id "$job_id" \
script "ping" \
status "Queued" \
created_at "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
worker_id "$WORKER_ID" > /dev/null
# Add job to worker queue
redis-cli -u "$REDIS_URL" lpush "hero:jobs:$WORKER_ID" "$job_id" > /dev/null
# Wait for job completion and check result
local timeout=10
local elapsed=0
while [ $elapsed -lt $timeout ]; do
local status=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" status 2>/dev/null || echo "")
if [ "$status" = "Finished" ]; then
local result=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" result 2>/dev/null || echo "")
if [ "$result" = "pong" ]; then
echo "✅ Job $job_id completed successfully with result: $result"
return 0
else
echo "❌ Job $job_id completed but with unexpected result: $result"
return 1
fi
elif [ "$status" = "Error" ]; then
local error=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" error 2>/dev/null || echo "")
echo "❌ Job $job_id failed with error: $error"
return 1
fi
sleep 1
elapsed=$((elapsed + 1))
done
echo "❌ Job $job_id timed out after ${timeout}s"
return 1
}
# Send multiple ping jobs to test the worker
echo "Testing ping/pong functionality..."
success_count=0
total_jobs=3
for i in $(seq 1 $total_jobs); do
echo
echo "--- Test $i/$total_jobs ---"
if send_ping_job $i; then
success_count=$((success_count + 1))
fi
sleep 1
done
echo
echo "=== Test Results ==="
echo "Successful ping/pong tests: $success_count/$total_jobs"
if [ $success_count -eq $total_jobs ]; then
echo "🎉 All tests passed! OSIS worker is working correctly."
exit_code=0
else
echo "⚠️ Some tests failed. Check the worker logs for details."
exit_code=1
fi
# Clean up
echo
echo "Cleaning up..."
echo "Stopping OSIS worker (PID: $WORKER_PID)..."
kill $WORKER_PID 2>/dev/null || true
wait $WORKER_PID 2>/dev/null || true
echo "✅ Worker stopped"
echo "Cleaning up Redis jobs..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis cleaned up"
echo
echo "=== OSIS Worker Example Complete ==="
exit $exit_code


@@ -0,0 +1,14 @@
# OSIS Worker Configuration
# Synchronous worker for system-level operations
worker_id = "osis_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_worker_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"

core/worker/examples/osis_worker_demo.rs (new file)

@@ -0,0 +1,60 @@
use std::process::{Command, Stdio};
use std::env;
/// OSIS Worker Demo Runner
///
/// This Rust wrapper executes the OSIS worker bash script example.
/// It provides a way to run shell-based examples through Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 OSIS Worker Demo");
println!("==================");
println!();
// Get the current working directory and construct the path to the shell script
let current_dir = env::current_dir()?;
let script_path = current_dir.join("examples").join("osis").join("example.sh");
// Check if the script exists
if !script_path.exists() {
eprintln!("❌ Error: Script not found at {:?}", script_path);
eprintln!(" Make sure you're running this from the worker crate root directory.");
std::process::exit(1);
}
println!("📁 Script location: {:?}", script_path);
println!("🔧 Executing OSIS worker example...");
println!();
// Make sure the script is executable
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)?.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms)?;
}
// Execute the shell script
let mut child = Command::new("bash")
.arg(&script_path)
.current_dir(&current_dir)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
// Wait for the script to complete
let status = child.wait()?;
println!();
if status.success() {
println!("✅ OSIS worker demo completed successfully!");
} else {
println!("❌ OSIS worker demo failed with exit code: {:?}", status.code());
std::process::exit(status.code().unwrap_or(1));
}
Ok(())
}

core/worker/examples/system/config.toml (new file)

@@ -0,0 +1,12 @@
worker_id = "system_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_example_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 30
[logging]
timestamps = true
level = "info"

core/worker/examples/system/example.sh (new file)

@@ -0,0 +1,183 @@
#!/bin/bash
# System Worker Example Script
# This script demonstrates the System worker by:
# 1. Starting the worker with the config.toml
# 2. Sending multiple concurrent ping jobs to Redis
# 3. Verifying pong responses
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="$SCRIPT_DIR/config.toml"
WORKER_ID="system_example_worker"
REDIS_URL="redis://localhost:6379"
echo "=== System Worker Example ==="
echo "Script directory: $SCRIPT_DIR"
echo "Config file: $CONFIG_FILE"
echo "Worker ID: $WORKER_ID"
echo "Redis URL: $REDIS_URL"
echo
# Check if Redis is running
echo "Checking Redis connection..."
if ! redis-cli -u "$REDIS_URL" ping > /dev/null 2>&1; then
echo "❌ Error: Redis is not running or not accessible at $REDIS_URL"
echo "Please start Redis server first: redis-server"
exit 1
fi
echo "✅ Redis is running"
echo
# Clean up any existing jobs in the queue
echo "Cleaning up existing jobs in Redis..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis queues cleaned"
echo
# Start the System worker in the background
echo "Starting System worker..."
cd "$SCRIPT_DIR/../.."
cargo run --bin system -- --config "$CONFIG_FILE" --show-stats &
WORKER_PID=$!
echo "✅ System worker started (PID: $WORKER_PID)"
echo
# Wait a moment for the worker to initialize
echo "Waiting for worker to initialize..."
sleep 3
# Function to send a ping job (non-blocking)
send_ping_job() {
local job_num=$1
local job_id="ping_job_${job_num}_$(date +%s%N)"
echo "📤 Sending ping job: $job_id"
# Create job in Redis
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
id "$job_id" \
script "ping" \
status "Queued" \
created_at "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
worker_id "$WORKER_ID" > /dev/null
# Add job to worker queue
redis-cli -u "$REDIS_URL" lpush "hero:jobs:$WORKER_ID" "$job_id" > /dev/null
echo "$job_id"
}
# Function to check job result
check_job_result() {
local job_id=$1
local timeout=30   # 0.5s poll intervals (~15 seconds total)
local elapsed=0
while [ $elapsed -lt $timeout ]; do
local status=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" status 2>/dev/null || echo "")
if [ "$status" = "Finished" ]; then
local result=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" result 2>/dev/null || echo "")
if [ "$result" = "pong" ]; then
echo "✅ Job $job_id completed successfully with result: $result"
return 0
else
echo "❌ Job $job_id completed but with unexpected result: $result"
return 1
fi
elif [ "$status" = "Error" ]; then
local error=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" error 2>/dev/null || echo "")
echo "❌ Job $job_id failed with error: $error"
return 1
fi
sleep 0.5
elapsed=$((elapsed + 1))
done
echo "❌ Job $job_id timed out after ${timeout}s"
return 1
}
# Send multiple concurrent ping jobs to test async processing
echo "Testing concurrent ping/pong functionality..."
total_jobs=5
job_ids=()
echo
echo "--- Sending $total_jobs concurrent ping jobs ---"
for i in $(seq 1 $total_jobs); do
job_id=$(send_ping_job $i)
job_ids+=("$job_id")
sleep 0.1 # Small delay between job submissions
done
echo
echo "--- Waiting for all jobs to complete ---"
success_count=0
for job_id in "${job_ids[@]}"; do
echo "Checking job: $job_id"
if check_job_result "$job_id"; then
success_count=$((success_count + 1))
fi
done
echo
echo "=== Test Results ==="
echo "Successful concurrent ping/pong tests: $success_count/$total_jobs"
if [ $success_count -eq $total_jobs ]; then
echo "🎉 All tests passed! System worker is handling concurrent jobs correctly."
exit_code=0
else
echo "⚠️ Some tests failed. Check the worker logs for details."
exit_code=1
fi
# Test rapid job submission to showcase async capabilities
echo
echo "--- Testing rapid job submission (10 jobs in quick succession) ---"
rapid_jobs=10
rapid_job_ids=()
for i in $(seq 1 $rapid_jobs); do
job_id=$(send_ping_job "rapid_$i")
rapid_job_ids+=("$job_id")
done
echo "Waiting for rapid jobs to complete..."
rapid_success=0
for job_id in "${rapid_job_ids[@]}"; do
if check_job_result "$job_id"; then
rapid_success=$((rapid_success + 1))
fi
done
echo "Rapid submission test: $rapid_success/$rapid_jobs successful"
# Clean up
echo
echo "Cleaning up..."
echo "Stopping System worker (PID: $WORKER_PID)..."
kill $WORKER_PID 2>/dev/null || true
wait $WORKER_PID 2>/dev/null || true
echo "✅ Worker stopped"
echo "Cleaning up Redis jobs..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis cleaned up"
echo
echo "=== System Worker Example Complete ==="
total_success=$((success_count + rapid_success))
total_tests=$((total_jobs + rapid_jobs))
echo "Overall success rate: $total_success/$total_tests"
if [ $total_success -eq $total_tests ]; then
exit 0
else
exit 1
fi


@@ -0,0 +1,15 @@
# System Worker Configuration
# Asynchronous worker for high-throughput concurrent processing
worker_id = "system_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_worker_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 300 # 5 minutes
[logging]
timestamps = true
level = "info"

core/worker/examples/system_worker_demo.rs (new file)

@@ -0,0 +1,60 @@
use std::process::{Command, Stdio};
use std::env;
/// System Worker Demo Runner
///
/// This Rust wrapper executes the System worker bash script example.
/// It provides a way to run shell-based examples through Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 System Worker Demo");
println!("====================");
println!();
// Get the current working directory and construct the path to the shell script
let current_dir = env::current_dir()?;
let script_path = current_dir.join("examples").join("system").join("example.sh");
// Check if the script exists
if !script_path.exists() {
eprintln!("❌ Error: Script not found at {:?}", script_path);
eprintln!(" Make sure you're running this from the worker crate root directory.");
std::process::exit(1);
}
println!("📁 Script location: {:?}", script_path);
println!("🔧 Executing System worker example...");
println!();
// Make sure the script is executable
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)?.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms)?;
}
// Execute the shell script
let mut child = Command::new("bash")
.arg(&script_path)
.current_dir(&current_dir)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
// Wait for the script to complete
let status = child.wait()?;
println!();
if status.success() {
println!("✅ System worker demo completed successfully!");
} else {
println!("❌ System worker demo failed with exit code: {:?}", status.code());
std::process::exit(status.code().unwrap_or(1));
}
Ok(())
}

core/worker/examples/trait_based_worker_demo.rs (new file)

@@ -0,0 +1,322 @@
//! # Trait-Based Worker Demo
//!
//! This example demonstrates the new unified worker interface using the Worker trait.
//! It shows how both synchronous and asynchronous workers can be used with the same
//! API, eliminating code duplication and providing a clean, consistent interface.
//!
//! ## Features Demonstrated
//!
//! - Unified worker interface using the Worker trait
//! - Both sync and async worker implementations
//! - Shared configuration and spawn logic
//! - Clean shutdown handling
//! - Job processing with different strategies
//!
//! ## Usage
//!
//! Make sure Redis is running on localhost:6379, then run:
//! ```bash
//! cargo run --example trait_based_worker_demo
//! ```
use hero_job::{Job, JobStatus, ScriptType};
use log::{info, warn, error};
use rhailib_worker::{
SyncWorker, AsyncWorker,
spawn_sync_worker, spawn_async_worker,
engine::create_heromodels_engine,
worker_trait::{spawn_worker, Worker}
};
use redis::AsyncCommands;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
const REDIS_URL: &str = "redis://127.0.0.1:6379";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
info!("Starting Trait-Based Worker Demo");
// Create Redis connection for job creation
let redis_client = redis::Client::open(REDIS_URL)?;
let mut redis_conn = redis_client.get_multiplexed_async_connection().await?;
// Demo 1: Using the unified trait-based interface
info!("=== Demo 1: Unified Trait-Based Interface ===");
// Create shutdown channels for both workers
let (sync_shutdown_tx, sync_shutdown_rx) = mpsc::channel::<()>(1);
let (async_shutdown_tx, async_shutdown_rx) = mpsc::channel::<()>(1);
// Workers are now configured using builder pattern directly
// Create worker instances using builder pattern
let sync_worker = Arc::new(
SyncWorker::builder()
.worker_id("demo_sync_worker")
.db_path("/tmp")
.redis_url("redis://localhost:6379")
.preserve_tasks(false)
.build()
.expect("Failed to build SyncWorker")
);
let async_worker = Arc::new(
AsyncWorker::builder()
.worker_id("demo_async_worker")
.db_path("/tmp")
.redis_url("redis://localhost:6379")
.default_timeout(Duration::from_secs(300))
.build()
.expect("Failed to build AsyncWorker")
);
let sync_engine = create_heromodels_engine();
let async_engine = create_heromodels_engine();
info!("Spawning {} worker: {}", sync_worker.worker_type(), sync_worker.worker_id());
let sync_handle = spawn_worker(sync_worker.clone(), sync_engine, sync_shutdown_rx);
info!("Spawning {} worker: {}", async_worker.worker_type(), async_worker.worker_id());
let async_handle = spawn_worker(async_worker.clone(), async_engine, async_shutdown_rx);
// Give workers time to start
sleep(Duration::from_secs(1)).await;
// Create and dispatch jobs to both workers
info!("Creating demo jobs for both workers...");
// Job for sync worker - simple calculation
let sync_job = create_demo_job(
"sync_calculation",
r#"
print("Sync worker: Starting calculation...");
let result = 0;
for i in 1..=100 {
result += i;
}
print("Sync worker: Sum of 1-100 = " + result);
result
"#,
None,
).await?;
dispatch_job(&mut redis_conn, &sync_job, sync_worker.worker_id()).await?;
info!("Dispatched job to sync worker: {}", sync_job.id);
// Job for async worker - with timeout demonstration
let async_job = create_demo_job(
"async_calculation",
r#"
print("Async worker: Starting calculation...");
let result = 1;
for i in 1..=10 {
result *= i;
}
print("Async worker: 10! = " + result);
result
"#,
Some(15), // 15 second timeout
).await?;
dispatch_job(&mut redis_conn, &async_job, async_worker.worker_id()).await?;
info!("Dispatched job to async worker: {}", async_job.id);
// Monitor job execution
info!("Monitoring job execution for 10 seconds...");
let monitor_start = std::time::Instant::now();
let monitor_duration = Duration::from_secs(10);
while monitor_start.elapsed() < monitor_duration {
// Check sync job status
if let Ok(status) = Job::get_status(&mut redis_conn, &sync_job.id).await {
match status {
JobStatus::Finished => {
let job_key = format!("hero:job:{}", sync_job.id);
if let Ok(result) = redis_conn.hget::<_, _, String>(&job_key, "output").await {
info!("✅ Sync Job {} COMPLETED with result: {}", sync_job.id, result);
} else {
info!("✅ Sync Job {} COMPLETED", sync_job.id);
}
}
JobStatus::Error => {
let job_key = format!("hero:job:{}", sync_job.id);
if let Ok(error) = redis_conn.hget::<_, _, String>(&job_key, "error").await {
warn!("❌ Sync Job {} FAILED with error: {}", sync_job.id, error);
} else {
warn!("❌ Sync Job {} FAILED", sync_job.id);
}
}
_ => info!("🔄 Sync Job {} status: {:?}", sync_job.id, status),
}
}
// Check async job status
if let Ok(status) = Job::get_status(&mut redis_conn, &async_job.id).await {
match status {
JobStatus::Finished => {
let job_key = format!("hero:job:{}", async_job.id);
if let Ok(result) = redis_conn.hget::<_, _, String>(&job_key, "output").await {
info!("✅ Async Job {} COMPLETED with result: {}", async_job.id, result);
} else {
info!("✅ Async Job {} COMPLETED", async_job.id);
}
}
JobStatus::Error => {
let job_key = format!("hero:job:{}", async_job.id);
if let Ok(error) = redis_conn.hget::<_, _, String>(&job_key, "error").await {
warn!("❌ Async Job {} FAILED with error: {}", async_job.id, error);
} else {
warn!("❌ Async Job {} FAILED", async_job.id);
}
}
_ => info!("🔄 Async Job {} status: {:?}", async_job.id, status),
}
}
sleep(Duration::from_secs(2)).await;
}
// Demo 2: Using convenience functions (backward compatibility)
info!("\n=== Demo 2: Convenience Functions (Backward Compatibility) ===");
let (conv_sync_shutdown_tx, conv_sync_shutdown_rx) = mpsc::channel::<()>(1);
let (conv_async_shutdown_tx, conv_async_shutdown_rx) = mpsc::channel::<()>(1);
// Spawn workers using convenience functions
let conv_sync_engine = create_heromodels_engine();
let conv_async_engine = create_heromodels_engine();
info!("Spawning sync worker using convenience function...");
let conv_sync_handle = spawn_sync_worker(
"convenience_sync_worker".to_string(),
"/tmp".to_string(),
conv_sync_engine,
REDIS_URL.to_string(),
conv_sync_shutdown_rx,
false,
);
info!("Spawning async worker using convenience function...");
let conv_async_handle = spawn_async_worker(
"convenience_async_worker".to_string(),
"/tmp".to_string(),
conv_async_engine,
REDIS_URL.to_string(),
conv_async_shutdown_rx,
Duration::from_secs(20), // 20 second timeout
);
// Give convenience workers time to start
sleep(Duration::from_secs(1)).await;
// Create jobs for convenience workers
let conv_sync_job = create_demo_job(
"convenience_sync",
r#"
print("Convenience sync worker: Hello World!");
"Hello from convenience sync worker"
"#,
None,
).await?;
let conv_async_job = create_demo_job(
"convenience_async",
r#"
print("Convenience async worker: Hello World!");
"Hello from convenience async worker"
"#,
Some(10),
).await?;
dispatch_job(&mut redis_conn, &conv_sync_job, "convenience_sync_worker").await?;
dispatch_job(&mut redis_conn, &conv_async_job, "convenience_async_worker").await?;
info!("Dispatched jobs to convenience workers");
// Wait a bit for jobs to complete
sleep(Duration::from_secs(5)).await;
// Shutdown all workers gracefully
info!("\n=== Shutting Down All Workers ===");
info!("Sending shutdown signals...");
let _ = sync_shutdown_tx.send(()).await;
let _ = async_shutdown_tx.send(()).await;
let _ = conv_sync_shutdown_tx.send(()).await;
let _ = conv_async_shutdown_tx.send(()).await;
info!("Waiting for workers to shutdown...");
// Wait for all workers to shutdown
let results = tokio::join!(
sync_handle,
async_handle,
conv_sync_handle,
conv_async_handle
);
match results {
(Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))) => {
info!("All workers shut down successfully!");
}
_ => {
error!("Some workers encountered errors during shutdown");
}
}
info!("Trait-Based Worker Demo completed successfully!");
// Summary
info!("\n=== Summary ===");
info!("✅ Demonstrated unified Worker trait interface");
info!("✅ Showed both sync and async worker implementations");
info!("✅ Used shared configuration and spawn logic");
info!("✅ Maintained backward compatibility with convenience functions");
info!("✅ Eliminated code duplication between worker types");
info!("✅ Provided clean, consistent API for all worker operations");
Ok(())
}
/// Create a demo job with the specified script and timeout
async fn create_demo_job(
name: &str,
script: &str,
timeout_seconds: Option<i32>,
) -> Result<Job, Box<dyn std::error::Error>> {
let mut job = Job::new(
format!("demo_{}", name), // caller_id
"demo_context".to_string(), // context_id
script.to_string(),
ScriptType::OSIS,
);
// Set timeout if provided
if let Some(timeout) = timeout_seconds {
job.timeout = Duration::from_secs(timeout as u64);
}
Ok(job)
}
/// Dispatch a job to the worker queue
async fn dispatch_job(
redis_conn: &mut redis::aio::MultiplexedConnection,
job: &Job,
worker_queue: &str,
) -> Result<(), Box<dyn std::error::Error>> {
// Store job in Redis
job.store_in_redis(redis_conn).await?;
// Add job to worker queue
let queue_key = format!("hero:job:{}", worker_queue);
let _: () = redis_conn.rpush(&queue_key, &job.id).await?;
Ok(())
}

core/worker/src/async_worker_impl.rs (new file)

@@ -0,0 +1,420 @@
//! # Asynchronous Worker Implementation
//!
//! This module provides an asynchronous worker implementation that can process
//! multiple jobs concurrently with timeout support. Each job is spawned as a
//! separate Tokio task, allowing for parallel execution and proper timeout handling.
//!
//! ## Features
//!
//! - **Concurrent Processing**: Multiple jobs can run simultaneously
//! - **Timeout Support**: Jobs that exceed their timeout are automatically cancelled
//! - **Resource Cleanup**: Proper cleanup of aborted/cancelled jobs
//! - **Non-blocking**: Worker continues processing new jobs while others are running
//! - **Scalable**: Can handle high job throughput with parallel execution
//!
//! ## Usage
//!
//! ```rust
//! use std::sync::Arc;
//! use std::time::Duration;
//! use rhailib_worker::async_worker_impl::AsyncWorker;
//! use rhailib_worker::worker_trait::spawn_worker;
//! use rhailib_worker::engine::create_heromodels_engine;
//! use tokio::sync::mpsc;
//!
//! let worker = Arc::new(
//!     AsyncWorker::builder()
//!         .worker_id("async_worker_1")
//!         .db_path("/path/to/db")
//!         .redis_url("redis://localhost:6379")
//!         .default_timeout(Duration::from_secs(300))
//!         .build()
//!         .expect("failed to build AsyncWorker")
//! );
//! let engine = create_heromodels_engine();
//! let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
//!
//! let handle = spawn_worker(worker, engine, shutdown_rx);
//!
//! // Later, shutdown the worker
//! shutdown_tx.send(()).await.unwrap();
//! handle.await.unwrap().unwrap();
//! ```
use async_trait::async_trait;
use hero_job::{Job, JobStatus};
use log::{debug, error, info, warn};
use rhai::Engine;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use crate::engine::eval_script;
use crate::worker_trait::{Worker, WorkerConfig};
use crate::initialize_redis_connection;
/// Represents a running job with its handle and metadata
#[derive(Debug)]
struct RunningJob {
job_id: String,
handle: JoinHandle<()>,
started_at: std::time::Instant,
}
/// Builder for AsyncWorker
#[derive(Debug, Default)]
pub struct AsyncWorkerBuilder {
worker_id: Option<String>,
db_path: Option<String>,
redis_url: Option<String>,
default_timeout: Option<Duration>,
}
impl AsyncWorkerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn worker_id<S: Into<String>>(mut self, worker_id: S) -> Self {
self.worker_id = Some(worker_id.into());
self
}
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
self.db_path = Some(db_path.into());
self
}
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
self.redis_url = Some(redis_url.into());
self
}
pub fn default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
pub fn build(self) -> Result<AsyncWorker, String> {
Ok(AsyncWorker {
worker_id: self.worker_id.ok_or("worker_id is required")?,
db_path: self.db_path.ok_or("db_path is required")?,
redis_url: self.redis_url.ok_or("redis_url is required")?,
default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)),
running_jobs: Arc::new(Mutex::new(HashMap::new())),
})
}
}
/// Asynchronous worker that processes jobs concurrently
#[derive(Debug, Clone)]
pub struct AsyncWorker {
pub worker_id: String,
pub db_path: String,
pub redis_url: String,
pub default_timeout: Duration,
running_jobs: Arc<Mutex<HashMap<String, RunningJob>>>,
}
impl AsyncWorker {
/// Create a new AsyncWorkerBuilder
pub fn builder() -> AsyncWorkerBuilder {
AsyncWorkerBuilder::new()
}
/// Add a running job to the tracking map
async fn add_running_job(&self, job_id: String, handle: JoinHandle<()>) {
let running_job = RunningJob {
job_id: job_id.clone(),
handle,
started_at: std::time::Instant::now(),
};
let mut jobs = self.running_jobs.lock().await;
jobs.insert(job_id.clone(), running_job);
debug!("Async Worker: Added running job '{}'. Total running: {}",
job_id, jobs.len());
}
/// Remove a completed job from the tracking map
async fn remove_running_job(&self, job_id: &str) {
let mut jobs = self.running_jobs.lock().await;
if let Some(job) = jobs.remove(job_id) {
let duration = job.started_at.elapsed();
debug!("Async Worker: Removed completed job '{}' after {:?}. Remaining: {}",
job_id, duration, jobs.len());
}
}
/// Get the count of currently running jobs
pub async fn running_job_count(&self) -> usize {
let jobs = self.running_jobs.lock().await;
jobs.len()
}
/// Cleanup any finished jobs from the running jobs map
async fn cleanup_finished_jobs(&self) {
let mut jobs = self.running_jobs.lock().await;
let mut to_remove = Vec::new();
for (job_id, running_job) in jobs.iter() {
if running_job.handle.is_finished() {
to_remove.push(job_id.clone());
}
}
for job_id in to_remove {
if let Some(job) = jobs.remove(&job_id) {
let duration = job.started_at.elapsed();
debug!("Async Worker: Cleaned up finished job '{}' after {:?}",
job_id, duration);
}
}
}
/// Execute a single job asynchronously with timeout support
async fn execute_job_with_timeout(
job: Job,
engine: Engine,
worker_id: String,
redis_url: String,
job_timeout: Duration,
) {
let job_id = job.id.clone();
info!("Async Worker '{}', Job {}: Starting execution with timeout {:?}",
worker_id, job_id, job_timeout);
// Create a new Redis connection for this job
let mut redis_conn = match initialize_redis_connection(&worker_id, &redis_url).await {
Ok(conn) => conn,
Err(e) => {
error!("Async Worker '{}', Job {}: Failed to initialize Redis connection: {}",
worker_id, job_id, e);
return;
}
};
// Update job status to Started
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
error!("Async Worker '{}', Job {}: Failed to update status to Started: {}",
worker_id, job_id, e);
return;
}
// Create the script execution task
let script_task = async {
// Execute the Rhai script
match eval_script(&engine, &job.script) {
Ok(result) => {
let result_str = format!("{:?}", result);
info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}",
worker_id, job_id, result_str);
// Update job with success result
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result_str).await {
error!("Async Worker '{}', Job {}: Failed to set result: {}",
worker_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
error!("Async Worker '{}', Job {}: Failed to update status to Finished: {}",
worker_id, job_id, e);
}
}
Err(e) => {
let error_msg = format!("Script execution error: {}", e);
error!("Async Worker '{}', Job {}: {}", worker_id, job_id, error_msg);
// Update job with error
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_msg).await {
error!("Async Worker '{}', Job {}: Failed to set error: {}",
worker_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
error!("Async Worker '{}', Job {}: Failed to update status to Error: {}",
worker_id, job_id, e);
}
}
}
};
// Execute the script with timeout
match timeout(job_timeout, script_task).await {
Ok(()) => {
info!("Async Worker '{}', Job {}: Completed within timeout", worker_id, job_id);
}
Err(_) => {
warn!("Async Worker '{}', Job {}: Timed out after {:?}, marking as error",
worker_id, job_id, job_timeout);
let timeout_msg = format!("Job timed out after {:?}", job_timeout);
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &timeout_msg).await {
error!("Async Worker '{}', Job {}: Failed to set timeout error: {}",
worker_id, job_id, e);
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
error!("Async Worker '{}', Job {}: Failed to update status to Error after timeout: {}",
worker_id, job_id, e);
}
}
}
info!("Async Worker '{}', Job {}: Job processing completed", worker_id, job_id);
}
}
impl Default for AsyncWorker {
fn default() -> Self {
// Default AsyncWorker with placeholder values
// In practice, use the builder pattern instead
Self {
worker_id: "default_async_worker".to_string(),
db_path: "/tmp".to_string(),
redis_url: "redis://localhost:6379".to_string(),
default_timeout: Duration::from_secs(300),
running_jobs: Arc::new(Mutex::new(HashMap::new())),
}
}
}
#[async_trait]
impl Worker for AsyncWorker {
async fn process_job(
&self,
job: Job,
engine: Engine, // Reuse the stateless engine
_redis_conn: &mut redis::aio::MultiplexedConnection,
) {
let job_id = job.id.clone();
let worker_id = &self.worker_id.clone();
// Determine timeout (use job-specific timeout if available, otherwise default)
let job_timeout = if job.timeout.as_secs() > 0 {
job.timeout
} else {
self.default_timeout // Use worker's default timeout
};
info!("Async Worker '{}', Job {}: Spawning job execution task with timeout {:?}",
worker_id, job_id, job_timeout);
// Clone necessary data for the spawned task
let job_id_clone = job_id.clone();
let worker_id_clone = worker_id.clone();
let worker_id_debug = worker_id.clone(); // Additional clone for debug statement
let job_id_debug = job_id.clone(); // Additional clone for debug statement
let redis_url_clone = self.redis_url.clone();
let running_jobs_clone = Arc::clone(&self.running_jobs);
// Spawn the job execution task
let job_handle = tokio::spawn(async move {
Self::execute_job_with_timeout(
job,
engine,
worker_id_clone,
redis_url_clone,
job_timeout,
).await;
// Remove this job from the running jobs map when it completes
let mut jobs = running_jobs_clone.lock().await;
if let Some(running_job) = jobs.remove(&job_id_clone) {
let duration = running_job.started_at.elapsed();
debug!("Async Worker '{}': Removed completed job '{}' after {:?}",
worker_id_debug, job_id_debug, duration);
}
});
// Add the job to the running jobs map
self.add_running_job(job_id, job_handle).await;
// Cleanup finished jobs periodically
self.cleanup_finished_jobs().await;
}
fn worker_type(&self) -> &'static str {
"Async"
}
fn worker_id(&self) -> &str {
&self.worker_id
}
fn redis_url(&self) -> &str {
&self.redis_url
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::create_heromodels_engine;
use hero_job::ScriptType;
#[tokio::test]
async fn test_async_worker_creation() {
let worker = AsyncWorker::default();
assert_eq!(worker.worker_type(), "Async");
assert_eq!(worker.running_job_count().await, 0);
}
#[tokio::test]
async fn test_async_worker_default() {
let worker = AsyncWorker::default();
assert_eq!(worker.worker_type(), "Async");
}
#[tokio::test]
async fn test_async_worker_job_tracking() {
let worker = AsyncWorker::default();
// Simulate adding a job
let handle = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(100)).await;
});
worker.add_running_job("job_1".to_string(), handle).await;
assert_eq!(worker.running_job_count().await, 1);
// Wait for job to complete
tokio::time::sleep(Duration::from_millis(200)).await;
worker.cleanup_finished_jobs().await;
assert_eq!(worker.running_job_count().await, 0);
}
#[tokio::test]
async fn test_async_worker_process_job_interface() {
let worker = AsyncWorker::default();
let engine = create_heromodels_engine();
// Create a simple test job
let job = Job::new(
"test_caller".to_string(),
"test_context".to_string(),
r#"print("Hello from async worker test!"); 42"#.to_string(),
ScriptType::OSIS,
);
let config = WorkerConfig::new(
"test_async_worker".to_string(),
"/tmp".to_string(),
"redis://localhost:6379".to_string(),
false,
).with_default_timeout(Duration::from_secs(60));
// Note: This test doesn't actually connect to Redis, it just tests the interface
// In a real test environment, you'd need a Redis instance or mock
// The process_job method should be callable (interface test)
// worker.process_job(job, engine, &mut redis_conn, &config).await;
// For now, just verify the worker was created successfully
assert_eq!(worker.worker_type(), "Async");
}
}

core/worker/src/config.rs (new file, 250 lines)

@@ -0,0 +1,250 @@
//! Worker Configuration Module - TOML-based configuration for Hero workers
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
use std::time::Duration;
/// Worker configuration loaded from TOML file
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
/// Worker identification
pub worker_id: String,
/// Redis connection URL
pub redis_url: String,
/// Database path for Rhai engine
pub db_path: String,
/// Whether to preserve task details after completion
#[serde(default = "default_preserve_tasks")]
pub preserve_tasks: bool,
/// Worker type configuration
pub worker_type: WorkerType,
/// Logging configuration
#[serde(default)]
pub logging: LoggingConfig,
}
/// Worker type configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerType {
/// Synchronous worker configuration
#[serde(rename = "sync")]
Sync,
/// Asynchronous worker configuration
#[serde(rename = "async")]
Async {
/// Default timeout for jobs in seconds
#[serde(default = "default_timeout_seconds")]
default_timeout_seconds: u64,
},
}
/// Logging configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingConfig {
/// Whether to include timestamps in log output
#[serde(default = "default_timestamps")]
pub timestamps: bool,
/// Log level (trace, debug, info, warn, error)
#[serde(default = "default_log_level")]
pub level: String,
}
impl Default for LoggingConfig {
fn default() -> Self {
Self {
timestamps: default_timestamps(),
level: default_log_level(),
}
}
}
impl WorkerConfig {
/// Load configuration from TOML file
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let content = fs::read_to_string(&path)
.map_err(|e| ConfigError::IoError(format!("Failed to read config file: {}", e)))?;
let config: WorkerConfig = toml::from_str(&content)
.map_err(|e| ConfigError::ParseError(format!("Failed to parse TOML: {}", e)))?;
config.validate()?;
Ok(config)
}
/// Validate the configuration
fn validate(&self) -> Result<(), ConfigError> {
if self.worker_id.is_empty() {
return Err(ConfigError::ValidationError("worker_id cannot be empty".to_string()));
}
if self.redis_url.is_empty() {
return Err(ConfigError::ValidationError("redis_url cannot be empty".to_string()));
}
if self.db_path.is_empty() {
return Err(ConfigError::ValidationError("db_path cannot be empty".to_string()));
}
// Validate log level
match self.logging.level.to_lowercase().as_str() {
"trace" | "debug" | "info" | "warn" | "error" => {},
_ => return Err(ConfigError::ValidationError(
format!("Invalid log level: {}. Must be one of: trace, debug, info, warn, error", self.logging.level)
)),
}
Ok(())
}
/// Get the default timeout duration for async workers
pub fn get_default_timeout(&self) -> Option<Duration> {
match &self.worker_type {
WorkerType::Sync => None,
WorkerType::Async { default_timeout_seconds } => {
Some(Duration::from_secs(*default_timeout_seconds))
}
}
}
/// Check if this is a sync worker configuration
pub fn is_sync(&self) -> bool {
matches!(self.worker_type, WorkerType::Sync)
}
/// Check if this is an async worker configuration
pub fn is_async(&self) -> bool {
matches!(self.worker_type, WorkerType::Async { .. })
}
}
/// Configuration error types
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("IO error: {0}")]
IoError(String),
#[error("Parse error: {0}")]
ParseError(String),
#[error("Validation error: {0}")]
ValidationError(String),
}
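// Illustrative usage sketch, mirroring what a worker binary such as `cmd/osis.rs`
// does with this module (the config file name below is an assumption):
//
//     let config = WorkerConfig::from_file("config/osis.toml")?;
//     if config.is_sync() {
//         // spawn a sequential SyncWorker
//     } else if let Some(timeout) = config.get_default_timeout() {
//         // spawn an AsyncWorker with `timeout` as the per-job default
//     }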
// Default value functions for serde
fn default_preserve_tasks() -> bool {
false
}
fn default_timeout_seconds() -> u64 {
300 // 5 minutes
}
fn default_timestamps() -> bool {
true
}
fn default_log_level() -> String {
"info".to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_sync_worker_config() {
let config_toml = r#"
worker_id = "sync_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/worker_db"
[worker_type]
type = "sync"
[logging]
timestamps = false
level = "debug"
"#;
let config: WorkerConfig = toml::from_str(config_toml).unwrap();
assert_eq!(config.worker_id, "sync_worker_1");
assert!(config.is_sync());
assert!(!config.is_async());
assert_eq!(config.get_default_timeout(), None);
assert!(!config.logging.timestamps);
assert_eq!(config.logging.level, "debug");
}
#[test]
fn test_async_worker_config() {
let config_toml = r#"
worker_id = "async_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/worker_db"
[worker_type]
type = "async"
default_timeout_seconds = 600
[logging]
timestamps = true
level = "info"
"#;
let config: WorkerConfig = toml::from_str(config_toml).unwrap();
assert_eq!(config.worker_id, "async_worker_1");
assert!(!config.is_sync());
assert!(config.is_async());
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
assert!(config.logging.timestamps);
assert_eq!(config.logging.level, "info");
}
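    // Illustrative sketch: an async config that omits `default_timeout_seconds`
    // should fall back to the serde default of 300 seconds.
    #[test]
    fn test_async_worker_config_default_timeout() {
        let config_toml = r#"
worker_id = "async_worker_2"
redis_url = "redis://localhost:6379"
db_path = "/tmp/worker_db"

[worker_type]
type = "async"
"#;

        let config: WorkerConfig = toml::from_str(config_toml).unwrap();
        assert!(config.is_async());
        assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(300)));
    }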
#[test]
fn test_config_from_file() {
let config_toml = r#"
worker_id = "test_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert_eq!(config.worker_id, "test_worker");
assert!(config.is_sync());
}
#[test]
fn test_config_validation() {
let config_toml = r#"
worker_id = ""
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
"#;
let result: Result<WorkerConfig, _> = toml::from_str(config_toml);
assert!(result.is_ok());
let config = result.unwrap();
assert!(config.validate().is_err());
}
}

View File

@@ -8,11 +8,23 @@ use tokio::task::JoinHandle;
/// Engine module containing Rhai engine creation and script execution utilities
pub mod engine;
/// Worker trait abstraction for unified worker interface
pub mod worker_trait;
/// Synchronous worker implementation
pub mod sync_worker;
/// Asynchronous worker implementation with trait-based interface
pub mod async_worker_impl;
/// Configuration module for TOML-based worker configuration
pub mod config;
const NAMESPACE_PREFIX: &str = "hero:job:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
/// Initialize Redis connection for the worker
async fn initialize_redis_connection(
pub(crate) async fn initialize_redis_connection(
worker_id: &str,
redis_url: &str,
) -> Result<redis::aio::MultiplexedConnection, Box<dyn std::error::Error + Send + Sync>> {
@@ -33,7 +45,7 @@ async fn initialize_redis_connection(
}
/// Load job from Redis using Job struct
async fn load_job_from_redis(
pub(crate) async fn load_job_from_redis(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
worker_id: &str,
@@ -232,3 +244,60 @@ pub fn spawn_rhai_worker(
Ok(())
})
}
// Re-export the main trait-based interface for convenience
pub use worker_trait::{Worker, WorkerConfig, spawn_worker};
pub use sync_worker::SyncWorker;
pub use async_worker_impl::AsyncWorker;
/// Convenience function to spawn a synchronous worker using the trait interface
///
/// This function provides backward compatibility with the original sync worker API
/// while using the new trait-based implementation.
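///
/// # Example
///
/// Illustrative sketch; assumes a reachable Redis instance at `localhost:6379`
/// and uses placeholder paths.
///
/// ```rust,no_run
/// use rhailib_worker::spawn_sync_worker;
/// use rhailib_worker::engine::create_heromodels_engine;
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
///     let engine = create_heromodels_engine();
///     let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
///
///     let handle = spawn_sync_worker(
///         "sync_worker_1".to_string(),
///         "/tmp/worker_db".to_string(),
///         engine,
///         "redis://localhost:6379".to_string(),
///         shutdown_rx,
///         false, // preserve_tasks
///     );
///
///     // Signal shutdown and wait for the worker loop to exit.
///     shutdown_tx.send(()).await.unwrap();
///     handle.await.unwrap().unwrap();
/// }
/// ```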
pub fn spawn_sync_worker(
worker_id: String,
db_path: String,
engine: rhai::Engine,
redis_url: String,
shutdown_rx: mpsc::Receiver<()>,
preserve_tasks: bool,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
use std::sync::Arc;
let worker = Arc::new(
SyncWorker::builder()
.worker_id(worker_id)
.db_path(db_path)
.redis_url(redis_url)
.preserve_tasks(preserve_tasks)
.build()
.expect("Failed to build SyncWorker")
);
spawn_worker(worker, engine, shutdown_rx)
}
/// Convenience function to spawn an asynchronous worker using the trait interface
///
/// This function provides a clean interface for the new async worker implementation
/// with timeout support.
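///
/// # Example
///
/// Illustrative sketch; assumes a reachable Redis instance at `localhost:6379`.
/// The `Duration` argument becomes the worker's default per-job timeout.
///
/// ```rust,no_run
/// use rhailib_worker::spawn_async_worker;
/// use rhailib_worker::engine::create_heromodels_engine;
/// use std::time::Duration;
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
///     let engine = create_heromodels_engine();
///     let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
///
///     let handle = spawn_async_worker(
///         "async_worker_1".to_string(),
///         "/tmp/worker_db".to_string(),
///         engine,
///         "redis://localhost:6379".to_string(),
///         shutdown_rx,
///         Duration::from_secs(300),
///     );
///
///     shutdown_tx.send(()).await.unwrap();
///     handle.await.unwrap().unwrap();
/// }
/// ```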
pub fn spawn_async_worker(
worker_id: String,
db_path: String,
engine: rhai::Engine,
redis_url: String,
shutdown_rx: mpsc::Receiver<()>,
default_timeout: std::time::Duration,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
use std::sync::Arc;
let worker = Arc::new(
AsyncWorker::builder()
.worker_id(worker_id)
.db_path(db_path)
.redis_url(redis_url)
.default_timeout(default_timeout)
.build()
.expect("Failed to build AsyncWorker")
);
spawn_worker(worker, engine, shutdown_rx)
}

View File

@@ -0,0 +1,255 @@
//! # Synchronous Worker Implementation
//!
//! This module provides a synchronous worker implementation that processes jobs
//! one at a time in sequence. This is the original worker behavior that's suitable
//! for scenarios where job execution should not overlap or when resource constraints
//! require sequential processing.
//!
//! ## Features
//!
//! - **Sequential Processing**: Jobs are processed one at a time
//! - **Simple Resource Management**: No concurrent job tracking needed
//! - **Predictable Behavior**: Jobs complete in the order they're received
//! - **Lower Memory Usage**: Only one job active at a time
//!
//! ## Usage
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use rhailib_worker::sync_worker::SyncWorker;
//! use rhailib_worker::worker_trait::spawn_worker;
//! use rhailib_worker::engine::create_heromodels_engine;
//! use tokio::sync::mpsc;
//!
//! #[tokio::main]
//! async fn main() {
//!     let worker = Arc::new(
//!         SyncWorker::builder()
//!             .worker_id("sync_worker_1")
//!             .db_path("/path/to/db")
//!             .redis_url("redis://localhost:6379")
//!             .preserve_tasks(false)
//!             .build()
//!             .expect("failed to build SyncWorker"),
//!     );
//!     let engine = create_heromodels_engine();
//!     let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
//!
//!     let handle = spawn_worker(worker, engine, shutdown_rx);
//!
//!     // Later, shut down the worker
//!     shutdown_tx.send(()).await.unwrap();
//!     handle.await.unwrap().unwrap();
//! }
//! ```
use async_trait::async_trait;
use hero_job::{Job, JobStatus};
use log::{debug, error, info};
use rhai::Engine;
use crate::engine::eval_script;
use crate::worker_trait::{Worker, WorkerConfig};
/// Builder for SyncWorker
#[derive(Debug, Default)]
pub struct SyncWorkerBuilder {
worker_id: Option<String>,
db_path: Option<String>,
redis_url: Option<String>,
preserve_tasks: bool,
}
impl SyncWorkerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn worker_id<S: Into<String>>(mut self, worker_id: S) -> Self {
self.worker_id = Some(worker_id.into());
self
}
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
self.db_path = Some(db_path.into());
self
}
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
self.redis_url = Some(redis_url.into());
self
}
pub fn preserve_tasks(mut self, preserve: bool) -> Self {
self.preserve_tasks = preserve;
self
}
pub fn build(self) -> Result<SyncWorker, String> {
Ok(SyncWorker {
worker_id: self.worker_id.ok_or("worker_id is required")?,
db_path: self.db_path.ok_or("db_path is required")?,
redis_url: self.redis_url.ok_or("redis_url is required")?,
preserve_tasks: self.preserve_tasks,
})
}
}
/// Synchronous worker that processes jobs sequentially
#[derive(Debug, Clone)]
pub struct SyncWorker {
pub worker_id: String,
pub db_path: String,
pub redis_url: String,
pub preserve_tasks: bool,
}
impl SyncWorker {
/// Create a new SyncWorker with default settings (equivalent to `SyncWorker::default()`)
pub fn new() -> Self {
Self::default()
}
/// Create a new SyncWorkerBuilder
pub fn builder() -> SyncWorkerBuilder {
SyncWorkerBuilder::new()
}
}
impl Default for SyncWorker {
fn default() -> Self {
// Default SyncWorker with placeholder values
// In practice, use the builder pattern instead
Self {
worker_id: "default_sync_worker".to_string(),
db_path: "/tmp".to_string(),
redis_url: "redis://localhost:6379".to_string(),
preserve_tasks: false,
}
}
}
#[async_trait]
impl Worker for SyncWorker {
async fn process_job(
&self,
job: Job,
engine: Engine,
redis_conn: &mut redis::aio::MultiplexedConnection,
) {
let job_id = &job.id;
let worker_id = &self.worker_id;
let db_path = &self.db_path;
info!("Sync Worker '{}', Job {}: Starting sequential processing", worker_id, job_id);
// Update job status to Started
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await {
error!("Sync Worker '{}', Job {}: Failed to update status to Started: {}",
worker_id, job_id, e);
return;
}
// Execute the Rhai script
match eval_script(&engine, &job.script) {
Ok(result) => {
let result_str = format!("{:?}", result);
info!("Sync Worker '{}', Job {}: Script executed successfully. Result: {}",
worker_id, job_id, result_str);
// Update job with success result
if let Err(e) = Job::set_result(redis_conn, job_id, &result_str).await {
error!("Sync Worker '{}', Job {}: Failed to set result: {}",
worker_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Finished).await {
error!("Sync Worker '{}', Job {}: Failed to update status to Finished: {}",
worker_id, job_id, e);
}
}
Err(e) => {
let error_msg = format!("Script execution error: {}", e);
error!("Sync Worker '{}', Job {}: {}", worker_id, job_id, error_msg);
// Update job with error
if let Err(e) = Job::set_error(redis_conn, job_id, &error_msg).await {
error!("Sync Worker '{}', Job {}: Failed to set error: {}",
worker_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Error).await {
error!("Sync Worker '{}', Job {}: Failed to update status to Error: {}",
worker_id, job_id, e);
}
}
}
// Cleanup job if preserve_tasks is false
if !self.preserve_tasks {
if let Err(e) = Job::delete_from_redis(redis_conn, job_id).await {
error!("Sync Worker '{}', Job {}: Failed to cleanup job: {}",
worker_id, job_id, e);
} else {
debug!("Sync Worker '{}', Job {}: Job cleaned up from Redis", worker_id, job_id);
}
}
info!("Sync Worker '{}', Job {}: Sequential processing completed", worker_id, job_id);
}
fn worker_type(&self) -> &'static str {
"Sync"
}
fn worker_id(&self) -> &str {
&self.worker_id
}
fn redis_url(&self) -> &str {
&self.redis_url
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::create_heromodels_engine;
use hero_job::ScriptType;
use std::time::Duration;
#[tokio::test]
async fn test_sync_worker_creation() {
let worker = SyncWorker::new();
assert_eq!(worker.worker_type(), "Sync");
}
#[tokio::test]
async fn test_sync_worker_default() {
let worker = SyncWorker::default();
assert_eq!(worker.worker_type(), "Sync");
}
#[tokio::test]
async fn test_sync_worker_process_job_interface() {
let worker = SyncWorker::new();
let engine = create_heromodels_engine();
// Create a simple test job
let job = Job::new(
"test_caller".to_string(),
"test_context".to_string(),
r#"print("Hello from sync worker test!"); 42"#.to_string(),
ScriptType::OSIS,
);
let config = WorkerConfig::new(
"test_sync_worker".to_string(),
"/tmp".to_string(),
"redis://localhost:6379".to_string(),
false,
);
// Note: This test doesn't actually connect to Redis, it just tests the interface
// In a real test environment, you'd need a Redis instance or mock
// The process_job method should be callable (interface test)
// worker.process_job(job, engine, &mut redis_conn).await;
// For now, just verify the worker was created successfully
assert_eq!(worker.worker_type(), "Sync");
}
}

View File

@@ -0,0 +1,339 @@
//! # Worker Trait Abstraction
//!
//! This module provides a trait-based abstraction for Rhai workers that eliminates
//! code duplication between synchronous and asynchronous worker implementations.
//!
//! The `Worker` trait defines the common interface and behavior, while specific
//! implementations handle job processing differently (sync vs async).
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────┐    ┌─────────────────┐
//! │   SyncWorker    │    │   AsyncWorker   │
//! │                 │    │                 │
//! │  process_job()  │    │  process_job()  │
//! │  (sequential)   │    │  (concurrent)   │
//! └─────────────────┘    └─────────────────┘
//!          │                      │
//!          └───────────┬──────────┘
//!                      │
//!              ┌───────▼───────┐
//!              │  Worker Trait │
//!              │               │
//!              │    spawn()    │
//!              │    config     │
//!              │  common loop  │
//!              └───────────────┘
//! ```
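//!
//! ## Example: a custom worker (illustrative sketch)
//!
//! Any type implementing `Worker` gets the shared `spawn()` polling loop for free.
//! The worker below only logs the jobs it receives; a real implementation would
//! evaluate the script and update the job status in Redis, as `SyncWorker` does.
//!
//! ```rust,no_run
//! use async_trait::async_trait;
//! use hero_job::Job;
//! use rhai::Engine;
//! use rhailib_worker::worker_trait::Worker;
//!
//! struct LoggingWorker {
//!     worker_id: String,
//!     redis_url: String,
//! }
//!
//! #[async_trait]
//! impl Worker for LoggingWorker {
//!     async fn process_job(
//!         &self,
//!         job: Job,
//!         _engine: Engine,
//!         _redis_conn: &mut redis::aio::MultiplexedConnection,
//!     ) {
//!         log::info!("LoggingWorker '{}' received job {}", self.worker_id, job.id);
//!     }
//!
//!     fn worker_type(&self) -> &'static str { "Logging" }
//!     fn worker_id(&self) -> &str { &self.worker_id }
//!     fn redis_url(&self) -> &str { &self.redis_url }
//! }
//! ```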
use hero_job::Job;
use log::{debug, error, info};
use redis::AsyncCommands;
use rhai::Engine;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::{initialize_redis_connection, NAMESPACE_PREFIX, BLPOP_TIMEOUT_SECONDS};
/// Configuration for worker instances
#[derive(Debug, Clone)]
pub struct WorkerConfig {
pub worker_id: String,
pub db_path: String,
pub redis_url: String,
pub preserve_tasks: bool,
pub default_timeout: Option<Duration>, // Only used by async workers
}
impl WorkerConfig {
/// Create a new worker configuration
pub fn new(
worker_id: String,
db_path: String,
redis_url: String,
preserve_tasks: bool,
) -> Self {
Self {
worker_id,
db_path,
redis_url,
preserve_tasks,
default_timeout: None,
}
}
/// Set default timeout for async workers
pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
}
/// Trait defining the common interface for Rhai workers
///
/// This trait abstracts the common functionality between synchronous and
/// asynchronous workers, allowing them to share the same spawn logic and
/// Redis polling loop while implementing different job processing strategies.
#[async_trait::async_trait]
pub trait Worker: Send + Sync + 'static {
/// Process a single job
///
/// This is the core method that differentiates worker implementations:
/// - Sync workers process jobs sequentially, one at a time
/// - Async workers spawn concurrent tasks for each job
///
/// # Arguments
///
/// * `job` - The job to process
/// * `engine` - Rhai engine for script execution
/// * `redis_conn` - Redis connection for status updates
async fn process_job(
&self,
job: Job,
engine: Engine,
redis_conn: &mut redis::aio::MultiplexedConnection,
);
/// Get the worker type name for logging
fn worker_type(&self) -> &'static str;
/// Get worker ID for this worker instance
fn worker_id(&self) -> &str;
/// Get Redis URL for this worker instance
fn redis_url(&self) -> &str;
/// Spawn the worker
///
/// This method provides the common worker loop implementation that both
/// sync and async workers can use. It handles:
/// - Redis connection setup
/// - Job polling from Redis queue
/// - Shutdown signal handling
/// - Delegating job processing to the implementation
fn spawn(
self: Arc<Self>,
engine: Engine,
mut shutdown_rx: mpsc::Receiver<()>,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let worker_id = self.worker_id();
let redis_url = self.redis_url();
let queue_key = format!("{}{}", NAMESPACE_PREFIX, worker_id);
info!(
"{} Worker '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
self.worker_type(),
worker_id,
redis_url,
queue_key
);
let mut redis_conn = initialize_redis_connection(worker_id, redis_url).await?;
loop {
let blpop_keys = vec![queue_key.clone()];
tokio::select! {
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("{} Worker '{}': Shutdown signal received. Terminating loop.",
self.worker_type(), worker_id);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("{} Worker '{}': Attempting BLPOP on queue: {}",
self.worker_type(), worker_id, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("{} Worker '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.",
self.worker_type(), worker_id, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((_queue_name_recv, job_id)) = response {
info!("{} Worker '{}' received job_id: {} from queue: {}",
self.worker_type(), worker_id, job_id, _queue_name_recv);
// Load the job from Redis
match crate::load_job_from_redis(&mut redis_conn, &job_id, worker_id).await {
Ok(job) => {
// Check for ping job and handle it directly
if job.script.trim() == "ping" {
info!("{} Worker '{}': Received ping job '{}', responding with pong",
self.worker_type(), worker_id, job_id);
// Update job status to started
if let Err(e) = hero_job::Job::update_status(&mut redis_conn, &job_id, hero_job::JobStatus::Started).await {
error!("{} Worker '{}': Failed to update ping job '{}' status to Started: {}",
self.worker_type(), worker_id, job_id, e);
}
// Set result to "pong" and mark as finished
if let Err(e) = hero_job::Job::set_result(&mut redis_conn, &job_id, "pong").await {
error!("{} Worker '{}': Failed to set ping job '{}' result: {}",
self.worker_type(), worker_id, job_id, e);
}
info!("{} Worker '{}': Successfully responded to ping job '{}' with pong",
self.worker_type(), worker_id, job_id);
} else {
// Create a new engine for each job to avoid sharing state
let job_engine = crate::engine::create_heromodels_engine();
// Delegate job processing to the implementation
self.process_job(job, job_engine, &mut redis_conn).await;
}
}
Err(e) => {
error!("{} Worker '{}': Failed to load job '{}': {}",
self.worker_type(), worker_id, job_id, e);
}
}
} else {
debug!("{} Worker '{}': BLPOP timed out on queue {}. No new tasks.",
self.worker_type(), worker_id, queue_key);
}
}
}
}
info!("{} Worker '{}' has shut down.", self.worker_type(), worker_id);
Ok(())
})
}
}
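// Queue protocol sketch (inferred from the BLPOP loop above; the producer-side command
// is an assumption): a dispatcher pushes a bare job id onto the worker's queue,
// `hero:job:<worker_id>`, e.g. `LPUSH hero:job:<worker_id> <job_id>`. The worker BLPOPs
// the id, loads the job via `load_job_from_redis`, and hands it to `process_job`.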
/// Convenience function to spawn a worker with the trait-based interface
///
/// This function provides a unified interface for spawning any worker implementation
/// that implements the Worker trait.
///
/// # Arguments
///
/// * `worker` - The worker implementation to spawn
/// * `engine` - Rhai engine for script execution
/// * `shutdown_rx` - Channel receiver for shutdown signals
///
/// # Returns
///
/// Returns a `JoinHandle` that can be awaited to wait for worker shutdown.
///
/// # Example
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use rhailib_worker::sync_worker::SyncWorker;
/// use rhailib_worker::worker_trait::spawn_worker;
/// use rhailib_worker::engine::create_heromodels_engine;
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
///     let worker = Arc::new(SyncWorker::default());
///     let engine = create_heromodels_engine();
///     let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
///
///     let handle = spawn_worker(worker, engine, shutdown_rx);
///
///     // Later, shut down the worker
///     shutdown_tx.send(()).await.unwrap();
///     handle.await.unwrap().unwrap();
/// }
/// ```
pub fn spawn_worker<W: Worker>(
worker: Arc<W>,
engine: Engine,
shutdown_rx: mpsc::Receiver<()>,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
worker.spawn(engine, shutdown_rx)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::create_heromodels_engine;
// Mock worker for testing
struct MockWorker;
#[async_trait::async_trait]
impl Worker for MockWorker {
async fn process_job(
&self,
_job: Job,
_engine: Engine,
_redis_conn: &mut redis::aio::MultiplexedConnection,
) {
// Mock implementation - do nothing
}
fn worker_type(&self) -> &'static str {
"Mock"
}
fn worker_id(&self) -> &str {
"mock_worker"
}
fn redis_url(&self) -> &str {
"redis://localhost:6379"
}
}
#[tokio::test]
async fn test_worker_config_creation() {
let config = WorkerConfig::new(
"test_worker".to_string(),
"/tmp".to_string(),
"redis://localhost:6379".to_string(),
false,
);
assert_eq!(config.worker_id, "test_worker");
assert_eq!(config.db_path, "/tmp");
assert_eq!(config.redis_url, "redis://localhost:6379");
assert!(!config.preserve_tasks);
assert!(config.default_timeout.is_none());
}
#[tokio::test]
async fn test_worker_config_with_timeout() {
let timeout = Duration::from_secs(300);
let config = WorkerConfig::new(
"test_worker".to_string(),
"/tmp".to_string(),
"redis://localhost:6379".to_string(),
false,
).with_default_timeout(timeout);
assert_eq!(config.default_timeout, Some(timeout));
}
#[tokio::test]
async fn test_spawn_worker_function() {
let (_shutdown_tx, shutdown_rx) = mpsc::channel(1);
let engine = create_heromodels_engine();
let worker = Arc::new(MockWorker);
let handle = spawn_worker(worker, engine, shutdown_rx);
// The worker should be created successfully
assert!(!handle.is_finished());
// Abort the worker for cleanup
handle.abort();
}
}