//! OSIS Worker Binary - Synchronous actor for system-level operations
|
|
|
|
use clap::Parser;
|
|
use log::{error, info};
|
|
use baobab_actor::config::{ConfigError, WorkerConfig};
|
|
use baobab_actor::engine::create_heromodels_engine;
|
|
use baobab_actor::sync_actor::SyncWorker;
|
|
use baobab_actor::actor_trait::{spawn_actor, WorkerConfig as TraitWorkerConfig};
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::signal;
|
|
use tokio::sync::mpsc;
|
|
|
|
#[derive(Parser, Debug)]
|
|
#[command(
|
|
name = "osis",
|
|
version = "0.1.0",
|
|
about = "OSIS (Operating System Integration Service) - Synchronous Worker",
|
|
long_about = "A synchronous actor for Hero framework that processes jobs sequentially. \
|
|
Ideal for system-level operations that require careful resource management."
|
|
)]
|
|
struct Args {
|
|
/// Path to TOML configuration file
|
|
#[arg(short, long, help = "Path to TOML configuration file")]
|
|
config: PathBuf,
|
|
|
|
/// Override actor ID from config
|
|
#[arg(long, help = "Override actor ID from configuration file")]
|
|
actor_id: Option<String>,
|
|
|
|
/// Override Redis URL from config
|
|
#[arg(long, help = "Override Redis URL from configuration file")]
|
|
redis_url: Option<String>,
|
|
|
|
/// Override database path from config
|
|
#[arg(long, help = "Override database path from configuration file")]
|
|
db_path: Option<String>,
|
|
|
|
/// Enable verbose logging (debug level)
|
|
#[arg(short, long, help = "Enable verbose logging")]
|
|
verbose: bool,
|
|
|
|
/// Disable timestamps in log output
|
|
#[arg(long, help = "Remove timestamps from log output")]
|
|
no_timestamp: bool,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
let args = Args::parse();
|
|
|
|
// Load configuration from TOML file
|
|
let mut config = match WorkerConfig::from_file(&args.config) {
|
|
Ok(config) => config,
|
|
Err(e) => {
|
|
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
|
|
std::process::exit(1);
|
|
}
|
|
};
|
|
|
|
// Validate that this is a sync actor configuration
|
|
if !config.is_sync() {
|
|
eprintln!("Error: OSIS actor requires a sync actor configuration");
|
|
eprintln!("Expected: [actor_type] type = \"sync\"");
|
|
eprintln!("Found: {:?}", config.actor_type);
|
|
std::process::exit(1);
|
|
}
|
|
|
|
// Apply command line overrides
|
|
if let Some(actor_id) = args.actor_id {
|
|
config.actor_id = actor_id;
|
|
}
|
|
if let Some(redis_url) = args.redis_url {
|
|
config.redis_url = redis_url;
|
|
}
|
|
if let Some(db_path) = args.db_path {
|
|
config.db_path = db_path;
|
|
}
|
|
|
|
// Configure logging
|
|
setup_logging(&config, args.verbose, args.no_timestamp)?;
|
|
|
|
info!("🚀 OSIS Worker starting...");
|
|
info!("Worker ID: {}", config.actor_id);
|
|
info!("Redis URL: {}", config.redis_url);
|
|
info!("Database Path: {}", config.db_path);
|
|
info!("Preserve Tasks: {}", config.preserve_tasks);
|
|
|
|
// Create Rhai engine
|
|
let engine = create_heromodels_engine();
|
|
info!("✅ Rhai engine initialized");
|
|
|
|
// Create actor configuration for the trait-based interface
|
|
let actor_config = TraitWorkerConfig::new(
|
|
config.actor_id.clone(),
|
|
config.db_path.clone(),
|
|
config.redis_url.clone(),
|
|
config.preserve_tasks,
|
|
);
|
|
|
|
// Create sync actor instance
|
|
let actor = Arc::new(SyncWorker::default());
|
|
info!("✅ Sync actor instance created");
|
|
|
|
// Setup shutdown signal handling
|
|
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
|
|
|
// Spawn shutdown signal handler
|
|
let shutdown_tx_clone = shutdown_tx.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = signal::ctrl_c().await {
|
|
error!("Failed to listen for shutdown signal: {}", e);
|
|
return;
|
|
}
|
|
info!("🛑 Shutdown signal received");
|
|
if let Err(e) = shutdown_tx_clone.send(()).await {
|
|
error!("Failed to send shutdown signal: {}", e);
|
|
}
|
|
});
|
|
|
|
// Spawn the actor
|
|
info!("🔄 Starting actor loop...");
|
|
let actor_handle = spawn_actor(actor, engine, shutdown_rx);
|
|
|
|
// Wait for the actor to complete
|
|
match actor_handle.await {
|
|
Ok(Ok(())) => {
|
|
info!("✅ OSIS Worker shut down gracefully");
|
|
}
|
|
Ok(Err(e)) => {
|
|
error!("❌ OSIS Worker encountered an error: {}", e);
|
|
std::process::exit(1);
|
|
}
|
|
Err(e) => {
|
|
error!("❌ Failed to join actor task: {}", e);
|
|
std::process::exit(1);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Setup logging based on configuration and command line arguments
|
|
fn setup_logging(
|
|
config: &WorkerConfig,
|
|
verbose: bool,
|
|
no_timestamp: bool,
|
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
let mut builder = env_logger::Builder::new();
|
|
|
|
// Determine log level
|
|
let log_level = if verbose {
|
|
"debug"
|
|
} else {
|
|
&config.logging.level
|
|
};
|
|
|
|
// Set log level
|
|
builder.filter_level(match log_level.to_lowercase().as_str() {
|
|
"trace" => log::LevelFilter::Trace,
|
|
"debug" => log::LevelFilter::Debug,
|
|
"info" => log::LevelFilter::Info,
|
|
"warn" => log::LevelFilter::Warn,
|
|
"error" => log::LevelFilter::Error,
|
|
_ => {
|
|
eprintln!("Invalid log level: {}. Using 'info'", log_level);
|
|
log::LevelFilter::Info
|
|
}
|
|
});
|
|
|
|
// Configure timestamps
|
|
let show_timestamps = !no_timestamp && config.logging.timestamps;
|
|
if !show_timestamps {
|
|
builder.format_timestamp(None);
|
|
}
|
|
|
|
builder.init();
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Configuration fixture declaring a sync actor.
    const SYNC_CONFIG_TOML: &str = r#"
actor_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"

[actor_type]
type = "sync"

[logging]
level = "info"
"#;

    /// Configuration fixture declaring an async actor.
    const ASYNC_CONFIG_TOML: &str = r#"
actor_id = "test_osis"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"

[actor_type]
type = "async"
default_timeout_seconds = 300

[logging]
level = "info"
"#;

    /// Writes `toml` to a fresh temporary file and loads it through
    /// `WorkerConfig::from_file`, panicking on any failure.
    fn load_config(toml: &str) -> WorkerConfig {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(toml.as_bytes()).unwrap();
        // `file` is still alive here, so the path exists while it is read.
        WorkerConfig::from_file(file.path()).unwrap()
    }

    /// A sync configuration loads and is recognised as sync.
    #[test]
    fn test_config_validation() {
        let loaded = load_config(SYNC_CONFIG_TOML);

        assert!(loaded.is_sync());
        assert!(!loaded.is_async());
        assert_eq!(loaded.actor_id, "test_osis");
    }

    /// An async configuration loads but is not recognised as sync.
    #[test]
    fn test_async_config_rejection() {
        let loaded = load_config(ASYNC_CONFIG_TOML);

        assert!(!loaded.is_sync());
        assert!(loaded.is_async());
        // This would be rejected in main() function
    }
}
|