archive unused / old code

This commit is contained in:
Timur Gordon
2025-08-05 12:59:02 +02:00
parent cd47d398de
commit 8ec7e70edf
25 changed files with 2 additions and 46 deletions

View File

@@ -0,0 +1,20 @@
[package]
name = "hero_examples"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "supervisor_worker_demo"
path = "supervisor_worker_demo.rs"
[dependencies]
hero_supervisor = { path = "../supervisor" }
hero_job = { path = "../job" }
tokio = { version = "1.0", features = ["full"] }
redis = { version = "0.25", features = ["tokio-comp"] }
serde_json = "1.0"
log = "0.4"
env_logger = "0.10"
colored = "2.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -0,0 +1,88 @@
//! Hero Supervisor Worker Demo
//!
//! This example demonstrates the new Hero Supervisor API with:
//! - Synchronous build() method
//! - Asynchronous start_workers() method
//! - Proper cleanup on program exit
//! - Signal handling for graceful shutdown
use colored::*;
use hero_supervisor::{SupervisorBuilder, ScriptType};
use std::time::Duration;
use tokio::signal;
async fn run_supervisor_demo() -> Result<(), Box<dyn std::error::Error>> {
println!("{}", "🚀 Hero Supervisor Demo - New API".cyan().bold());
println!("{}", "Building supervisor synchronously...".yellow());
// Build supervisor synchronously (no .await needed)
let supervisor = SupervisorBuilder::new()
.redis_url("redis://127.0.0.1:6379")
.osis_worker("/usr/local/bin/osis_worker")
.sal_worker("/usr/local/bin/sal_worker")
.v_worker("/usr/local/bin/v_worker")
.python_worker("/usr/local/bin/python_worker")
.worker_env_var("REDIS_URL", "redis://127.0.0.1:6379")
.worker_env_var("LOG_LEVEL", "info")
.build()?;
println!("{}", "✅ Supervisor built successfully!".green());
println!("{}", "Starting workers asynchronously...".yellow());
// Start workers asynchronously
supervisor.start_workers().await?;
println!("{}", "✅ All workers started successfully!".green());
// Demonstrate job creation and execution
println!("{}", "\n📋 Creating and running test jobs...".cyan().bold());
// Create and run a test job
println!("📝 Creating and running OSIS job...");
// Submit and run the job
match supervisor.new_job()
.script_type(ScriptType::OSIS)
.script("println('Hello from OSIS worker!')")
.timeout(Duration::from_secs(30))
.await_response().await {
Ok(result) => {
println!("{}", format!("✅ Job completed successfully: {}", result).green());
}
Err(e) => {
println!("{}", format!("❌ Job failed: {}", e).red());
}
}
// Wait for interrupt signal
println!("{}", "\n⏳ Press Ctrl+C to shutdown gracefully...".yellow());
signal::ctrl_c().await?;
println!("{}", "\n🛑 Shutdown signal received, cleaning up...".yellow().bold());
// Cleanup workers before exit
supervisor.cleanup_and_shutdown().await?;
println!("{}", "✅ Cleanup completed. Goodbye!".green().bold());
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
println!("{}", "Hero Supervisor Demo".cyan().bold());
println!("{}", "This demo shows the new synchronous build API".yellow());
println!();
// Run the demo
if let Err(e) = run_supervisor_demo().await {
eprintln!("{}", format!("Demo failed: {}", e).red().bold());
std::process::exit(1);
}
Ok(())
}

View File

@@ -0,0 +1,66 @@
# Supervisor CLI
A command-line interface for the Hero Supervisor.
## Binary: `hive-supervisor`
### Installation
Build the binary:
```bash
cargo build --bin hive-supervisor --release
```
### Usage
```bash
# Basic usage
hive-supervisor --config <CONFIG_PATH>
```
Where config is toml file with the following structure:
```toml
[global]
redis_url = "redis://localhost:6379"
[osis_worker]
binary_path = "/path/to/osis_worker"
env_vars = { "VAR1" = "value1", "VAR2" = "value2" }
[sal_worker]
binary_path = "/path/to/sal_worker"
env_vars = { "VAR1" = "value1", "VAR2" = "value2" }
[v_worker]
binary_path = "/path/to/v_worker"
env_vars = { "VAR1" = "value1", "VAR2" = "value2" }
[python_worker]
binary_path = "/path/to/python_worker"
env_vars = { "VAR1" = "value1", "VAR2" = "value2" }
```
Lets have verbosity settings etc.
CLI Offers a few commands:
workers:
start
stop
restart
status
logs
list
jobs:
create
start
stop
restart
status
logs
list
repl: you can enter interactive mode to run scripts, however predefine caller_id, context_id and worker type so supervisor dispathces jobs accordingly

View File

@@ -0,0 +1,365 @@
use anyhow::Result;
use clap::Parser;
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyEventKind},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use hero_supervisor::{Supervisor, SupervisorBuilder};
use zinit_client::ZinitClient;
use log::{error, info};
use ratatui::{
backend::CrosstermBackend,
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
text::Line,
widgets::{
Block, Borders, List, ListItem, Paragraph, Tabs, Wrap,
},
Frame, Terminal,
};
use std::{
io,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::time::sleep;
use toml;
use serde::Deserialize;
#[derive(Parser)]
#[command(name = "hive-supervisor-tui")]
#[command(about = "Hero Supervisor Terminal User Interface")]
struct Args {
#[arg(short, long, help = "Configuration file path")]
config: PathBuf,
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
}
#[derive(Debug, Deserialize)]
struct Config {
global: GlobalConfig,
#[serde(flatten)]
workers: std::collections::HashMap<String, WorkerConfigToml>,
}
#[derive(Debug, Deserialize)]
struct GlobalConfig {
redis_url: String,
}
#[derive(Debug, Deserialize)]
struct WorkerConfigToml {
binary_path: String,
env_vars: Option<std::collections::HashMap<String, String>>,
}
#[derive(Debug, Clone, PartialEq)]
enum TabId {
Dashboard,
Workers,
Jobs,
Logs,
}
impl TabId {
fn all() -> Vec<TabId> {
vec![TabId::Dashboard, TabId::Workers, TabId::Jobs, TabId::Logs]
}
fn title(&self) -> &str {
match self {
TabId::Dashboard => "Dashboard",
TabId::Workers => "Workers",
TabId::Jobs => "Jobs",
TabId::Logs => "Logs",
}
}
}
struct App {
supervisor: Arc<Supervisor>,
current_tab: TabId,
should_quit: bool,
logs: Vec<String>,
last_update: Instant,
}
impl App {
fn new(supervisor: Arc<Supervisor>) -> Self {
Self {
supervisor,
current_tab: TabId::Dashboard,
should_quit: false,
logs: vec!["TUI started successfully".to_string()],
last_update: Instant::now(),
}
}
fn next_tab(&mut self) {
let tabs = TabId::all();
let current_index = tabs.iter().position(|t| *t == self.current_tab).unwrap_or(0);
let next_index = (current_index + 1) % tabs.len();
self.current_tab = tabs[next_index].clone();
}
fn prev_tab(&mut self) {
let tabs = TabId::all();
let current_index = tabs.iter().position(|t| *t == self.current_tab).unwrap_or(0);
let prev_index = if current_index == 0 { tabs.len() - 1 } else { current_index - 1 };
self.current_tab = tabs[prev_index].clone();
}
fn add_log(&mut self, message: String) {
self.logs.push(format!("[{}] {}",
chrono::Utc::now().format("%H:%M:%S"),
message
));
if self.logs.len() > 100 {
self.logs.remove(0);
}
}
fn handle_key(&mut self, key: KeyCode) -> bool {
match key {
KeyCode::Char('q') => {
self.should_quit = true;
true
}
KeyCode::Tab => {
self.next_tab();
false
}
KeyCode::BackTab => {
self.prev_tab();
false
}
_ => false
}
}
}
fn render_ui(f: &mut Frame, app: &mut App) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(3), Constraint::Min(0)].as_ref())
.split(f.area());
// Render tabs
let tabs_list = TabId::all();
let tab_titles: Vec<Line> = tabs_list
.iter()
.map(|t| Line::from(t.title()))
.collect();
let selected_tab = TabId::all().iter().position(|t| *t == app.current_tab).unwrap_or(0);
let tabs = Tabs::new(tab_titles)
.block(Block::default().borders(Borders::ALL).title("Hero Supervisor TUI"))
.select(selected_tab)
.style(Style::default().fg(Color::Cyan))
.highlight_style(Style::default().add_modifier(Modifier::BOLD).bg(Color::Black));
f.render_widget(tabs, chunks[0]);
// Render content based on selected tab
match app.current_tab {
TabId::Dashboard => render_dashboard(f, chunks[1], app),
TabId::Workers => render_workers(f, chunks[1], app),
TabId::Jobs => render_jobs(f, chunks[1], app),
TabId::Logs => render_logs(f, chunks[1], app),
}
}
fn render_dashboard(f: &mut Frame, area: Rect, app: &App) {
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(7), Constraint::Min(0)].as_ref())
.split(area);
// Status overview - supervisor is already running if we get here
let status_text = "Status: ✓ Running\nWorkers: Started successfully\nJobs: Ready for processing\n\nPress 'q' to quit, Tab to navigate";
let status_paragraph = Paragraph::new(status_text)
.block(Block::default().borders(Borders::ALL).title("System Status"))
.wrap(Wrap { trim: true });
f.render_widget(status_paragraph, chunks[0]);
// Recent logs
let log_items: Vec<ListItem> = app.logs
.iter()
.rev()
.take(10)
.map(|log| ListItem::new(log.as_str()))
.collect();
let logs_list = List::new(log_items)
.block(Block::default().borders(Borders::ALL).title("Recent Activity"));
f.render_widget(logs_list, chunks[1]);
}
fn render_workers(f: &mut Frame, area: Rect, _app: &App) {
let paragraph = Paragraph::new("Workers tab - Status checking not implemented yet to avoid system issues")
.block(Block::default().borders(Borders::ALL).title("Workers"))
.wrap(Wrap { trim: true });
f.render_widget(paragraph, area);
}
fn render_jobs(f: &mut Frame, area: Rect, _app: &App) {
let paragraph = Paragraph::new("Jobs tab - Job monitoring not implemented yet to avoid system issues")
.block(Block::default().borders(Borders::ALL).title("Jobs"))
.wrap(Wrap { trim: true });
f.render_widget(paragraph, area);
}
fn render_logs(f: &mut Frame, area: Rect, app: &App) {
let items: Vec<ListItem> = app.logs
.iter()
.map(|log| ListItem::new(log.as_str()))
.collect();
let logs_list = List::new(items)
.block(Block::default().borders(Borders::ALL).title("System Logs"));
f.render_widget(logs_list, area);
}
async fn run_app(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
) -> Result<()> {
loop {
terminal.draw(|f| render_ui(f, app))?;
// Simple, safe event handling
if event::poll(Duration::from_millis(100))? {
if let Event::Key(key) = event::read()? {
if key.kind == KeyEventKind::Press {
if app.handle_key(key.code) {
break;
}
}
}
}
if app.should_quit {
break;
}
// Small delay to prevent excessive CPU usage
sleep(Duration::from_millis(50)).await;
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// Initialize logging
if args.verbose {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init();
} else {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
}
info!("Hero Supervisor TUI - Fail-fast initialization");
// Step 1: Load and parse configuration
info!("Step 1/4: Loading configuration from {:?}", args.config);
let config_content = std::fs::read_to_string(&args.config)
.map_err(|e| anyhow::anyhow!("Failed to read config file: {}", e))?;
let config: Config = toml::from_str(&config_content)
.map_err(|e| anyhow::anyhow!("Failed to parse config file: {}", e))?;
info!("✓ Configuration loaded successfully");
// Step 2: Check if Zinit is running
info!("Step 2/4: Checking if Zinit is running...");
let zinit_client = ZinitClient::new("/tmp/zinit.sock");
match zinit_client.status("_test_connectivity").await {
Ok(_) => {
info!("✓ Zinit is running and accessible");
}
Err(e) => {
let error_msg = e.to_string();
if error_msg.contains("Connection refused") || error_msg.contains("No such file") {
eprintln!("Error: Zinit process manager is not running.");
eprintln!("Please start Zinit before running the supervisor TUI.");
eprintln!("Expected Zinit socket at: /tmp/zinit.sock");
std::process::exit(1);
} else {
info!("✓ Zinit is running (service not found is expected)");
}
}
}
// Step 3: Build supervisor
info!("Step 3/4: Building supervisor...");
let mut builder = SupervisorBuilder::new()
.redis_url(&config.global.redis_url);
for (worker_name, worker_config) in &config.workers {
match worker_name.as_str() {
"osis_worker" => builder = builder.osis_worker(&worker_config.binary_path),
"sal_worker" => builder = builder.sal_worker(&worker_config.binary_path),
"v_worker" => builder = builder.v_worker(&worker_config.binary_path),
"python_worker" => builder = builder.python_worker(&worker_config.binary_path),
_ => log::warn!("Unknown worker type: {}", worker_name),
}
if let Some(env_vars) = &worker_config.env_vars {
for (key, value) in env_vars {
builder = builder.worker_env_var(key, value);
}
}
}
let supervisor = Arc::new(builder.build()
.map_err(|e| anyhow::anyhow!("Failed to build supervisor: {}", e))?);
info!("✓ Supervisor built successfully");
// Step 4: Start supervisor and workers
info!("Step 4/4: Starting supervisor and workers...");
supervisor.start_workers().await
.map_err(|e| anyhow::anyhow!("Failed to start workers: {}", e))?;
info!("✓ All workers started successfully");
// All initialization successful - now start TUI
info!("Initialization complete - starting TUI...");
let mut app = App::new(Arc::clone(&supervisor));
// Setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
// Run the app
let result = run_app(&mut terminal, &mut app).await;
// Cleanup
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
// Cleanup supervisor
if let Err(e) = supervisor.cleanup_and_shutdown().await {
error!("Error during cleanup: {}", e);
}
info!("Hero Supervisor TUI shutdown complete");
result
}

View File

@@ -0,0 +1,236 @@
use clap::Parser;
use hero_supervisor::{Supervisor, SupervisorBuilder, ScriptType};
use log::{error, info};
use colored::Colorize;
use std::io::{self, Write};
use std::time::Duration;
#[derive(Parser, Debug)]
#[command(author, version, about = "Rhai Client - Script execution client", long_about = None)]
struct Args {
/// Caller ID (your identity)
#[arg(short = 'c', long = "caller-id", help = "Caller ID (your identity)")]
caller_id: String,
/// Context ID (execution context)
#[arg(short = 'k', long = "context-id", help = "Context ID (execution context)")]
context_id: String,
/// Script type to execute (osis, sal, v, python)
#[arg(short = 'T', long = "script-type", default_value = "osis", help = "Script type: osis, sal, v, or python")]
script_type: String,
/// Redis URL
#[arg(short, long, default_value = "redis://localhost:6379", help = "Redis connection URL")]
redis_url: String,
/// Rhai script to execute
#[arg(short, long, help = "Rhai script to execute")]
script: Option<String>,
/// Path to Rhai script file
#[arg(short, long, help = "Path to Rhai script file")]
file: Option<String>,
/// Timeout for script execution (in seconds)
#[arg(short, long, default_value = "30", help = "Timeout for script execution in seconds")]
timeout: u64,
/// Increase verbosity (can be used multiple times)
#[arg(short, long, action = clap::ArgAction::Count, help = "Increase verbosity (-v for debug, -vv for trace)")]
verbose: u8,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
// Configure logging based on verbosity level
let log_config = match args.verbose {
0 => "warn,hero_supervisor=warn",
1 => "info,hero_supervisor=info",
2 => "debug,hero_supervisor=debug",
_ => "trace,hero_supervisor=trace",
};
std::env::set_var("RUST_LOG", log_config);
// Configure env_logger with or without timestamps
if args.no_timestamp {
env_logger::Builder::from_default_env()
.format_timestamp(None)
.init();
} else {
env_logger::init();
}
// Validate script type
match args.script_type.to_lowercase().as_str() {
"osis" | "sal" | "v" | "python" => {
// Valid script types - no worker validation needed since we use hardcoded queues
}
_ => {
error!("❌ Invalid script type: {}. Valid types: osis, sal, v, python", args.script_type);
return Err(format!("Invalid script type: {}", args.script_type).into());
}
}
if args.verbose > 0 {
info!("🔗 Starting Hero Supervisor");
info!("📋 Configuration:");
info!(" Caller ID: {}", args.caller_id);
info!(" Context ID: {}", args.context_id);
info!(" Script Type: {}", args.script_type);
info!(" Redis URL: {}", args.redis_url);
info!(" Timeout: {}s", args.timeout);
info!(" Using hardcoded worker queues for script type: {}", args.script_type);
info!("");
}
// Create the supervisor client
let client = SupervisorBuilder::new()
.redis_url(&args.redis_url)
.build()?;
if args.verbose > 0 {
info!("✅ Connected to Redis at {}", args.redis_url);
}
// Determine execution mode
if let Some(script_content) = args.script {
// Execute inline script
if args.verbose > 0 {
info!("📜 Executing inline script");
}
execute_script(&client, script_content, &args.script_type, args.timeout).await?;
} else if let Some(file_path) = args.file {
// Execute script from file
if args.verbose > 0 {
info!("📁 Loading script from file: {}", file_path);
}
let script_content = std::fs::read_to_string(&file_path)
.map_err(|e| format!("Failed to read script file '{}': {}", file_path, e))?;
execute_script(&client, script_content, &args.script_type, args.timeout).await?;
} else {
// Interactive mode
info!("🎮 Entering interactive mode");
info!("Type Rhai scripts and press Enter to execute. Type 'exit' or 'quit' to close.");
run_interactive_mode(&client, &args.script_type, args.timeout, args.verbose).await?;
}
Ok(())
}
async fn execute_script(
client: &Supervisor,
script: String,
script_type_str: &str,
timeout_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> {
info!("⚡ Executing script: {:.50}...", script);
// Parse script type
let script_type = match script_type_str.to_lowercase().as_str() {
"osis" => ScriptType::OSIS,
"sal" => ScriptType::SAL,
"v" => ScriptType::V,
"python" => ScriptType::Python,
_ => {
error!("❌ Invalid script type: {}. Valid types: osis, sal, v, python", script_type_str);
return Err(format!("Invalid script type: {}", script_type_str).into());
}
};
let timeout = Duration::from_secs(timeout_secs);
match client
.new_job()
.script_type(script_type)
.script(&script)
.timeout(timeout)
.await_response()
.await
{
Ok(result) => {
info!("✅ Script execution completed");
println!("{}", "Result:".green().bold());
println!("{}", result);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);
return Err(Box::new(e));
}
}
Ok(())
}
async fn run_interactive_mode(
client: &Supervisor,
script_type_str: &str,
timeout_secs: u64,
verbose: u8,
) -> Result<(), Box<dyn std::error::Error>> {
// Parse script type
let script_type = match script_type_str.to_lowercase().as_str() {
"osis" => ScriptType::OSIS,
"sal" => ScriptType::SAL,
"v" => ScriptType::V,
"python" => ScriptType::Python,
_ => {
error!("❌ Invalid script type: {}. Valid types: osis, sal, v, python", script_type_str);
return Err(format!("Invalid script type: {}", script_type_str).into());
}
};
let timeout = Duration::from_secs(timeout_secs);
loop {
print!("rhai> ");
io::stdout().flush()?;
let mut input = String::new();
io::stdin().read_line(&mut input)?;
let input = input.trim();
if input.is_empty() {
continue;
}
if input == "exit" || input == "quit" {
info!("👋 Goodbye!");
break;
}
if verbose > 0 {
info!("⚡ Executing: {}", input);
}
match client
.new_job()
.script_type(script_type.clone())
.script(input)
.timeout(timeout)
.await_response()
.await
{
Ok(result) => {
println!("{}", result.green());
}
Err(e) => {
println!("{}", format!("error: {}", e).red());
}
}
println!(); // Add blank line for readability
}
Ok(())
}

View File

@@ -0,0 +1,113 @@
# Rhai Worker Binary
A command-line worker for executing Rhai scripts from Redis task queues.
## Binary: `worker`
### Installation
Build the binary:
```bash
cargo build --bin worker --release
```
### Usage
```bash
# Basic usage - requires circle public key
worker --circle-public-key <CIRCLE_PUBLIC_KEY>
# Custom Redis URL
worker -c <CIRCLE_PUBLIC_KEY> --redis-url redis://localhost:6379/1
# Custom worker ID and database path
worker -c <CIRCLE_PUBLIC_KEY> --worker-id my_worker --db-path /tmp/worker_db
# Preserve tasks for debugging/benchmarking
worker -c <CIRCLE_PUBLIC_KEY> --preserve-tasks
# Remove timestamps from logs
worker -c <CIRCLE_PUBLIC_KEY> --no-timestamp
# Increase verbosity
worker -c <CIRCLE_PUBLIC_KEY> -v # Debug logging
worker -c <CIRCLE_PUBLIC_KEY> -vv # Full debug
worker -c <CIRCLE_PUBLIC_KEY> -vvv # Trace logging
```
### Command-Line Options
| Option | Short | Default | Description |
|--------|-------|---------|-------------|
| `--circle-public-key` | `-c` | **Required** | Circle public key to listen for tasks |
| `--redis-url` | `-r` | `redis://localhost:6379` | Redis connection URL |
| `--worker-id` | `-w` | `worker_1` | Unique worker identifier |
| `--preserve-tasks` | | `false` | Preserve task details after completion |
| `--db-path` | | `worker_rhai_temp_db` | Database path for Rhai engine |
| `--no-timestamp` | | `false` | Remove timestamps from log output |
| `--verbose` | `-v` | | Increase verbosity (stackable) |
### Features
- **Task Queue Processing**: Listens to Redis queues for Rhai script execution tasks
- **Performance Optimized**: Configured for maximum Rhai engine performance
- **Graceful Shutdown**: Supports shutdown signals for clean termination
- **Flexible Logging**: Configurable verbosity and timestamp control
- **Database Integration**: Uses heromodels for data persistence
- **Task Cleanup**: Optional task preservation for debugging/benchmarking
### How It Works
1. **Queue Listening**: Worker listens on Redis queue `rhailib:{circle_public_key}`
2. **Task Processing**: Receives task IDs, fetches task details from Redis
3. **Script Execution**: Executes Rhai scripts with configured engine
4. **Result Handling**: Updates task status and sends results to reply queues
5. **Cleanup**: Optionally cleans up task details after completion
### Configuration Examples
#### Development Worker
```bash
# Simple development worker
worker -c dev_circle_123
# Development with verbose logging (no timestamps)
worker -c dev_circle_123 -v --no-timestamp
```
#### Production Worker
```bash
# Production worker with custom configuration
worker \
--circle-public-key prod_circle_456 \
--redis-url redis://redis-server:6379/0 \
--worker-id prod_worker_1 \
--db-path /var/lib/worker/db \
--preserve-tasks
```
#### Benchmarking Worker
```bash
# Worker optimized for benchmarking
worker \
--circle-public-key bench_circle_789 \
--preserve-tasks \
--no-timestamp \
-vv
```
### Error Handling
The worker provides clear error messages for:
- Missing or invalid circle public key
- Redis connection failures
- Script execution errors
- Database access issues
### Dependencies
- `rhailib_engine`: Rhai engine with heromodels integration
- `redis`: Redis client for task queue management
- `rhai`: Script execution engine
- `clap`: Command-line argument parsing
- `env_logger`: Logging infrastructure

View File

@@ -0,0 +1,233 @@
//! OSIS Worker Binary - Synchronous worker for system-level operations
use clap::Parser;
use log::{error, info};
use rhailib_worker::config::{ConfigError, WorkerConfig};
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::sync_worker::SyncWorker;
use rhailib_worker::worker_trait::{spawn_worker, WorkerConfig as TraitWorkerConfig};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::signal;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(
name = "osis",
version = "0.1.0",
about = "OSIS (Operating System Integration Service) - Synchronous Worker",
long_about = "A synchronous worker for Hero framework that processes jobs sequentially. \
Ideal for system-level operations that require careful resource management."
)]
struct Args {
/// Path to TOML configuration file
#[arg(short, long, help = "Path to TOML configuration file")]
config: PathBuf,
/// Override worker ID from config
#[arg(long, help = "Override worker ID from configuration file")]
worker_id: Option<String>,
/// Override Redis URL from config
#[arg(long, help = "Override Redis URL from configuration file")]
redis_url: Option<String>,
/// Override database path from config
#[arg(long, help = "Override database path from configuration file")]
db_path: Option<String>,
/// Enable verbose logging (debug level)
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Load configuration from TOML file
let mut config = match WorkerConfig::from_file(&args.config) {
Ok(config) => config,
Err(e) => {
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
std::process::exit(1);
}
};
// Validate that this is a sync worker configuration
if !config.is_sync() {
eprintln!("Error: OSIS worker requires a sync worker configuration");
eprintln!("Expected: [worker_type] type = \"sync\"");
eprintln!("Found: {:?}", config.worker_type);
std::process::exit(1);
}
// Apply command line overrides
if let Some(worker_id) = args.worker_id {
config.worker_id = worker_id;
}
if let Some(redis_url) = args.redis_url {
config.redis_url = redis_url;
}
if let Some(db_path) = args.db_path {
config.db_path = db_path;
}
// Configure logging
setup_logging(&config, args.verbose, args.no_timestamp)?;
info!("🚀 OSIS Worker starting...");
info!("Worker ID: {}", config.worker_id);
info!("Redis URL: {}", config.redis_url);
info!("Database Path: {}", config.db_path);
info!("Preserve Tasks: {}", config.preserve_tasks);
// Create Rhai engine
let engine = create_heromodels_engine();
info!("✅ Rhai engine initialized");
// Create worker configuration for the trait-based interface
let worker_config = TraitWorkerConfig::new(
config.worker_id.clone(),
config.db_path.clone(),
config.redis_url.clone(),
config.preserve_tasks,
);
// Create sync worker instance
let worker = Arc::new(SyncWorker::default());
info!("✅ Sync worker instance created");
// Setup shutdown signal handling
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Spawn shutdown signal handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("Failed to listen for shutdown signal: {}", e);
return;
}
info!("🛑 Shutdown signal received");
if let Err(e) = shutdown_tx_clone.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
});
// Spawn the worker
info!("🔄 Starting worker loop...");
let worker_handle = spawn_worker(worker, engine, shutdown_rx);
// Wait for the worker to complete
match worker_handle.await {
Ok(Ok(())) => {
info!("✅ OSIS Worker shut down gracefully");
}
Ok(Err(e)) => {
error!("❌ OSIS Worker encountered an error: {}", e);
std::process::exit(1);
}
Err(e) => {
error!("❌ Failed to join worker task: {}", e);
std::process::exit(1);
}
}
Ok(())
}
/// Setup logging based on configuration and command line arguments
fn setup_logging(
config: &WorkerConfig,
verbose: bool,
no_timestamp: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut builder = env_logger::Builder::new();
// Determine log level
let log_level = if verbose {
"debug"
} else {
&config.logging.level
};
// Set log level
builder.filter_level(match log_level.to_lowercase().as_str() {
"trace" => log::LevelFilter::Trace,
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => {
eprintln!("Invalid log level: {}. Using 'info'", log_level);
log::LevelFilter::Info
}
});
// Configure timestamps
let show_timestamps = !no_timestamp && config.logging.timestamps;
if !show_timestamps {
builder.format_timestamp(None);
}
builder.init();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_config_validation() {
let config_toml = r#"
worker_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(config.is_sync());
assert!(!config.is_async());
assert_eq!(config.worker_id, "test_osis");
}
#[test]
fn test_async_config_rejection() {
let config_toml = r#"
worker_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 300
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(!config.is_sync());
assert!(config.is_async());
// This would be rejected in main() function
}
}

View File

@@ -0,0 +1,302 @@
//! System Worker Binary - Asynchronous worker for high-throughput concurrent processing
use clap::Parser;
use log::{error, info, warn};
use rhailib_worker::async_worker_impl::AsyncWorker;
use rhailib_worker::config::{ConfigError, WorkerConfig};
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::worker_trait::{spawn_worker, WorkerConfig as TraitWorkerConfig};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(
name = "system",
version = "0.1.0",
about = "System Worker - Asynchronous Worker with Concurrent Job Processing",
long_about = "An asynchronous worker for Hero framework that processes multiple jobs \
concurrently with timeout support. Ideal for high-throughput scenarios \
where jobs can be executed in parallel."
)]
struct Args {
/// Path to TOML configuration file
#[arg(short, long, help = "Path to TOML configuration file")]
config: PathBuf,
/// Override worker ID from config
#[arg(long, help = "Override worker ID from configuration file")]
worker_id: Option<String>,
/// Override Redis URL from config
#[arg(long, help = "Override Redis URL from configuration file")]
redis_url: Option<String>,
/// Override database path from config
#[arg(long, help = "Override database path from configuration file")]
db_path: Option<String>,
/// Override default timeout in seconds
#[arg(long, help = "Override default job timeout in seconds")]
timeout: Option<u64>,
/// Enable verbose logging (debug level)
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
/// Show worker statistics periodically
#[arg(long, help = "Show periodic worker statistics")]
show_stats: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Load configuration from TOML file
let mut config = match WorkerConfig::from_file(&args.config) {
Ok(config) => config,
Err(e) => {
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
std::process::exit(1);
}
};
// Validate that this is an async worker configuration
if !config.is_async() {
eprintln!("Error: System worker requires an async worker configuration");
eprintln!("Expected: [worker_type] type = \"async\"");
eprintln!("Found: {:?}", config.worker_type);
std::process::exit(1);
}
// Apply command line overrides
if let Some(worker_id) = args.worker_id {
config.worker_id = worker_id;
}
if let Some(redis_url) = args.redis_url {
config.redis_url = redis_url;
}
if let Some(db_path) = args.db_path {
config.db_path = db_path;
}
// Override timeout if specified
if let Some(timeout_secs) = args.timeout {
if let rhailib_worker::config::WorkerType::Async { ref mut default_timeout_seconds } = config.worker_type {
*default_timeout_seconds = timeout_secs;
}
}
// Configure logging
setup_logging(&config, args.verbose, args.no_timestamp)?;
info!("🚀 System Worker starting...");
info!("Worker ID: {}", config.worker_id);
info!("Redis URL: {}", config.redis_url);
info!("Database Path: {}", config.db_path);
info!("Preserve Tasks: {}", config.preserve_tasks);
if let Some(timeout) = config.get_default_timeout() {
info!("Default Timeout: {:?}", timeout);
}
// Create Rhai engine
let engine = create_heromodels_engine();
info!("✅ Rhai engine initialized");
// Create worker configuration for the trait-based interface
let mut worker_config = TraitWorkerConfig::new(
config.worker_id.clone(),
config.db_path.clone(),
config.redis_url.clone(),
config.preserve_tasks,
);
// Add timeout configuration for async worker
if let Some(timeout) = config.get_default_timeout() {
worker_config = worker_config.with_default_timeout(timeout);
}
// Create async worker instance
let worker = Arc::new(AsyncWorker::default());
info!("✅ Async worker instance created");
// Setup shutdown signal handling
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Spawn shutdown signal handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("Failed to listen for shutdown signal: {}", e);
return;
}
info!("🛑 Shutdown signal received");
if let Err(e) = shutdown_tx_clone.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
});
// Spawn statistics reporter if requested
if args.show_stats {
let worker_stats = Arc::clone(&worker);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let running_count = worker_stats.running_job_count().await;
if running_count > 0 {
info!("📊 Worker Stats: {} jobs currently running", running_count);
} else {
info!("📊 Worker Stats: No jobs currently running");
}
}
});
}
// Spawn the worker
info!("🔄 Starting worker loop...");
let worker_handle = spawn_worker(worker, engine, shutdown_rx);
// Wait for the worker to complete
match worker_handle.await {
Ok(Ok(())) => {
info!("✅ System Worker shut down gracefully");
}
Ok(Err(e)) => {
error!("❌ System Worker encountered an error: {}", e);
std::process::exit(1);
}
Err(e) => {
error!("❌ Failed to join worker task: {}", e);
std::process::exit(1);
}
}
Ok(())
}
/// Setup logging based on configuration and command line arguments
fn setup_logging(
config: &WorkerConfig,
verbose: bool,
no_timestamp: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut builder = env_logger::Builder::new();
// Determine log level
let log_level = if verbose {
"debug"
} else {
&config.logging.level
};
// Set log level
builder.filter_level(match log_level.to_lowercase().as_str() {
"trace" => log::LevelFilter::Trace,
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => {
warn!("Invalid log level: {}. Using 'info'", log_level);
log::LevelFilter::Info
}
});
// Configure timestamps
let show_timestamps = !no_timestamp && config.logging.timestamps;
if !show_timestamps {
builder.format_timestamp(None);
}
builder.init();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_config_validation() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 600
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(!config.is_sync());
assert!(config.is_async());
assert_eq!(config.worker_id, "test_system");
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
#[test]
fn test_sync_config_rejection() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "sync"
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(config.is_sync());
assert!(!config.is_async());
// This would be rejected in main() function
}
#[test]
fn test_timeout_override() {
let config_toml = r#"
worker_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[worker_type]
type = "async"
default_timeout_seconds = 300
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let mut config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(300)));
// Test timeout override
if let rhailib_worker::config::WorkerType::Async { ref mut default_timeout_seconds } = config.worker_type {
*default_timeout_seconds = 600;
}
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
}

View File

@@ -0,0 +1,95 @@
use clap::Parser;
use rhailib_worker::engine::create_heromodels_engine;
use rhailib_worker::spawn_rhai_worker;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Worker ID for identification
#[arg(short, long)]
worker_id: String,
/// Redis URL
#[arg(short, long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Preserve task details after completion (for benchmarking)
#[arg(long, default_value = "false")]
preserve_tasks: bool,
/// Root directory for engine database
#[arg(long, default_value = "worker_rhai_temp_db")]
db_path: String,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Configure env_logger with or without timestamps
if args.no_timestamp {
env_logger::Builder::from_default_env()
.format_timestamp(None)
.init();
} else {
env_logger::init();
}
log::info!("Rhai Worker (binary) starting with performance-optimized engine.");
log::info!(
"Worker ID: {}, Redis: {}",
args.worker_id,
args.redis_url
);
let mut engine = create_heromodels_engine();
// Performance optimizations for benchmarking
engine.set_max_operations(0); // Unlimited operations for performance testing
engine.set_max_expr_depths(0, 0); // Unlimited expression depth
engine.set_max_string_size(0); // Unlimited string size
engine.set_max_array_size(0); // Unlimited array size
engine.set_max_map_size(0); // Unlimited map size
// Enable full optimization for maximum performance
engine.set_optimization_level(rhai::OptimizationLevel::Full);
log::info!("Engine configured for maximum performance");
// Create shutdown channel (for graceful shutdown, though not used in benchmarks)
let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Spawn the worker
let worker_handle = spawn_rhai_worker(
args.worker_id,
args.db_path,
engine,
args.redis_url,
shutdown_rx,
args.preserve_tasks,
);
// Wait for the worker to complete
match worker_handle.await {
Ok(result) => match result {
Ok(_) => {
log::info!("Worker completed successfully");
Ok(())
}
Err(e) => {
log::error!("Worker failed: {}", e);
Err(e)
}
},
Err(e) => {
log::error!("Worker task panicked: {}", e);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
}
}

View File

@@ -0,0 +1,197 @@
# Worker Examples
This directory contains example configurations and test scripts for both OSIS and System worker binaries.
## Overview
Both examples demonstrate the ping/pong functionality built into the Hero workers:
- Workers automatically detect jobs with script content "ping"
- They respond immediately with "pong" without executing the Rhai engine
- This provides a fast health check and connectivity test mechanism
## Prerequisites
1. **Redis Server**: Both examples require a running Redis server
```bash
# Install Redis (macOS)
brew install redis
# Start Redis server
redis-server
```
2. **Rust Environment**: Make sure you can build the worker binaries
```bash
cd /path/to/herocode/hero/core/worker
cargo build --bin osis --bin system
```
## OSIS Worker Example
**Location**: `examples/osis/`
The OSIS (Operating System Integration Service) worker processes jobs synchronously, one at a time.
### Files
- `config.toml` - Configuration for the OSIS worker
- `example.sh` - Test script that demonstrates ping/pong functionality
### Usage
```bash
cd examples/osis
./example.sh
```
### What the script does:
1. Checks Redis connectivity
2. Cleans up any existing jobs
3. Starts the OSIS worker in the background
4. Sends 3 ping jobs sequentially
5. Verifies each job receives a "pong" response
6. Reports success/failure statistics
7. Cleans up worker and Redis data
### Expected Output
```
=== OSIS Worker Example ===
✅ Redis is running
✅ OSIS worker started (PID: 12345)
📤 Sending ping job: ping_job_1_1234567890
✅ Job ping_job_1_1234567890 completed successfully with result: pong
...
🎉 All tests passed! OSIS worker is working correctly.
```
## System Worker Example
**Location**: `examples/system/`
The System worker processes jobs asynchronously, handling multiple jobs concurrently.
### Files
- `config.toml` - Configuration for the System worker (includes async settings)
- `example.sh` - Test script that demonstrates concurrent ping/pong functionality
### Usage
```bash
cd examples/system
./example.sh
```
### What the script does:
1. Checks Redis connectivity
2. Cleans up any existing jobs
3. Starts the System worker with stats reporting
4. Sends 5 concurrent ping jobs
5. Sends 10 rapid-fire ping jobs to test async capabilities
6. Verifies all jobs receive "pong" responses
7. Reports comprehensive success/failure statistics
8. Cleans up worker and Redis data
### Expected Output
```
=== System Worker Example ===
✅ Redis is running
✅ System worker started (PID: 12345)
📤 Sending ping job: ping_job_1_1234567890123
✅ Job ping_job_1_1234567890123 completed successfully with result: pong
...
🎉 All tests passed! System worker is handling concurrent jobs correctly.
Overall success rate: 15/15
```
## Configuration Details
### OSIS Configuration (`examples/osis/config.toml`)
```toml
worker_id = "osis_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_example_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"
```
### System Configuration (`examples/system/config.toml`)
```toml
worker_id = "system_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_example_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 30
[logging]
timestamps = true
level = "info"
```
## Key Differences
| Feature | OSIS Worker | System Worker |
|---------|-------------|---------------|
| **Processing** | Sequential (one job at a time) | Concurrent (multiple jobs simultaneously) |
| **Use Case** | System-level operations requiring resource management | High-throughput job processing |
| **Timeout** | No timeout configuration | Configurable job timeouts |
| **Stats** | Basic logging | Optional statistics reporting (`--show-stats`) |
| **Job Handling** | Blocking job execution | Non-blocking async job execution |
## Troubleshooting
### Redis Connection Issues
```bash
# Check if Redis is running
redis-cli ping
# Check Redis logs
redis-server --loglevel verbose
```
### Worker Compilation Issues
```bash
# Clean and rebuild
cargo clean
cargo build --bin osis --bin system
```
### Job Processing Issues
- Check Redis for stuck jobs: `redis-cli keys "hero:*"`
- Clear all Hero jobs: `redis-cli eval "return redis.call('del', unpack(redis.call('keys', 'hero:*')))" 0`
- Check worker logs for detailed error messages
## Extending the Examples
### Adding Custom Jobs
To test with custom Rhai scripts instead of ping jobs:
1. Modify the job creation in the shell scripts:
```bash
# Replace "ping" with your Rhai script
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
script "your_rhai_script_here"
```
2. Update result verification to expect your script's output instead of "pong"
### Testing Different Configurations
- Modify `config.toml` files to test different Redis URLs, database paths, or logging levels
- Test with `preserve_tasks = true` to inspect job details after completion
- Adjust timeout values in the System worker configuration
## Architecture Notes
Both examples demonstrate the unified Worker trait architecture:
- **Common Interface**: Both workers implement the same `Worker` trait
- **Ping/Pong Handling**: Built into the trait's `spawn` method before job delegation
- **Redis Integration**: Uses the shared Job struct from `hero_job` crate
- **Configuration**: TOML-based configuration with CLI overrides
- **Graceful Shutdown**: Both workers handle SIGTERM/SIGINT properly
This architecture allows for easy extension with new worker types while maintaining consistent behavior and configuration patterns.

View File

@@ -0,0 +1,11 @@
worker_id = "osis_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_example_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"

View File

@@ -0,0 +1,138 @@
#!/bin/bash
# OSIS Worker Example Script
# This script demonstrates the OSIS worker by:
# 1. Starting the worker with the config.toml
# 2. Sending ping jobs to Redis
# 3. Verifying pong responses
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="$SCRIPT_DIR/config.toml"
WORKER_ID="osis_example_worker"
REDIS_URL="redis://localhost:6379"
echo "=== OSIS Worker Example ==="
echo "Script directory: $SCRIPT_DIR"
echo "Config file: $CONFIG_FILE"
echo "Worker ID: $WORKER_ID"
echo "Redis URL: $REDIS_URL"
echo
# Check if Redis is running
echo "Checking Redis connection..."
if ! redis-cli -u "$REDIS_URL" ping > /dev/null 2>&1; then
echo "❌ Error: Redis is not running or not accessible at $REDIS_URL"
echo "Please start Redis server first: redis-server"
exit 1
fi
echo "✅ Redis is running"
echo
# Clean up any existing jobs in the queue
echo "Cleaning up existing jobs in Redis..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis queues cleaned"
echo
# Start the OSIS worker in the background
echo "Starting OSIS worker..."
cd "$SCRIPT_DIR/../.."
cargo run --bin osis -- --config "$CONFIG_FILE" &
WORKER_PID=$!
echo "✅ OSIS worker started (PID: $WORKER_PID)"
echo
# Wait a moment for the worker to initialize
echo "Waiting for worker to initialize..."
sleep 3
# Function to send a ping job and check for pong response
send_ping_job() {
local job_num=$1
local job_id="ping_job_${job_num}_$(date +%s)"
echo "📤 Sending ping job: $job_id"
# Create job in Redis
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
id "$job_id" \
script "ping" \
status "Queued" \
created_at "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
worker_id "$WORKER_ID" > /dev/null
# Add job to worker queue
redis-cli -u "$REDIS_URL" lpush "hero:jobs:$WORKER_ID" "$job_id" > /dev/null
# Wait for job completion and check result
local timeout=10
local elapsed=0
while [ $elapsed -lt $timeout ]; do
local status=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" status 2>/dev/null || echo "")
if [ "$status" = "Finished" ]; then
local result=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" result 2>/dev/null || echo "")
if [ "$result" = "pong" ]; then
echo "✅ Job $job_id completed successfully with result: $result"
return 0
else
echo "❌ Job $job_id completed but with unexpected result: $result"
return 1
fi
elif [ "$status" = "Error" ]; then
local error=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" error 2>/dev/null || echo "")
echo "❌ Job $job_id failed with error: $error"
return 1
fi
sleep 1
elapsed=$((elapsed + 1))
done
echo "❌ Job $job_id timed out after ${timeout}s"
return 1
}
# Send multiple ping jobs to test the worker
echo "Testing ping/pong functionality..."
success_count=0
total_jobs=3
for i in $(seq 1 $total_jobs); do
echo
echo "--- Test $i/$total_jobs ---"
if send_ping_job $i; then
success_count=$((success_count + 1))
fi
sleep 1
done
echo
echo "=== Test Results ==="
echo "Successful ping/pong tests: $success_count/$total_jobs"
if [ $success_count -eq $total_jobs ]; then
echo "🎉 All tests passed! OSIS worker is working correctly."
exit_code=0
else
echo "⚠️ Some tests failed. Check the worker logs for details."
exit_code=1
fi
# Clean up
echo
echo "Cleaning up..."
echo "Stopping OSIS worker (PID: $WORKER_PID)..."
kill $WORKER_PID 2>/dev/null || true
wait $WORKER_PID 2>/dev/null || true
echo "✅ Worker stopped"
echo "Cleaning up Redis jobs..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis cleaned up"
echo
echo "=== OSIS Worker Example Complete ==="
exit $exit_code

View File

@@ -0,0 +1,14 @@
# OSIS Worker Configuration
# Synchronous worker for system-level operations
worker_id = "osis_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/osis_worker_db"
preserve_tasks = false
[worker_type]
type = "sync"
[logging]
timestamps = true
level = "info"

View File

@@ -0,0 +1,60 @@
use std::process::{Command, Stdio};
use std::path::Path;
use std::env;
use std::io::{self, Write};
/// OSIS Worker Demo Runner
///
/// This Rust wrapper executes the OSIS worker bash script example.
/// It provides a way to run shell-based examples through Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 OSIS Worker Demo");
println!("==================");
println!();
// Get the current working directory and construct the path to the shell script
let current_dir = env::current_dir()?;
let script_path = current_dir.join("examples").join("osis").join("example.sh");
// Check if the script exists
if !script_path.exists() {
eprintln!("❌ Error: Script not found at {:?}", script_path);
eprintln!(" Make sure you're running this from the worker crate root directory.");
std::process::exit(1);
}
println!("📁 Script location: {:?}", script_path);
println!("🔧 Executing OSIS worker example...");
println!();
// Make sure the script is executable
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)?.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms)?;
}
// Execute the shell script
let mut child = Command::new("bash")
.arg(&script_path)
.current_dir(&current_dir)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
// Wait for the script to complete
let status = child.wait()?;
println!();
if status.success() {
println!("✅ OSIS worker demo completed successfully!");
} else {
println!("❌ OSIS worker demo failed with exit code: {:?}", status.code());
std::process::exit(status.code().unwrap_or(1));
}
Ok(())
}

View File

@@ -0,0 +1,12 @@
worker_id = "system_example_worker"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_example_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 30
[logging]
timestamps = true
level = "info"

View File

@@ -0,0 +1,183 @@
#!/bin/bash
# System Worker Example Script
# This script demonstrates the System worker by:
# 1. Starting the worker with the config.toml
# 2. Sending multiple concurrent ping jobs to Redis
# 3. Verifying pong responses
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_FILE="$SCRIPT_DIR/config.toml"
WORKER_ID="system_example_worker"
REDIS_URL="redis://localhost:6379"
echo "=== System Worker Example ==="
echo "Script directory: $SCRIPT_DIR"
echo "Config file: $CONFIG_FILE"
echo "Worker ID: $WORKER_ID"
echo "Redis URL: $REDIS_URL"
echo
# Check if Redis is running
echo "Checking Redis connection..."
if ! redis-cli -u "$REDIS_URL" ping > /dev/null 2>&1; then
echo "❌ Error: Redis is not running or not accessible at $REDIS_URL"
echo "Please start Redis server first: redis-server"
exit 1
fi
echo "✅ Redis is running"
echo
# Clean up any existing jobs in the queue
echo "Cleaning up existing jobs in Redis..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis queues cleaned"
echo
# Start the System worker in the background
echo "Starting System worker..."
cd "$SCRIPT_DIR/../.."
cargo run --bin system -- --config "$CONFIG_FILE" --show-stats &
WORKER_PID=$!
echo "✅ System worker started (PID: $WORKER_PID)"
echo
# Wait a moment for the worker to initialize
echo "Waiting for worker to initialize..."
sleep 3
# Function to send a ping job (non-blocking)
send_ping_job() {
local job_num=$1
local job_id="ping_job_${job_num}_$(date +%s%N)"
echo "📤 Sending ping job: $job_id"
# Create job in Redis
redis-cli -u "$REDIS_URL" hset "hero:job:$job_id" \
id "$job_id" \
script "ping" \
status "Queued" \
created_at "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
worker_id "$WORKER_ID" > /dev/null
# Add job to worker queue
redis-cli -u "$REDIS_URL" lpush "hero:jobs:$WORKER_ID" "$job_id" > /dev/null
echo "$job_id"
}
# Function to check job result
check_job_result() {
local job_id=$1
local timeout=15
local elapsed=0
while [ $elapsed -lt $timeout ]; do
local status=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" status 2>/dev/null || echo "")
if [ "$status" = "Finished" ]; then
local result=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" result 2>/dev/null || echo "")
if [ "$result" = "pong" ]; then
echo "✅ Job $job_id completed successfully with result: $result"
return 0
else
echo "❌ Job $job_id completed but with unexpected result: $result"
return 1
fi
elif [ "$status" = "Error" ]; then
local error=$(redis-cli -u "$REDIS_URL" hget "hero:job:$job_id" error 2>/dev/null || echo "")
echo "❌ Job $job_id failed with error: $error"
return 1
fi
sleep 0.5
elapsed=$((elapsed + 1))
done
echo "❌ Job $job_id timed out after ${timeout}s"
return 1
}
# Send multiple concurrent ping jobs to test async processing
echo "Testing concurrent ping/pong functionality..."
total_jobs=5
job_ids=()
echo
echo "--- Sending $total_jobs concurrent ping jobs ---"
for i in $(seq 1 $total_jobs); do
job_id=$(send_ping_job $i)
job_ids+=("$job_id")
sleep 0.1 # Small delay between job submissions
done
echo
echo "--- Waiting for all jobs to complete ---"
success_count=0
for job_id in "${job_ids[@]}"; do
echo "Checking job: $job_id"
if check_job_result "$job_id"; then
success_count=$((success_count + 1))
fi
done
echo
echo "=== Test Results ==="
echo "Successful concurrent ping/pong tests: $success_count/$total_jobs"
if [ $success_count -eq $total_jobs ]; then
echo "🎉 All tests passed! System worker is handling concurrent jobs correctly."
exit_code=0
else
echo "⚠️ Some tests failed. Check the worker logs for details."
exit_code=1
fi
# Test rapid job submission to showcase async capabilities
echo
echo "--- Testing rapid job submission (10 jobs in quick succession) ---"
rapid_jobs=10
rapid_job_ids=()
for i in $(seq 1 $rapid_jobs); do
job_id=$(send_ping_job "rapid_$i")
rapid_job_ids+=("$job_id")
done
echo "Waiting for rapid jobs to complete..."
rapid_success=0
for job_id in "${rapid_job_ids[@]}"; do
if check_job_result "$job_id"; then
rapid_success=$((rapid_success + 1))
fi
done
echo "Rapid submission test: $rapid_success/$rapid_jobs successful"
# Clean up
echo
echo "Cleaning up..."
echo "Stopping System worker (PID: $WORKER_PID)..."
kill $WORKER_PID 2>/dev/null || true
wait $WORKER_PID 2>/dev/null || true
echo "✅ Worker stopped"
echo "Cleaning up Redis jobs..."
redis-cli -u "$REDIS_URL" del "hero:jobs:$WORKER_ID" > /dev/null 2>&1 || true
redis-cli -u "$REDIS_URL" eval "return redis.call('del', unpack(redis.call('keys', 'hero:job:*')))" 0 > /dev/null 2>&1 || true
echo "✅ Redis cleaned up"
echo
echo "=== System Worker Example Complete ==="
total_success=$((success_count + rapid_success))
total_tests=$((total_jobs + rapid_jobs))
echo "Overall success rate: $total_success/$total_tests"
if [ $total_success -eq $total_tests ]; then
exit 0
else
exit 1
fi

View File

@@ -0,0 +1,15 @@
# System Worker Configuration
# Asynchronous worker for high-throughput concurrent processing
worker_id = "system_worker_1"
redis_url = "redis://localhost:6379"
db_path = "/tmp/system_worker_db"
preserve_tasks = false
[worker_type]
type = "async"
default_timeout_seconds = 300 # 5 minutes
[logging]
timestamps = true
level = "info"

View File

@@ -0,0 +1,60 @@
use std::process::{Command, Stdio};
use std::path::Path;
use std::env;
use std::io::{self, Write};
/// System Worker Demo Runner
///
/// This Rust wrapper executes the System worker bash script example.
/// It provides a way to run shell-based examples through Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 System Worker Demo");
println!("====================");
println!();
// Get the current working directory and construct the path to the shell script
let current_dir = env::current_dir()?;
let script_path = current_dir.join("examples").join("system").join("example.sh");
// Check if the script exists
if !script_path.exists() {
eprintln!("❌ Error: Script not found at {:?}", script_path);
eprintln!(" Make sure you're running this from the worker crate root directory.");
std::process::exit(1);
}
println!("📁 Script location: {:?}", script_path);
println!("🔧 Executing System worker example...");
println!();
// Make sure the script is executable
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)?.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms)?;
}
// Execute the shell script
let mut child = Command::new("bash")
.arg(&script_path)
.current_dir(&current_dir)
.stdin(Stdio::inherit())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
// Wait for the script to complete
let status = child.wait()?;
println!();
if status.success() {
println!("✅ System worker demo completed successfully!");
} else {
println!("❌ System worker demo failed with exit code: {:?}", status.code());
std::process::exit(status.code().unwrap_or(1));
}
Ok(())
}

View File

@@ -0,0 +1,322 @@
//! # Trait-Based Worker Demo
//!
//! This example demonstrates the new unified worker interface using the Worker trait.
//! It shows how both synchronous and asynchronous workers can be used with the same
//! API, eliminating code duplication and providing a clean, consistent interface.
//!
//! ## Features Demonstrated
//!
//! - Unified worker interface using the Worker trait
//! - Both sync and async worker implementations
//! - Shared configuration and spawn logic
//! - Clean shutdown handling
//! - Job processing with different strategies
//!
//! ## Usage
//!
//! Make sure Redis is running on localhost:6379, then run:
//! ```bash
//! cargo run --example trait_based_worker_demo
//! ```
use hero_job::{Job, JobStatus, ScriptType};
use log::{info, warn, error};
use rhailib_worker::{
SyncWorker, AsyncWorker,
spawn_sync_worker, spawn_async_worker,
engine::create_heromodels_engine,
worker_trait::{spawn_worker, Worker}
};
use redis::AsyncCommands;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
const REDIS_URL: &str = "redis://127.0.0.1:6379";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
info!("Starting Trait-Based Worker Demo");
// Create Redis connection for job creation
let redis_client = redis::Client::open(REDIS_URL)?;
let mut redis_conn = redis_client.get_multiplexed_async_connection().await?;
// Demo 1: Using the unified trait-based interface
info!("=== Demo 1: Unified Trait-Based Interface ===");
// Create shutdown channels for both workers
let (sync_shutdown_tx, sync_shutdown_rx) = mpsc::channel::<()>(1);
let (async_shutdown_tx, async_shutdown_rx) = mpsc::channel::<()>(1);
// Workers are now configured using builder pattern directly
// Create worker instances using builder pattern
let sync_worker = Arc::new(
SyncWorker::builder()
.worker_id("demo_sync_worker")
.db_path("/tmp")
.redis_url("redis://localhost:6379")
.preserve_tasks(false)
.build()
.expect("Failed to build SyncWorker")
);
let async_worker = Arc::new(
AsyncWorker::builder()
.worker_id("demo_async_worker")
.db_path("/tmp")
.redis_url("redis://localhost:6379")
.default_timeout(Duration::from_secs(300))
.build()
.expect("Failed to build AsyncWorker")
);
let sync_engine = create_heromodels_engine();
let async_engine = create_heromodels_engine();
info!("Spawning {} worker: {}", sync_worker.worker_type(), sync_worker.worker_id());
let sync_handle = spawn_worker(sync_worker.clone(), sync_engine, sync_shutdown_rx);
info!("Spawning {} worker: {}", async_worker.worker_type(), async_worker.worker_id());
let async_handle = spawn_worker(async_worker.clone(), async_engine, async_shutdown_rx);
// Give workers time to start
sleep(Duration::from_secs(1)).await;
// Create and dispatch jobs to both workers
info!("Creating demo jobs for both workers...");
// Job for sync worker - simple calculation
let sync_job = create_demo_job(
"sync_calculation",
r#"
print("Sync worker: Starting calculation...");
let result = 0;
for i in 1..=100 {
result += i;
}
print("Sync worker: Sum of 1-100 = " + result);
result
"#,
None,
).await?;
dispatch_job(&mut redis_conn, &sync_job, sync_worker.worker_id()).await?;
info!("Dispatched job to sync worker: {}", sync_job.id);
// Job for async worker - with timeout demonstration
let async_job = create_demo_job(
"async_calculation",
r#"
print("Async worker: Starting calculation...");
let result = 1;
for i in 1..=10 {
result *= i;
}
print("Async worker: 10! = " + result);
result
"#,
Some(15), // 15 second timeout
).await?;
dispatch_job(&mut redis_conn, &async_job, async_worker.worker_id()).await?;
info!("Dispatched job to async worker: {}", async_job.id);
// Monitor job execution
info!("Monitoring job execution for 10 seconds...");
let monitor_start = std::time::Instant::now();
let monitor_duration = Duration::from_secs(10);
while monitor_start.elapsed() < monitor_duration {
// Check sync job status
if let Ok(status) = Job::get_status(&mut redis_conn, &sync_job.id).await {
match status {
JobStatus::Finished => {
let job_key = format!("hero:job:{}", sync_job.id);
if let Ok(result) = redis_conn.hget::<_, _, String>(&job_key, "output").await {
info!("✅ Sync Job {} COMPLETED with result: {}", sync_job.id, result);
} else {
info!("✅ Sync Job {} COMPLETED", sync_job.id);
}
}
JobStatus::Error => {
let job_key = format!("hero:job:{}", sync_job.id);
if let Ok(error) = redis_conn.hget::<_, _, String>(&job_key, "error").await {
warn!("❌ Sync Job {} FAILED with error: {}", sync_job.id, error);
} else {
warn!("❌ Sync Job {} FAILED", sync_job.id);
}
}
_ => info!("🔄 Sync Job {} status: {:?}", sync_job.id, status),
}
}
// Check async job status
if let Ok(status) = Job::get_status(&mut redis_conn, &async_job.id).await {
match status {
JobStatus::Finished => {
let job_key = format!("hero:job:{}", async_job.id);
if let Ok(result) = redis_conn.hget::<_, _, String>(&job_key, "output").await {
info!("✅ Async Job {} COMPLETED with result: {}", async_job.id, result);
} else {
info!("✅ Async Job {} COMPLETED", async_job.id);
}
}
JobStatus::Error => {
let job_key = format!("hero:job:{}", async_job.id);
if let Ok(error) = redis_conn.hget::<_, _, String>(&job_key, "error").await {
warn!("❌ Async Job {} FAILED with error: {}", async_job.id, error);
} else {
warn!("❌ Async Job {} FAILED", async_job.id);
}
}
_ => info!("🔄 Async Job {} status: {:?}", async_job.id, status),
}
}
sleep(Duration::from_secs(2)).await;
}
// Demo 2: Using convenience functions (backward compatibility)
info!("\n=== Demo 2: Convenience Functions (Backward Compatibility) ===");
let (conv_sync_shutdown_tx, conv_sync_shutdown_rx) = mpsc::channel::<()>(1);
let (conv_async_shutdown_tx, conv_async_shutdown_rx) = mpsc::channel::<()>(1);
// Spawn workers using convenience functions
let conv_sync_engine = create_heromodels_engine();
let conv_async_engine = create_heromodels_engine();
info!("Spawning sync worker using convenience function...");
let conv_sync_handle = spawn_sync_worker(
"convenience_sync_worker".to_string(),
"/tmp".to_string(),
conv_sync_engine,
REDIS_URL.to_string(),
conv_sync_shutdown_rx,
false,
);
info!("Spawning async worker using convenience function...");
let conv_async_handle = spawn_async_worker(
"convenience_async_worker".to_string(),
"/tmp".to_string(),
conv_async_engine,
REDIS_URL.to_string(),
conv_async_shutdown_rx,
Duration::from_secs(20), // 20 second timeout
);
// Give convenience workers time to start
sleep(Duration::from_secs(1)).await;
// Create jobs for convenience workers
let conv_sync_job = create_demo_job(
"convenience_sync",
r#"
print("Convenience sync worker: Hello World!");
"Hello from convenience sync worker"
"#,
None,
).await?;
let conv_async_job = create_demo_job(
"convenience_async",
r#"
print("Convenience async worker: Hello World!");
"Hello from convenience async worker"
"#,
Some(10),
).await?;
dispatch_job(&mut redis_conn, &conv_sync_job, "convenience_sync_worker").await?;
dispatch_job(&mut redis_conn, &conv_async_job, "convenience_async_worker").await?;
info!("Dispatched jobs to convenience workers");
// Wait a bit for jobs to complete
sleep(Duration::from_secs(5)).await;
// Shutdown all workers gracefully
info!("\n=== Shutting Down All Workers ===");
info!("Sending shutdown signals...");
let _ = sync_shutdown_tx.send(()).await;
let _ = async_shutdown_tx.send(()).await;
let _ = conv_sync_shutdown_tx.send(()).await;
let _ = conv_async_shutdown_tx.send(()).await;
info!("Waiting for workers to shutdown...");
// Wait for all workers to shutdown
let results = tokio::join!(
sync_handle,
async_handle,
conv_sync_handle,
conv_async_handle
);
match results {
(Ok(Ok(())), Ok(Ok(())), Ok(Ok(())), Ok(Ok(()))) => {
info!("All workers shut down successfully!");
}
_ => {
error!("Some workers encountered errors during shutdown");
}
}
info!("Trait-Based Worker Demo completed successfully!");
// Summary
info!("\n=== Summary ===");
info!("✅ Demonstrated unified Worker trait interface");
info!("✅ Showed both sync and async worker implementations");
info!("✅ Used shared configuration and spawn logic");
info!("✅ Maintained backward compatibility with convenience functions");
info!("✅ Eliminated code duplication between worker types");
info!("✅ Provided clean, consistent API for all worker operations");
Ok(())
}
/// Create a demo job with the specified script and timeout
async fn create_demo_job(
name: &str,
script: &str,
timeout_seconds: Option<i32>,
) -> Result<Job, Box<dyn std::error::Error>> {
let mut job = Job::new(
format!("demo_{}", name), // caller_id
"demo_context".to_string(), // context_id
script.to_string(),
ScriptType::OSIS,
);
// Set timeout if provided
if let Some(timeout) = timeout_seconds {
job.timeout = Duration::from_secs(timeout as u64);
}
Ok(job)
}
/// Dispatch a job to the worker queue
async fn dispatch_job(
redis_conn: &mut redis::aio::MultiplexedConnection,
job: &Job,
worker_queue: &str,
) -> Result<(), Box<dyn std::error::Error>> {
// Store job in Redis
job.store_in_redis(redis_conn).await?;
// Add job to worker queue
let queue_key = format!("hero:job:{}", worker_queue);
let _: () = redis_conn.rpush(&queue_key, &job.id).await?;
Ok(())
}