diff --git a/herolib/infra/tmuxrunner/model.py b/herolib/infra/tmuxrunner/model.py index 48584dd..988a2a6 100644 --- a/herolib/infra/tmuxrunner/model.py +++ b/herolib/infra/tmuxrunner/model.py @@ -1,28 +1,6 @@ -import os -import time -import re -import sys -import toml -import libtmux -from libtmux.pane import Pane -from libtmux.window import Window -from libtmux.session import Session -import psutil -from typing import Dict, List, Optional, Any, Set, Tuple -from dataclasses import dataclass, field, asdict +from typing import Dict, List, Optional +from dataclasses import dataclass, field from datetime import datetime -import uuid -from pathlib import Path -import asyncio -import uvicorn -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import threading - -# Configuration -WAITING_MESSAGE = "WAITING FOR JOBS" -HPY_SH_PATH = "/root/heromonkey/functions/hpy.sh" # Path to hpy.sh @dataclass class ProcessMetrics: @@ -81,6 +59,6 @@ class DAGStructure: class MetaData: """Class to hold metadata for a task directory.""" - def __init__(self, timeout: int = 600): # Default timeout to 10 minutes (600 seconds) + def __init__(self, timeout: int = 600): # Default timeout to 10 minutes (600 seconds) self.timeout = timeout - # Add more attributes here in the future + # Add more attributes here in the future \ No newline at end of file diff --git a/herolib/infra/tmuxrunner/process_monitor.py b/herolib/infra/tmuxrunner/process_monitor.py index c88d9e1..3bd8f11 100644 --- a/herolib/infra/tmuxrunner/process_monitor.py +++ b/herolib/infra/tmuxrunner/process_monitor.py @@ -1,25 +1,8 @@ -import os -import time -import re -import sys -import toml -import libtmux -from libtmux.pane import Pane -from libtmux.window import Window -from libtmux.session import Session import psutil -from typing import Dict, List, Optional, Any, Set, Tuple -from dataclasses import dataclass, field, asdict +from typing import List, Optional, Tuple from datetime import datetime -import uuid -from pathlib import Path -import asyncio -import uvicorn -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import threading - +from libtmux.pane import Pane +from .model import ProcessMetrics class ProcessMonitor: """Monitor processes running in tmux panes using psutil.""" @@ -103,4 +86,4 @@ class ProcessMonitor: except (psutil.NoSuchProcess, psutil.AccessDenied): pass - return False + return False \ No newline at end of file diff --git a/herolib/infra/tmuxrunner/task_runner.py b/herolib/infra/tmuxrunner/task_runner.py index dd606f9..fac0734 100644 --- a/herolib/infra/tmuxrunner/task_runner.py +++ b/herolib/infra/tmuxrunner/task_runner.py @@ -1,37 +1,33 @@ import os import time import re -import sys import toml import libtmux from libtmux.pane import Pane from libtmux.window import Window from libtmux.session import Session -import psutil -from typing import Dict, List, Optional, Any, Set, Tuple -from dataclasses import dataclass, field, asdict +from typing import Dict, List, Optional, Tuple +from dataclasses import asdict from datetime import datetime import uuid from pathlib import Path -import asyncio -import uvicorn -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel import threading +from .model import DAGStructure, DirectoryStatus, TaskStatus, MetaData +from .process_monitor import ProcessMonitor + # Configuration WAITING_MESSAGE = "WAITING FOR JOBS" -HPY_SH_PATH = "/root/heromonkey/functions/hpy.sh" # Path to hpy.sh +HPY_SH_PATH = "/root/heromonkey/functions/hpy.sh" # Path to hpy.sh class TaskRunner: def __init__(self, tasks_root_dir: str): self.tasks_root_dir = tasks_root_dir - self.run_name = os.path.basename(os.path.abspath(tasks_root_dir)) # Derive run_name + self.run_name = os.path.basename(os.path.abspath(tasks_root_dir)) # Derive run_name self.session = self._get_current_tmux_session() self.all_tasks_with_meta = self._get_sorted_tasks_with_meta(tasks_root_dir) - self.window_panes = {} # {window_idx: [pane1, pane2, ...]} + self.window_panes = {} # {window_idx: [pane1, pane2, ...]} self.run_id = str(uuid.uuid4()) self.dag = self._initialize_dag() self.dag_file_path = Path(tasks_root_dir) / ".dag.toml" @@ -230,48 +226,48 @@ class TaskRunner: return all_success - def run(self): - """Enhanced run method with DAG tracking.""" - print(f"Starting enhanced task orchestration for '{self.run_name}'") - print(f"Run ID: {self.run_id}") - print(f"DAG file: {self.dag_file_path}") + # def run(self): + # """Enhanced run method with DAG tracking.""" + # print(f"Starting enhanced task orchestration for '{self.run_name}'") + # print(f"Run ID: {self.run_id}") + # print(f"DAG file: {self.dag_file_path}") - self.dag.state = "RUNNING" - self._save_dag() + # self.dag.state = "RUNNING" + # self._save_dag() - # Initialize windows and panes (similar to original) - self._setup_windows_and_panes() + # # Initialize windows and panes (similar to original) + # self._setup_windows_and_panes() - # Process directories sequentially - overall_success = True - for dir_idx in range(len(self.dag.directories)): - directory = self.dag.directories[dir_idx] - print(f"\n--- Processing Directory {directory.directory_num} ---") + # # Process directories sequentially + # overall_success = True + # for dir_idx in range(len(self.dag.directories)): + # directory = self.dag.directories[dir_idx] + # print(f"\n--- Processing Directory {directory.directory_num} ---") - # Start tasks if not the first directory - if dir_idx > 0: - self._start_directory_tasks(dir_idx) + # # Start tasks if not the first directory + # if dir_idx > 0: + # self._start_directory_tasks(dir_idx) - # Monitor tasks - success = self._monitor_directory_tasks( - dir_idx, - directory.timeout - ) + # # Monitor tasks + # success = self._monitor_directory_tasks( + # dir_idx, + # directory.timeout + # ) - if not success: - overall_success = False + # if not success: + # overall_success = False - # Update final DAG state - self.dag.state = "COMPLETED" if overall_success else "FAILED" - self.dag.end_time = datetime.now().isoformat() - if self.dag.start_time: - start = datetime.fromisoformat(self.dag.start_time) - end = datetime.fromisoformat(self.dag.end_time) - self.dag.duration_seconds = (end - start).total_seconds() - self._save_dag() + # # Update final DAG state + # self.dag.state = "COMPLETED" if overall_success else "FAILED" + # self.dag.end_time = datetime.now().isoformat() + # if self.dag.start_time: + # start = datetime.fromisoformat(self.dag.start_time) + # end = datetime.fromisoformat(self.dag.end_time) + # self.dag.duration_seconds = (end - start).total_seconds() + # self._save_dag() - print(f"\nTask orchestration completed: {self.dag.state}") - print(f"Total duration: {self.dag.duration_seconds:.2f} seconds") + # print(f"\nTask orchestration completed: {self.dag.state}") + # print(f"Total duration: {self.dag.duration_seconds:.2f} seconds") def reset(self): """Kills all processes and panes inside task windows, removes windows, and deletes .done/.error/.ok files.""" @@ -321,7 +317,7 @@ class TaskRunner: scripts = sorted([os.path.join(dirpath, f) for f in filenames if f.endswith(".sh")]) metadata_file = os.path.join(dirpath, ".meta.toml") - metadata = MetaData() # Default metadata + metadata = MetaData() # Default metadata if os.path.exists(metadata_file): try: with open(metadata_file, 'r') as f: @@ -444,7 +440,7 @@ class TaskRunner: script_basename = os.path.basename(script_path) pane = panes_in_current_window[pane_idx] - pane.send_keys("C-c", literal=True) # Clear any previous command/output + pane.send_keys("C-c", literal=True) # Clear any previous command/output # Send cd command first, then the hpy command pane.send_keys(f"cd {script_dir}", enter=True) @@ -452,8 +448,14 @@ class TaskRunner: print(f" Pane {pane_idx}: Command sent: 'cd {script_dir}' and 'source {HPY_SH_PATH} && hpy {script_basename}'") def run(self): - print(f"Starting task orchestration for run '{self.run_name}' from {self.tasks_root_dir}") - + """Enhanced run method with DAG tracking.""" + print(f"Starting enhanced task orchestration for '{self.run_name}'") + print(f"Run ID: {self.run_id}") + print(f"DAG file: {self.dag_file_path}") + + self.dag.state = "RUNNING" + self._save_dag() + if not self.all_tasks_with_meta: print("No tasks found to execute.") return @@ -464,81 +466,40 @@ class TaskRunner: for script in scripts: print(f" - {script}") - all_dir_nums = sorted(self.all_tasks_with_meta.keys()) - - # Phase 1: Initial setup - create all windows and panes, - # execute first window's tasks, set others to WAITING. + # Initialize windows and panes self._setup_windows_and_panes() - - # Phase 2: Sequential execution and monitoring - print("\n--- Starting Sequential Task Execution ---") - for window_idx, dir_num in enumerate(all_dir_nums): - scripts, metadata = self.all_tasks_with_meta[dir_num] - window_name = f"{self.run_name}_{dir_num}" - panes_in_current_window = self.window_panes[window_idx] - - if window_idx > 0: - self._start_directory_tasks(window_idx) + + # Process directories sequentially + overall_success = True + for dir_idx in range(len(self.dag.directories)): + directory = self.dag.directories[dir_idx] + print(f"\n--- Processing Directory {directory.directory_num} ---") - print(f" Monitoring tasks in window '{window_name}' (Timeout: {metadata.timeout}s)...") - start_time = time.time() - script_states = {os.path.basename(s): "INITIAL" for s in scripts} # Track state for each script + # Start tasks if not the first directory + if dir_idx > 0: + self._start_directory_tasks(dir_idx) - while True: - all_tasks_finished_or_failed = True - current_status_report = [] - - for pane_idx, script_path in enumerate(scripts): - script_basename = os.path.basename(script_path) - pane = panes_in_current_window[pane_idx] - done_file = f"{script_path}.done" - error_file = f"{script_path}.error" - ok_file = f"{script_path}.ok" # Added .ok file check - - current_script_state = script_states[script_basename] - new_script_state = current_script_state - - if os.path.exists(done_file) or os.path.exists(ok_file): # Check for .ok file - new_script_state = "DONE" - elif os.path.exists(error_file): - new_script_state = "ERROR" - elif self._is_pane_running(pane): - new_script_state = "RUNNING" - all_tasks_finished_or_failed = False - elif current_script_state == "RUNNING": - # If it was running but now isn't, and no done/error file, it crashed - new_script_state = "CRASHED" - - if new_script_state != current_script_state: - script_states[script_basename] = new_script_state - current_status_report.append(f" - {script_basename}: {new_script_state}") - - if current_status_report: - print(f" Status update for window '{window_name}' (Elapsed: {int(time.time() - start_time)}s):") - for report_line in current_status_report: - print(report_line) - - if all_tasks_finished_or_failed: - print(f" All tasks in window '{window_name}' completed or failed.") - break - - elapsed_time = time.time() - start_time - if elapsed_time > metadata.timeout: - print(f" ERROR: Tasks in window '{window_name}' timed out after {metadata.timeout}s.") - for pane_idx, script_path in enumerate(scripts): - script_basename = os.path.basename(script_path) - pane = panes_in_current_window[pane_idx] - if script_states[script_basename] == "RUNNING": - script_states[script_basename] = "TIMED_OUT" - print(f" - {script_basename}: TIMED_OUT (Killing pane)") - pane.send_keys("C-c", literal=True) - time.sleep(1) - break - - time.sleep(5) - - print("\nAll task directories processed. You can attach to the tmux session to review:") - print(f"tmux attach -t {self.session.name}") + # Monitor tasks + success = self._monitor_directory_tasks( + dir_idx, + directory.timeout + ) + + if not success: + overall_success = False + + # Update final DAG state + self.dag.state = "COMPLETED" if overall_success else "FAILED" + self.dag.end_time = datetime.now().isoformat() + if self.dag.start_time: + start = datetime.fromisoformat(self.dag.start_time) + end = datetime.fromisoformat(self.dag.end_time) + self.dag.duration_seconds = (end - start).total_seconds() + self._save_dag() + + print(f"\nTask orchestration completed: {self.dag.state}") + print(f"Total duration: {self.dag.duration_seconds:.2f} seconds") + print(f"You can attach to the tmux session to review: tmux attach -t {self.session.name}") def cleanup(self): """Removes all tmux windows created by this run.""" @@ -570,25 +531,29 @@ class TaskRunner: print("Cleanup complete.") def reset(self): - """Kills all processes and panes inside task windows, removes windows, and deletes .done/.error files.""" + """Kills all processes and panes inside task windows, removes windows, and deletes .done/.error/.ok files.""" print(f"\n--- Resetting run '{self.run_name}' ---") - self.cleanup() # First, kill all associated tmux windows + self.cleanup() # First, kill all associated tmux windows - # Then, remove all .done and .error files - print(" Removing .done and .error files...") + # Then, remove all .done, .error, and .ok files + print(" Removing .done, .error, and .ok files...") for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items(): for script_path in scripts: done_file = f"{script_path}.done" error_file = f"{script_path}.error" - ok_file = f"{script_path}.ok" # Added .ok file to be removed + ok_file = f"{script_path}.ok" if os.path.exists(done_file): os.remove(done_file) print(f" Removed: {done_file}") if os.path.exists(error_file): os.remove(error_file) print(f" Removed: {error_file}") - if os.path.exists(ok_file): # Remove .ok file + if os.path.exists(ok_file): os.remove(ok_file) print(f" Removed: {ok_file}") - print("Reset complete.") - + + # Also remove the .dag.toml file if it exists + if hasattr(self, 'dag_file_path') and self.dag_file_path.exists(): + os.remove(self.dag_file_path) + print(f" Removed: {self.dag_file_path}") + print("Reset complete.") \ No newline at end of file diff --git a/herolib/infra/tmuxrunner/task_runner_api.py b/herolib/infra/tmuxrunner/task_runner_api.py index abb2fd2..f699610 100644 --- a/herolib/infra/tmuxrunner/task_runner_api.py +++ b/herolib/infra/tmuxrunner/task_runner_api.py @@ -1,31 +1,19 @@ -import os +import threading import time -import re -import sys -import toml -import libtmux -from libtmux.pane import Pane -from libtmux.window import Window -from libtmux.session import Session -import psutil -from typing import Dict, List, Optional, Any, Set, Tuple -from dataclasses import dataclass, field, asdict +from typing import Dict, List, Optional +from dataclasses import asdict from datetime import datetime -import uuid -from pathlib import Path -import asyncio import uvicorn from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -import threading +from .task_runner import TaskRunner class TaskRunnerAPI: """FastAPI interface for the task runner.""" - def __init__(self, runner: EnhancedTaskRunner): + def __init__(self, runner: TaskRunner): self.runner = runner self.app = FastAPI(title="Task Runner API", version="1.0.0") @@ -132,13 +120,15 @@ class TaskRunnerAPI: """Start the FastAPI server.""" uvicorn.run(self.app, host=host, port=port) + class TaskOrchestrator: """Main orchestrator that runs tasks and API server.""" def __init__(self, tasks_dir: str, api_port: int = 8000): - self.runner = EnhancedTaskRunner(tasks_dir) + self.runner = TaskRunner(tasks_dir) self.api = TaskRunnerAPI(self.runner) self.api_thread = None + self.api_port = api_port def start_api_server(self, port: int = 8000): """Start API server in a separate thread.""" @@ -153,7 +143,7 @@ class TaskOrchestrator: def run(self): """Run the task orchestration.""" # Start API server - self.start_api_server() + self.start_api_server(self.api_port) # Reset and run tasks self.runner.reset() @@ -166,6 +156,7 @@ class TaskOrchestrator: self.runner._save_dag() print("\nExecution completed. API server still running.") + print(f"Access API at: http://localhost:{self.api_port}") print("Press Ctrl+C to stop the API server.") try: @@ -173,4 +164,4 @@ class TaskOrchestrator: while True: time.sleep(1) except KeyboardInterrupt: - print("\nShutting down...") + print("\nShutting down...") \ No newline at end of file