From 7675dc215004ff53c431b8929885eea29ec0feda Mon Sep 17 00:00:00 2001
From: Timur Gordon <31495328+timurgordon@users.noreply.github.com>
Date: Wed, 19 Nov 2025 10:33:54 +0100
Subject: [PATCH 1/2] update horus heroscripts

---
 scripts/install.md |  3 +--
 scripts/start.md   | 12 +++++-------
 2 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/scripts/install.md b/scripts/install.md
index d33541c..fe90d09 100644
--- a/scripts/install.md
+++ b/scripts/install.md
@@ -1,6 +1,5 @@
-!!include configure.md
+# Horus Installation
 
-// Install all components
 !!coordinator.install name:'development'
 !!supervisor.install name:'development'
 !!herorunner.install name:'development'
diff --git a/scripts/start.md b/scripts/start.md
index 26f376a..91019ce 100644
--- a/scripts/start.md
+++ b/scripts/start.md
@@ -2,11 +2,9 @@
 
 Starts all horus binaries
 
-!!include install.md
-
 // Start all services
-!!herocoordinator.start name:'default'
-!!supervisor.start name:'default'
-!!herorunner.start name:'default'
-!!osirisrunner.start name:'default'
-!!salrunner.start name:'default'
+!!herocoordinator.start name:'development'
+!!supervisor.start name:'development'
+!!herorunner.start name:'development'
+!!osirisrunner.start name:'development'
+!!salrunner.start name:'development'

From 8c33c73b3c600ef30fe1706218ccb309e65a2eb2 Mon Sep 17 00:00:00 2001
From: Timur Gordon <31495328+timurgordon@users.noreply.github.com>
Date: Wed, 19 Nov 2025 10:34:28 +0100
Subject: [PATCH 2/2] update coordinator and add end to end tests

---
 Cargo.toml                         |   1 +
 bin/coordinator/main.rs            |  23 +-
 bin/coordinator/src/main.rs        |  23 +-
 lib/clients/coordinator/src/lib.rs |  20 +-
 lib/osiris/derive/src/lib.rs       | 210 ++++++++++++++++
 priv_key.bin                       |   1 +
 tests/README.md                    | 170 +++++++++++++
 tests/coordinator.rs               | 392 +++++++++++++++++++++++++++++
 8 files changed, 830 insertions(+), 10 deletions(-)
 create mode 100644 priv_key.bin
 create mode 100644 tests/README.md
 create mode 100644 tests/coordinator.rs

diff --git a/Cargo.toml b/Cargo.toml
index 82915ad..9ec184f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ repository.workspace = true
 [dependencies]
 # Integration test dependencies - no library dependencies, tests spawn binaries
 hero-supervisor-openrpc-client = { path = "lib/clients/supervisor" }
+hero-coordinator-client = { path = "lib/clients/coordinator" }
 hero-job = { path = "lib/models/job" }
 hero-job-client = { path = "lib/clients/job" }
 tokio = { workspace = true }
diff --git a/bin/coordinator/main.rs b/bin/coordinator/main.rs
index 971c8ae..4cfc109 100644
--- a/bin/coordinator/main.rs
+++ b/bin/coordinator/main.rs
@@ -70,6 +70,21 @@ struct Cli {
         help = "Bind port for WebSocket JSON-RPC server (default: 9653)"
     )]
     api_ws_port: u16,
+
+    #[arg(
+        long = "supervisor-transport",
+        env = "SUPERVISOR_TRANSPORT",
+        default_value = "mycelium",
+        help = "Transport to use for supervisor communication: 'mycelium' or 'http' (default: mycelium)"
+    )]
+    supervisor_transport: String,
+
+    #[arg(
+        long = "supervisor-http-url",
+        env = "SUPERVISOR_HTTP_URL",
+        help = "HTTP URL for supervisor when using HTTP transport (e.g., http://127.0.0.1:3031)"
+    )]
+    supervisor_http_url: Option<String>,
 }
 
 #[tokio::main]
@@ -99,8 +114,9 @@ async fn main() {
     // Shared application state
     let state = Arc::new(herocoordinator::rpc::AppState::new(service));
 
-    // Start router workers (auto-discovered contexts) using a single global SupervisorHub (no separate inbound listener)
-    {
+    // Start router workers (auto-discovered contexts) using a single global SupervisorHub
+    // Skip 
router if using HTTP transport (no mycelium needed)
+    if cli.supervisor_transport == "mycelium" {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
         let hub = herocoordinator::clients::SupervisorHub::new(
             base_url.clone(),
@@ -118,6 +134,9 @@ async fn main() {
         };
         // Per-context outbound delivery loops (replies handled by SupervisorHub)
         let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
+        info!("Router started with mycelium transport");
+    } else {
+        info!("Skipping router - using HTTP transport for supervisor communication");
     }
 
     // Build RPC modules for both servers
diff --git a/bin/coordinator/src/main.rs b/bin/coordinator/src/main.rs
index 1eea2ff..7d6d711 100644
--- a/bin/coordinator/src/main.rs
+++ b/bin/coordinator/src/main.rs
@@ -70,6 +70,21 @@ struct Cli {
         help = "Bind port for WebSocket JSON-RPC server (default: 9653)"
     )]
     api_ws_port: u16,
+
+    #[arg(
+        long = "supervisor-transport",
+        env = "SUPERVISOR_TRANSPORT",
+        default_value = "mycelium",
+        help = "Transport to use for supervisor communication: 'mycelium' or 'http' (default: mycelium)"
+    )]
+    supervisor_transport: String,
+
+    #[arg(
+        long = "supervisor-http-url",
+        env = "SUPERVISOR_HTTP_URL",
+        help = "HTTP URL for supervisor when using HTTP transport (e.g., http://127.0.0.1:3031)"
+    )]
+    supervisor_http_url: Option<String>,
 }
 
 #[tokio::main]
@@ -99,8 +114,9 @@ async fn main() {
     // Shared application state
     let state = Arc::new(hero_coordinator::rpc::AppState::new(service));
 
-    // Start router workers (auto-discovered contexts) using a single global SupervisorHub (no separate inbound listener)
-    {
+    // Start router workers (auto-discovered contexts) using a single global SupervisorHub
+    // Skip router if using HTTP transport (no mycelium needed)
+    if cli.supervisor_transport == "mycelium" {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
         let mycelium = Arc::new(
             hero_supervisor_openrpc_client::transports::MyceliumClient::new(&base_url)
@@ -121,6 +137,9 @@ async fn main() {
         };
         // Per-context outbound delivery loops (replies handled by SupervisorHub)
         let _auto_handle = hero_coordinator::router::start_router_auto(service_for_router, cfg);
+        info!("Router started with mycelium transport");
+    } else {
+        info!("Skipping router - using HTTP transport for supervisor communication");
     }
 
     // Build RPC modules for both servers
diff --git a/lib/clients/coordinator/src/lib.rs b/lib/clients/coordinator/src/lib.rs
index ac0a0ff..5b8d6d1 100644
--- a/lib/clients/coordinator/src/lib.rs
+++ b/lib/clients/coordinator/src/lib.rs
@@ -290,13 +290,19 @@ impl CoordinatorClient {
     async fn call<T: serde::de::DeserializeOwned>(&self, method: &str, params: Value) -> Result<T, CoordinatorError> {
         use jsonrpsee::core::client::ClientT;
-        use jsonrpsee::core::params::ArrayParams;
+        use jsonrpsee::core::params::ObjectParams;
 
-        let mut array_params = ArrayParams::new();
-        array_params.insert(params).map_err(|e| CoordinatorError::Rpc(e.to_string()))?;
+        // Coordinator expects params as named parameters (object), not positional (array)
+        // Convert the Value object to ObjectParams
+        let mut object_params = ObjectParams::new();
+        if let Value::Object(map) = params {
+            for (key, value) in map {
+                object_params.insert(&key, value).map_err(|e| CoordinatorError::Rpc(e.to_string()))?;
+            }
+        }
 
-        self.client
-            .request(method, array_params)
+        let result: T = self.client
+            .request(method, object_params)
             .await
             .map_err(|e| {
                 let err_str = e.to_string();
@@ -311,7 +317,9 @@ impl CoordinatorClient {
             } else {
                 CoordinatorError::Rpc(err_str)
             }
-        })
+        
})?;
+
+        Ok(result)
     }
 }
 
diff --git a/lib/osiris/derive/src/lib.rs b/lib/osiris/derive/src/lib.rs
index 933c33d..fc3c6de 100644
--- a/lib/osiris/derive/src/lib.rs
+++ b/lib/osiris/derive/src/lib.rs
@@ -200,3 +200,213 @@ fn is_offsetdatetime_type(ty: &Type) -> bool {
     }
     false
 }
+
+/// Derive macro for generating CRUD client methods for Osiris models
+///
+/// This macro generates async CRUD methods (create, get, update, delete, list) for a model,
+/// plus any custom methods defined on the model.
+///
+/// # Example
+///
+/// ```rust
+/// #[derive(OsirisModel)]
+/// #[osiris(
+///     collection = "calendar_events",
+///     id_field = "event_id",
+///     methods = ["reschedule", "cancel"]
+/// )]
+/// pub struct CalendarEvent {
+///     pub event_id: String,
+///     pub title: String,
+///     pub start_time: i64,
+///     // ...
+/// }
+/// ```
+///
+/// This generates methods on OsirisClient:
+/// - `create_calendar_event(&self, event: CalendarEvent) -> Result<CalendarEvent>`
+/// - `get_calendar_event(&self, event_id: &str) -> Result<CalendarEvent>`
+/// - `update_calendar_event(&self, event_id: &str, event: CalendarEvent) -> Result<CalendarEvent>`
+/// - `delete_calendar_event(&self, event_id: &str) -> Result<()>`
+/// - `list_calendar_events(&self) -> Result<Vec<CalendarEvent>>`
+/// - `reschedule_calendar_event(&self, event_id: &str, new_time: i64) -> Result<CalendarEvent>`
+/// - `cancel_calendar_event(&self, event_id: &str) -> Result<CalendarEvent>`
+#[proc_macro_derive(OsirisModel, attributes(osiris))]
+pub fn derive_osiris_model(input: TokenStream) -> TokenStream {
+    let input = parse_macro_input!(input as DeriveInput);
+
+    let model_name = &input.ident;
+    let model_name_snake = to_snake_case(&model_name.to_string());
+
+    // Parse attributes
+    let mut collection = model_name_snake.clone();
+    let mut id_field = "id".to_string();
+    let mut custom_methods: Vec<String> = Vec::new();
+
+    for attr in &input.attrs {
+        if attr.path().is_ident("osiris") {
+            if let Ok(meta_list) = attr.parse_args::<syn::MetaList>() {
+                // Parse nested attributes
+                for nested in meta_list.tokens.clone() {
+                    let nested_str = nested.to_string();
+                    if nested_str.starts_with("collection") {
+                        if let Some(val) = extract_string_value(&nested_str) {
+                            collection = val;
+                        }
+                    } else if nested_str.starts_with("id_field") {
+                        if let Some(val) = extract_string_value(&nested_str) {
+                            id_field = val;
+                        }
+                    } else if nested_str.starts_with("methods") {
+                        custom_methods = extract_array_values(&nested_str);
+                    }
+                }
+            }
+        }
+    }
+
+    // Generate method names
+    let create_method = syn::Ident::new(&format!("create_{}", model_name_snake), model_name.span());
+    let get_method = syn::Ident::new(&format!("get_{}", model_name_snake), model_name.span());
+    let update_method = syn::Ident::new(&format!("update_{}", model_name_snake), model_name.span());
+    let delete_method = syn::Ident::new(&format!("delete_{}", model_name_snake), model_name.span());
+    let list_method = syn::Ident::new(&format!("list_{}s", model_name_snake), model_name.span());
+
+    // Generate custom method implementations
+    let custom_method_impls: Vec<_> = custom_methods.iter().map(|method_name| {
+        let method_ident = syn::Ident::new(&format!("{}_{}", method_name, model_name_snake), model_name.span());
+        let rhai_call = format!("{}_{}", model_name_snake, method_name);
+
+        quote!
{
+            pub async fn #method_ident(&self, id: &str, params: serde_json::Value) -> Result<#model_name, OsirisClientError> {
+                let script = format!(
+                    r#"
+                    let obj = {}::get("{}");
+                    obj.{}(params);
+                    obj.save();
+                    obj
+                    "#,
+                    #collection, id, #method_name
+                );
+
+                let response = self.execute_script(&script).await?;
+                // Parse response and return model
+                // This is a simplified version - actual implementation would parse the job result
+                Err(OsirisClientError::CommandFailed("Not yet implemented".to_string()))
+            }
+        }
+    }).collect();
+
+    let expanded = quote! {
+        impl OsirisClient {
+            /// Create a new instance of #model_name
+            pub async fn #create_method(&self, model: &#model_name) -> Result<#model_name, OsirisClientError> {
+                let json = serde_json::to_string(model)
+                    .map_err(|e| OsirisClientError::SerializationFailed(e.to_string()))?;
+
+                let script = format!(
+                    r#"
+                    let data = {};
+                    let obj = {}::new(data);
+                    obj.save();
+                    obj
+                    "#,
+                    json, #collection
+                );
+
+                let response = self.execute_script(&script).await?;
+                // Parse response - simplified for now
+                Err(OsirisClientError::CommandFailed("Not yet implemented".to_string()))
+            }
+
+            /// Get an instance of #model_name by ID
+            pub async fn #get_method(&self, id: &str) -> Result<#model_name, OsirisClientError> {
+                let query = format!(r#"{{ "{}": "{}" }}"#, #id_field, id);
+                self.query::<#model_name>(#collection, &query).await
+            }
+
+            /// Update an existing #model_name
+            pub async fn #update_method(&self, id: &str, model: &#model_name) -> Result<#model_name, OsirisClientError> {
+                let json = serde_json::to_string(model)
+                    .map_err(|e| OsirisClientError::SerializationFailed(e.to_string()))?;
+
+                let script = format!(
+                    r#"
+                    let obj = {}::get("{}");
+                    let data = {};
+                    obj.update(data);
+                    obj.save();
+                    obj
+                    "#,
+                    #collection, id, json
+                );
+
+                let response = self.execute_script(&script).await?;
+                Err(OsirisClientError::CommandFailed("Not yet implemented".to_string()))
+            }
+
+            /// Delete an instance of #model_name
+            pub async fn #delete_method(&self, id: &str) -> Result<(), OsirisClientError> {
+                let script = format!(
+                    r#"
+                    let obj = {}::get("{}");
+                    obj.delete();
+                    "#,
+                    #collection, id
+                );
+
+                self.execute_script(&script).await?;
+                Ok(())
+            }
+
+            /// List all instances of #model_name
+            pub async fn #list_method(&self) -> Result<Vec<#model_name>, OsirisClientError> {
+                self.query_all::<#model_name>(#collection).await
+            }
+
+            #(#custom_method_impls)*
+        }
+    };
+
+    TokenStream::from(expanded)
+}
+
+fn to_snake_case(s: &str) -> String {
+    let mut result = String::new();
+    for (i, ch) in s.chars().enumerate() {
+        if ch.is_uppercase() {
+            if i > 0 {
+                result.push('_');
+            }
+            result.push(ch.to_lowercase().next().unwrap());
+        } else {
+            result.push(ch);
+        }
+    }
+    result
+}
+
+fn extract_string_value(s: &str) -> Option<String> {
+    // Extract value from "key = \"value\"" format
+    if let Some(eq_pos) = s.find('=') {
+        let value_part = s[eq_pos + 1..].trim();
+        let cleaned = value_part.trim_matches(|c| c == '"' || c == ' ');
+        return Some(cleaned.to_string());
+    }
+    None
+}
+
+fn extract_array_values(s: &str) -> Vec<String> {
+    // Extract values from "methods = [\"method1\", \"method2\"]" format
+    if let Some(start) = s.find('[') {
+        if let Some(end) = s.find(']') {
+            let array_content = &s[start + 1..end];
+            return array_content
+                .split(',')
+                .map(|item| item.trim().trim_matches('"').to_string())
+                .filter(|item| !item.is_empty())
+                .collect();
+        }
+    }
+    Vec::new()
+}
diff --git a/priv_key.bin b/priv_key.bin
new file mode 100644
index 0000000..aec07bb
--- /dev/null
+++ 
b/priv_key.bin
@@ -0,0 +1 @@
+LFEDȫAgߧ9yQ)<
\ No newline at end of file
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 0000000..e3db504
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,170 @@
+# End-to-End Integration Tests
+
+This directory contains end-to-end integration tests for the Horus system components. Each test file spawns the actual binary and tests it via its client library.
+
+## Test Files
+
+### `coordinator.rs`
+End-to-end tests for the Hero Coordinator service.
+
+**Tests:**
+- Job creation with dependencies
+- Flow creation and loading
+- Flow DAG generation
+- Flow execution (start)
+- Message creation and loading
+
+**Prerequisites:**
+- Redis server running on `127.0.0.1:6379`
+- Ports `9652` (HTTP API) and `9653` (WebSocket API) available
+
+**Run:**
+```bash
+cargo test --test coordinator -- --test-threads=1
+```
+
+### `supervisor.rs`
+End-to-end tests for the Hero Supervisor service.
+
+**Tests:**
+- OpenRPC discovery
+- Runner registration and management
+- Job creation and execution
+- Job status tracking
+- API key generation and management
+- Authentication verification
+- Complete workflow integration
+
+**Prerequisites:**
+- Redis server running on `127.0.0.1:6379`
+- Port `3031` available
+
+**Run:**
+```bash
+cargo test --test supervisor -- --test-threads=1
+```
+
+### `runner_hero.rs`
+End-to-end tests for the Hero (Python) runner.
+
+**Prerequisites:**
+- Python 3 installed
+- Redis server running
+
+**Run:**
+```bash
+cargo test --test runner_hero -- --test-threads=1
+```
+
+### `runner_osiris.rs`
+End-to-end tests for the Osiris (V language) runner.
+
+**Prerequisites:**
+- V language compiler installed
+- Redis server running
+
+**Run:**
+```bash
+cargo test --test runner_osiris -- --test-threads=1
+```
+
+### `runner_sal.rs`
+End-to-end tests for the Sal (Rhai scripting) runner.
+
+**Prerequisites:**
+- Redis server running
+
+**Run:**
+```bash
+cargo test --test runner_sal -- --test-threads=1
+```
+
+## Running All Tests
+
+To run all end-to-end tests sequentially:
+
+```bash
+cargo test --tests -- --test-threads=1
+```
+
+## Important Notes
+
+### Sequential Execution Required
+
+All tests **must** be run with `--test-threads=1` because:
+1. Each test spawns a server process that binds to specific ports
+2. Tests share Redis databases and may conflict if run in parallel
+3. Process cleanup needs to happen sequentially
+
+### Redis Requirement
+
+All tests require a Redis server running on `127.0.0.1:6379`. You can start Redis with:
+
+```bash
+redis-server
+```
+
+Or using Docker:
+
+```bash
+docker run -d -p 6379:6379 redis:latest
+```
+
+### Port Conflicts
+
+If tests fail to start, check that the required ports are not in use:
+
+- **Coordinator**: 9652 (HTTP), 9653 (WebSocket)
+- **Supervisor**: 3031
+- **Runners**: Various ports depending on configuration
+
+You can check port usage with:
+
+```bash
+lsof -i :9652
+lsof -i :3031
+```
+
+### Test Isolation
+
+Each test file:
+1. Builds the binary using `escargot`
+2. Starts the process with test-specific configuration
+3. Runs tests against the running instance
+4. Cleans up the process at the end
+
+Tests within a file may share state through Redis, so they are designed to be idempotent and handle existing data.
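+
+For example, a setup call can treat an "already exists" error as success so that reruns stay green. A minimal sketch (hypothetical wrapper; the exact error type and message text depend on the client library):
+
+```rust
+// Hypothetical idempotent wrapper around a create call.
+match client.job_create_or_load(TEST_CONTEXT_ID, job).await {
+    Ok(_) => {}                                               // created, or loaded existing
+    Err(e) if e.to_string().contains("already exists") => {}  // leftover state from a prior run
+    Err(e) => panic!("setup failed: {e}"),
+}
+```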
+
+### Debugging
+
+To see detailed logs during test execution:
+
+```bash
+RUST_LOG=debug cargo test --test coordinator -- --test-threads=1 --nocapture
+```
+
+To run a specific test:
+
+```bash
+cargo test --test coordinator test_01_flow_create_simple -- --test-threads=1 --nocapture
+```
+
+## Test Architecture
+
+Each test file follows this pattern:
+
+1. **Global Process Management**: Uses `lazy_static` and `Once` to ensure the server process starts only once
+2. **Setup Helper**: Common setup code (e.g., `setup_prerequisites()`) to reduce duplication
+3. **Sequential Tests**: Tests are numbered (e.g., `test_01_`, `test_02_`) to indicate execution order
+4. **Cleanup Test**: A final `test_zz_cleanup()` ensures the process is terminated and ports are freed
+
+## Contributing
+
+When adding new tests:
+
+1. Follow the existing naming convention (`test_NN_description`)
+2. Use the setup helpers to avoid duplication
+3. Make tests idempotent (handle existing data gracefully)
+4. Add cleanup in the `test_zz_cleanup()` function
+5. Update this README with any new prerequisites or test descriptions
diff --git a/tests/coordinator.rs b/tests/coordinator.rs
new file mode 100644
index 0000000..5e69203
--- /dev/null
+++ b/tests/coordinator.rs
@@ -0,0 +1,392 @@
+//! End-to-End Integration Tests for Hero Coordinator
+//!
+//! Tests coordinator flow management functionality against a running coordinator instance.
+//! The coordinator binary is automatically started and stopped for each test run.
+//!
+//! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially:
+//! ```
+//! cargo test --test coordinator -- --test-threads=1
+//! ```
+
+use hero_coordinator_client::{CoordinatorClient, models::*};
+use std::collections::HashMap;
+use std::sync::Once;
+use std::process::Child;
+
+/// Test configuration
+const COORDINATOR_URL: &str = "http://127.0.0.1:9652";
+const TEST_CONTEXT_ID: u32 = 2;
+const TEST_CALLER_ID: u32 = 11001;
+const TEST_FLOW_ID: u32 = 13001;
+const BASE_JOB_ID: u32 = 20000;
+
+use std::sync::Mutex;
+use lazy_static::lazy_static;
+
+lazy_static!
{
+    static ref COORDINATOR_PROCESS: Mutex<Option<Child>> = Mutex::new(None);
+}
+
+/// Global initialization flag
+static INIT: Once = Once::new();
+
+/// Initialize and start the coordinator binary (called once)
+async fn init_coordinator() {
+    INIT.call_once(|| {
+        // Register cleanup handler
+        let _ = std::panic::catch_unwind(|| {
+            ctrlc::set_handler(move || {
+                cleanup_coordinator();
+                std::process::exit(0);
+            }).ok();
+        });
+
+        // Use escargot to build and get the binary path
+        let binary = escargot::CargoBuild::new()
+            .bin("coordinator")
+            .package("hero-coordinator")
+            .run()
+            .expect("Failed to build coordinator binary");
+
+        // Start the coordinator binary with HTTP transport (no mycelium needed)
+        let child = binary
+            .command()
+            .env("RUST_LOG", "info")
+            .args(&[
+                "--api-http-port",
+                "9652",
+                "--api-ws-port",
+                "9653",
+                "--redis-addr",
+                "127.0.0.1:6379",
+                "--supervisor-transport",
+                "http",
+            ])
+            .spawn()
+            .expect("Failed to start coordinator");
+
+        *COORDINATOR_PROCESS.lock().unwrap() = Some(child);
+
+        // Wait for server to be ready with simple TCP check
+        use std::net::TcpStream;
+        use std::time::Duration;
+
+        println!("⏳ Waiting for coordinator to start...");
+
+        for i in 0..30 {
+            std::thread::sleep(Duration::from_millis(500));
+
+            // Try to connect to the port
+            if TcpStream::connect_timeout(
+                &"127.0.0.1:9652".parse().unwrap(),
+                Duration::from_millis(100)
+            ).is_ok() {
+                // Give it more time to fully initialize
+                std::thread::sleep(Duration::from_secs(2));
+                println!("✅ Coordinator ready after ~{}ms", (i * 500) + 2000);
+                return;
+            }
+        }
+
+        panic!("Coordinator failed to start within 15 seconds");
+    });
+}
+
+/// Cleanup coordinator process
+fn cleanup_coordinator() {
+    if let Ok(mut guard) = COORDINATOR_PROCESS.lock() {
+        if let Some(mut child) = guard.take() {
+            println!("🧹 Cleaning up coordinator process...");
+            let _ = child.kill();
+            let _ = child.wait();
+        }
+    }
+}
+
+/// Helper to create a test client
+async fn create_client() -> CoordinatorClient {
+    // Ensure coordinator is running
+    init_coordinator().await;
+
+    CoordinatorClient::new(COORDINATOR_URL)
+        .expect("Failed to create coordinator client")
+}
+
+#[tokio::test]
+async fn test_01_flow_create_simple() {
+    println!("\n🧪 Test: flow.create (simple flow)");
+
+    let client = create_client().await;
+
+    // Create jobs for the flow
+    let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1];
+    for (i, job_id) in job_ids.iter().enumerate() {
+        let job = JobCreate {
+            id: *job_id,
+            caller_id: TEST_CALLER_ID,
+            context_id: TEST_CONTEXT_ID,
+            script: format!("print('job {}')", i),
+            script_type: ScriptType::Python,
+            timeout: 60,
+            retries: 0,
+            env_vars: HashMap::new(),
+            prerequisites: vec![],
+            depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] },
+        };
+
+        let result = client.job_create_or_load(TEST_CONTEXT_ID, job).await;
+        if let Err(ref e) = result {
+            println!("   Job {} creation error: {:?}", job_id, e);
+        }
+        assert!(result.is_ok(), "Job {} should be created", job_id);
+    }
+
+    // Create flow
+    let flow_create = FlowCreate {
+        id: TEST_FLOW_ID,
+        caller_id: TEST_CALLER_ID,
+        context_id: TEST_CONTEXT_ID,
+        jobs: job_ids.clone(),
+        env_vars: HashMap::new(),
+    };
+
+    let result = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await;
+
+    if let Err(ref e) = result {
+        println!("   Error: {:?}", e);
+    }
+    assert!(result.is_ok(), "flow.create_or_load should succeed");
+    let flow = result.unwrap();
+
+    assert_eq!(flow.id, TEST_FLOW_ID);
+    assert_eq!(flow.jobs, job_ids);
+    println!("✅ flow.create works - flow: 
{}, jobs: {:?}", flow.id, flow.jobs); +} + +#[tokio::test] +async fn test_02_flow_load() { + println!("\n🧪 Test: flow.load"); + + let client = create_client().await; + + // Create a flow first (reuse from test_01) + let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1]; + for (i, job_id) in job_ids.iter().enumerate() { + let job = JobCreate { + id: *job_id, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + script: format!("print('job {}')", i), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, + }; + let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; + } + + let flow_create = FlowCreate { + id: TEST_FLOW_ID, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + jobs: job_ids.clone(), + env_vars: HashMap::new(), + }; + let _ = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; + + // Load the flow + let result = client.flow_load(TEST_CONTEXT_ID, TEST_FLOW_ID).await; + + if let Err(ref e) = result { + println!(" Error: {:?}", e); + } + assert!(result.is_ok(), "flow.load should succeed"); + let flow = result.unwrap(); + + assert_eq!(flow.id, TEST_FLOW_ID); + assert_eq!(flow.jobs, job_ids); + println!("✅ flow.load works - loaded flow: {}", flow.id); +} + +#[tokio::test] +async fn test_03_flow_dag() { + println!("\n🧪 Test: flow.dag"); + + let client = create_client().await; + + // Create jobs with dependencies + let job_ids = vec![BASE_JOB_ID + 100, BASE_JOB_ID + 101, BASE_JOB_ID + 102]; + for (i, job_id) in job_ids.iter().enumerate() { + let job = JobCreate { + id: *job_id, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + script: format!("print('dag job {}')", i), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, + }; + let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; + } + + let flow_id = TEST_FLOW_ID + 1; + let flow_create = FlowCreate { + id: flow_id, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + jobs: job_ids.clone(), + env_vars: HashMap::new(), + }; + let _ = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; + + // Get the DAG + let result = client.flow_dag(TEST_CONTEXT_ID, flow_id).await; + + if let Err(ref e) = result { + println!(" Error: {:?}", e); + } + assert!(result.is_ok(), "flow.dag should succeed"); + let dag = result.unwrap(); + + assert_eq!(dag.flow_id, flow_id); + assert_eq!(dag.nodes.len(), 3); + assert_eq!(dag.edges.len(), 2); // Two edges for the chain + println!("✅ flow.dag works - flow: {}, nodes: {}, edges: {}", + dag.flow_id, dag.nodes.len(), dag.edges.len()); +} + +#[tokio::test] +async fn test_04_flow_start() { + println!("\n🧪 Test: flow.start"); + + let client = create_client().await; + + // Create a simple flow + let job_id = BASE_JOB_ID + 200; + let job = JobCreate { + id: job_id, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + script: "print('start test')".to_string(), + script_type: ScriptType::Python, + timeout: 60, + retries: 0, + env_vars: HashMap::new(), + prerequisites: vec![], + depends: vec![], + }; + let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; + + let flow_id = TEST_FLOW_ID + 2; + let flow_create = FlowCreate { + id: flow_id, + caller_id: TEST_CALLER_ID, + context_id: TEST_CONTEXT_ID, + jobs: vec![job_id], + env_vars: HashMap::new(), + }; + let _ = 
client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; + + // Start the flow + let result = client.flow_start(TEST_CONTEXT_ID, flow_id).await; + + match result { + Ok(started) => { + println!("✅ flow.start works - started: {}", started); + } + Err(e) => { + println!("⚠️ flow.start: {:?} (runner may not be available)", e); + // This is expected if no actual runner is listening + } + } +} + +#[tokio::test] +async fn test_05_message_create() { + println!("\n🧪 Test: message.create"); + + let client = create_client().await; + + let message_create = MessageCreate { + id: 1, + context_id: TEST_CONTEXT_ID, + runner_id: 12001, + job_id: BASE_JOB_ID, + message_type: MessageType::JobRun, + format: MessageFormatType::JsonRpc, + payload: r#"{"method":"job.run","params":{}}"#.to_string(), + }; + + let result = client.message_create(TEST_CONTEXT_ID, message_create).await; + + match result { + Ok(message) => { + assert_eq!(message.id, 1); + assert_eq!(message.context_id, TEST_CONTEXT_ID); + println!("✅ message.create works - message: {}", message.id); + } + Err(e) => { + println!("⚠️ message.create: {:?} (may already exist)", e); + } + } +} + +#[tokio::test] +async fn test_06_message_load() { + println!("\n🧪 Test: message.load"); + + let client = create_client().await; + + // Create a message first + let message_create = MessageCreate { + id: 2, + context_id: TEST_CONTEXT_ID, + runner_id: 12001, + job_id: BASE_JOB_ID, + message_type: MessageType::JobRun, + format: MessageFormatType::JsonRpc, + payload: r#"{"method":"job.run","params":{}}"#.to_string(), + }; + let _ = client.message_create(TEST_CONTEXT_ID, message_create).await; + + // Load the message + let result = client.message_load(TEST_CONTEXT_ID, 2).await; + + if let Err(ref e) = result { + println!(" Error: {:?}", e); + } + + match result { + Ok(message) => { + assert_eq!(message.id, 2); + assert_eq!(message.context_id, TEST_CONTEXT_ID); + println!("✅ message.load works - loaded message: {}", message.id); + } + Err(_) => { + println!("⚠️ message.load failed (message may not exist)"); + } + } +} + +/// Final test that ensures cleanup happens +#[tokio::test] +async fn test_zz_cleanup() { + println!("🧹 Running cleanup..."); + cleanup_coordinator(); + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + use std::net::TcpStream; + let port_free = TcpStream::connect_timeout( + &"127.0.0.1:9652".parse().unwrap(), + std::time::Duration::from_millis(100) + ).is_err(); + + assert!(port_free, "Port 9652 should be free after cleanup"); + println!("✅ Cleanup complete - port 9652 is free"); +}
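
A quick manual smoke test of the new HTTP supervisor transport, mirroring the arguments the test harness passes to the spawned binary (package and bin names are taken from the escargot call in `tests/coordinator.rs`; the supervisor URL is illustrative):

```bash
# Run the coordinator without mycelium; the router is skipped for HTTP transport.
cargo run -p hero-coordinator --bin coordinator -- \
  --api-http-port 9652 \
  --api-ws-port 9653 \
  --redis-addr 127.0.0.1:6379 \
  --supervisor-transport http \
  --supervisor-http-url http://127.0.0.1:3031
```

On startup the log should show "Skipping router - using HTTP transport for supervisor communication" instead of the mycelium router message.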