# File: herolib_python/herolib/infra/tmuxrunner/task_runner.py
# Date: 2025-08-22 13:18:50 +02:00
#
# Size: 559 lines, 23 KiB
# Language: Python

import os
import re
import shlex
import threading
import time
import uuid
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import libtmux
import toml
from libtmux.pane import Pane
from libtmux.session import Session
from libtmux.window import Window

from .model import DAGStructure, DirectoryStatus, TaskStatus, MetaData
from .process_monitor import ProcessMonitor
# Configuration
# Text echoed into the panes of directories that have not been started yet.
WAITING_MESSAGE = "WAITING FOR JOBS"
# Shell file that is `source`d in every pane right before `hpy <script>` is invoked.
HPY_SH_PATH = "/root/heromonkey/functions/hpy.sh" # Path to hpy.sh
class TaskRunner:
    """Run numbered directories of shell scripts inside tmux and track them in a DAG file.

    Scripts (``*.sh``) are discovered in the numeric sub-directories of
    ``tasks_root_dir``. Every directory gets its own tmux window with one pane
    per script. Directories are processed sequentially in numeric order, while
    the scripts of one directory run side by side in their panes. Progress is
    persisted to ``<tasks_root_dir>/.dag.toml``.

    Task completion is detected through marker files next to each script:
    ``<script>.done`` / ``<script>.ok`` mean success and ``<script>.error``
    means failure (presumably written by ``hpy`` -- confirm against hpy.sh).
    """

    # Suffixes of the per-script marker files, in the order ``reset`` removes them.
    _MARKER_SUFFIXES = (".done", ".error", ".ok")
    # Task states that count as a failure of the owning directory.
    _FAILED_TASK_STATES = ("ERROR", "CRASHED", "TIMED_OUT")

    def __init__(self, tasks_root_dir: str):
        """Discover tasks below *tasks_root_dir* and write the initial DAG file.

        Raises:
            Exception: if the process is not running inside a tmux session.
        """
        self.tasks_root_dir = tasks_root_dir
        # The run name is the last path component of the tasks root.
        self.run_name = os.path.basename(os.path.abspath(tasks_root_dir))
        self.session = self._get_current_tmux_session()
        self.all_tasks_with_meta = self._get_sorted_tasks_with_meta(tasks_root_dir)
        self.window_panes = {}  # {window_idx: [pane1, pane2, ...]}
        self.run_id = str(uuid.uuid4())
        self.dag = self._initialize_dag()
        self.dag_file_path = Path(tasks_root_dir) / ".dag.toml"
        self.process_monitor = ProcessMonitor()
        self._save_dag()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _elapsed_seconds(start_iso: str, end_iso: str) -> float:
        """Return the number of seconds between two ISO-8601 timestamps."""
        started = datetime.fromisoformat(start_iso)
        ended = datetime.fromisoformat(end_iso)
        return (ended - started).total_seconds()

    @staticmethod
    def _build_task_commands(script_path: str,
                             hpy_sh_path: Optional[str] = None) -> Tuple[str, str]:
        """Return the ``(cd_command, run_command)`` pair that launches a script.

        Every interpolated value is shell-quoted so that directories or script
        names containing spaces or shell metacharacters cannot break (or
        inject into) the command line typed into the pane.

        Args:
            script_path: Full path of the script to run.
            hpy_sh_path: File to ``source`` first; defaults to ``HPY_SH_PATH``.
        """
        if hpy_sh_path is None:
            hpy_sh_path = HPY_SH_PATH
        script_dir = os.path.dirname(script_path)
        script_basename = os.path.basename(script_path)
        cd_command = f"cd {shlex.quote(script_dir)}"
        finished_note = shlex.quote(f"Script {script_basename} finished.")
        run_command = (
            f"source {shlex.quote(hpy_sh_path)} && hpy {shlex.quote(script_basename)}; "
            f"echo {finished_note}"
        )
        return cd_command, run_command

    @staticmethod
    def _interrupt_pane(pane: Pane) -> None:
        """Send Ctrl-C to *pane*."""
        # "C-c" has to go through tmux's key-name lookup. With literal=True
        # tmux types the three characters "C-c", and the default enter=True
        # would then submit them as a shell command instead of interrupting.
        pane.send_keys("C-c", enter=False, suppress_history=False)

    # ------------------------------------------------------------------
    # DAG bookkeeping
    # ------------------------------------------------------------------
    def _initialize_dag(self) -> DAGStructure:
        """Build the DAG structure with one entry per directory and per script."""
        dag = DAGStructure(
            run_name=self.run_name,
            run_id=self.run_id,
            state="INITIALIZING",
            start_time=datetime.now().isoformat(),
            total_directories=len(self.all_tasks_with_meta),
        )
        for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items():
            dir_status = DirectoryStatus(
                directory_num=dir_num,
                directory_path=os.path.dirname(scripts[0]) if scripts else "",
                timeout=metadata.timeout,
                window_name=f"{self.run_name}_{dir_num}",
            )
            for script_path in scripts:
                dir_status.tasks.append(
                    TaskStatus(
                        script_path=script_path,
                        script_name=os.path.basename(script_path),
                    )
                )
            dag.directories.append(dir_status)
        return dag

    def _save_dag(self):
        """Save the DAG structure to a TOML file.

        Persistence is best effort: a failure is printed and otherwise ignored
        so that a write problem never aborts a running orchestration.
        """
        try:
            dag_dict = asdict(self.dag)
            with open(self.dag_file_path, 'w') as f:
                toml.dump(dag_dict, f)
        except Exception as e:
            print(f"Error saving DAG: {e}")

    def _finalize_dag(self, success: bool) -> None:
        """Record the final run state, end time and duration, then save the DAG."""
        self.dag.state = "COMPLETED" if success else "FAILED"
        self.dag.end_time = datetime.now().isoformat()
        if self.dag.start_time:
            self.dag.duration_seconds = self._elapsed_seconds(
                self.dag.start_time, self.dag.end_time
            )
        self._save_dag()

    def _update_task_state(self, dir_idx: int, task_idx: int,
                           state: str, error_message: Optional[str] = None):
        """Update task state and save DAG.

        Entering ``RUNNING`` stamps the start time; entering a terminal state
        stamps the end time and, when a start time exists, the duration.
        """
        task = self.dag.directories[dir_idx].tasks[task_idx]
        previous_state = task.state
        task.state = state
        if state == "RUNNING" and previous_state != "RUNNING":
            task.start_time = datetime.now().isoformat()
        elif state in ("DONE", "ERROR", "CRASHED", "TIMED_OUT"):
            task.end_time = datetime.now().isoformat()
            if task.start_time:
                task.duration_seconds = self._elapsed_seconds(task.start_time, task.end_time)
        if error_message:
            task.error_message = error_message
        self.dag.last_updated = datetime.now().isoformat()
        self._save_dag()

    def _update_directory_state(self, dir_idx: int, state: str):
        """Update directory state, the run-level counters, and save DAG."""
        directory = self.dag.directories[dir_idx]
        previous_state = directory.state
        directory.state = state
        if state == "RUNNING" and previous_state != "RUNNING":
            directory.start_time = datetime.now().isoformat()
        elif state in ("DONE", "ERROR", "TIMED_OUT"):
            directory.end_time = datetime.now().isoformat()
            if directory.start_time:
                directory.duration_seconds = self._elapsed_seconds(
                    directory.start_time, directory.end_time
                )
            if state == "DONE":
                self.dag.completed_directories += 1
            else:
                self.dag.failed_directories += 1
        # Keep the timestamp in step with _update_task_state.
        self.dag.last_updated = datetime.now().isoformat()
        self._save_dag()

    # ------------------------------------------------------------------
    # Status checks and monitoring
    # ------------------------------------------------------------------
    def _check_task_status(self, script_path: str, pane: Pane) -> Tuple[str, Optional[str]]:
        """Determine the current state of one task.

        Marker files win over process inspection. A ``.done`` marker is
        normalised by also creating the ``.ok`` marker. When no marker exists
        and nothing is running in the pane, the task is treated as crashed and
        an ``.error`` marker is written.

        Returns:
            ``(state, error_message)`` where state is one of ``DONE``,
            ``ERROR``, ``RUNNING`` or ``CRASHED``.
        """
        script_basename = os.path.basename(script_path)
        done_file = f"{script_path}.done"
        error_file = f"{script_path}.error"
        ok_file = f"{script_path}.ok"

        if os.path.exists(done_file) or os.path.exists(ok_file):
            if not os.path.exists(ok_file):
                Path(ok_file).touch()
            return "DONE", None

        if os.path.exists(error_file):
            try:
                with open(error_file, 'r') as f:
                    error_msg = f.read().strip()
            except (OSError, UnicodeDecodeError):
                # The marker exists but cannot be read; still report the failure.
                error_msg = "Unknown error"
            return "ERROR", error_msg

        if self.process_monitor.is_process_running_command(pane, f"hpy {script_basename}"):
            return "RUNNING", None

        # Might be setting up or running something else.
        if self._is_pane_running(pane):
            return "RUNNING", None

        # Nothing is running and no marker was written: most likely a crash.
        error_msg = "Process terminated without completion marker"
        try:
            with open(error_file, 'w') as f:
                f.write(error_msg)
        except OSError as e:
            # The marker is informational; the CRASHED state is reported anyway.
            print(f"Warning: could not write {error_file}: {e}")
        return "CRASHED", error_msg

    def _monitor_directory_tasks(self, dir_idx: int, timeout: int) -> bool:
        """Poll the tasks of one directory until they finish or *timeout* expires.

        Every two seconds each task's state is refreshed and the DAG is saved.
        On timeout all tasks still ``RUNNING`` are marked ``TIMED_OUT`` and
        their panes receive Ctrl-C.

        Returns:
            True if all tasks completed successfully, False otherwise.
        """
        directory = self.dag.directories[dir_idx]
        scripts, _ = self.all_tasks_with_meta[directory.directory_num]
        panes = self.window_panes[dir_idx]
        self._update_directory_state(dir_idx, "RUNNING")
        # Monotonic clock: wall-clock adjustments must not distort the timeout.
        started = time.monotonic()
        all_success = True

        while True:
            all_finished = True
            has_errors = False
            for task_idx, (script_path, pane) in enumerate(zip(scripts, panes)):
                task = directory.tasks[task_idx]
                if task.state == "RUNNING":
                    task.process_metrics = self.process_monitor.get_process_metrics(pane)

                new_state, error_msg = self._check_task_status(script_path, pane)
                if new_state != task.state:
                    self._update_task_state(dir_idx, task_idx, new_state, error_msg)
                    print(f" Task {task.script_name}: {task.state}")

                if new_state == "RUNNING":
                    all_finished = False
                elif new_state in self._FAILED_TASK_STATES:
                    has_errors = True
                    all_success = False

            # Persist refreshed process metrics even when no state changed.
            self._save_dag()

            if all_finished:
                self._update_directory_state(dir_idx, "ERROR" if has_errors else "DONE")
                break

            if time.monotonic() - started > timeout:
                print(f" Directory {directory.directory_num} timed out!")
                for task_idx, task in enumerate(directory.tasks):
                    if task.state == "RUNNING":
                        self._update_task_state(dir_idx, task_idx, "TIMED_OUT")
                        self._interrupt_pane(panes[task_idx])
                self._update_directory_state(dir_idx, "TIMED_OUT")
                all_success = False
                break

            time.sleep(2)  # Check every 2 seconds
        return all_success

    def _is_pane_running(self, pane: Pane) -> bool:
        """Check whether the process identified by the pane's ``pane_pid`` exists.

        NOTE(review): ``pane_pid`` is the pane's first process (normally the
        shell), so this likely stays True for as long as the pane exists --
        confirm that this is the intended meaning of "running".
        """
        try:
            pane_pid = pane.pane_pid
            if pane_pid is None:
                return False
            try:
                pid_int = int(pane_pid)
            except ValueError:
                return False
            if pid_int <= 0:
                return False
            try:
                os.kill(pid_int, 0)  # Signal 0 only probes for existence.
            except PermissionError:
                # The process exists but belongs to another user.
                return True
            except OSError:
                return False
            return True
        except Exception as e:
            print(f"Error checking pane status for {pane.window_name}:{pane.pane_index}: {e}")
            return False

    # ------------------------------------------------------------------
    # Task discovery
    # ------------------------------------------------------------------
    def _get_sorted_tasks_with_meta(self, tasks_root):
        """Collect scripts and metadata from the numeric directories of *tasks_root*.

        Only direct sub-directories whose name consists of digits are used.
        Scripts are the ``*.sh`` files of such a directory, sorted by path.
        ``.meta.toml`` may override the default ``timeout``; a file that cannot
        be parsed produces a warning and the defaults are kept.

        Returns:
            A dict ordered by directory number that maps
            ``dir_num -> (list_of_full_script_paths, MetaData_object)``.
            Directories without scripts are omitted.
        """
        tasks_with_meta = {}
        for dirpath, dirnames, filenames in os.walk(tasks_root):
            if dirpath == tasks_root:
                # Restrict the walk to numeric directories, in numeric order.
                dirnames[:] = sorted([d for d in dirnames if d.isdigit()], key=int)
                continue

            relative_path = os.path.relpath(dirpath, tasks_root)
            # Only top-level task directories matter, so never descend further.
            dirnames[:] = []
            if not relative_path.isdigit():
                continue

            dir_num = int(relative_path)
            scripts = sorted(os.path.join(dirpath, f) for f in filenames if f.endswith(".sh"))
            metadata = MetaData()  # Default metadata
            metadata_file = os.path.join(dirpath, ".meta.toml")
            if os.path.exists(metadata_file):
                try:
                    with open(metadata_file, 'r') as f:
                        meta_data_dict = toml.load(f)
                    if 'timeout' in meta_data_dict:
                        metadata.timeout = int(meta_data_dict['timeout'])
                except Exception as e:
                    print(f"Warning: Could not read or parse .meta.toml for directory {dir_num}: {e}")
            if scripts:
                tasks_with_meta[dir_num] = (scripts, metadata)
        return dict(sorted(tasks_with_meta.items()))

    # ------------------------------------------------------------------
    # tmux plumbing
    # ------------------------------------------------------------------
    def _get_current_tmux_session(self) -> Session:
        """Return the tmux session this process runs in, based on ``$TMUX``.

        Raises:
            Exception: if ``$TMUX`` is unset, cannot be parsed, or names a
                session that the tmux server does not report.
        """
        tmux_env = os.environ.get('TMUX')
        if not tmux_env:
            raise Exception("Not running inside a tmux session. The 'TMUX' environment variable is not set.")

        # TMUX variable format: <socket path>,<server pid>,<session id>
        # e.g. /tmp/tmux-1000/default,12345,0
        parts = tmux_env.rsplit(",", 2)
        socket_path = parts[0] if len(parts) == 3 and parts[0] else None
        # Talk to the server that owns this session, which need not be the
        # default socket (tmux -L / -S).
        server = libtmux.Server(socket_path=socket_path) if socket_path else libtmux.Server()

        try:
            match = re.search(r',(\d+)$', tmux_env)
            if not match:
                raise Exception(f"Could not parse session index from TMUX environment variable: {tmux_env}")
            session_index_from_env = match.group(1)
            wanted_id = f"${session_index_from_env}"
            found_session = next(
                (s for s in server.sessions if s.session_id == wanted_id), None
            )
            if not found_session:
                raise Exception(f"Could not find tmux session with ID: ${session_index_from_env}")
            print(f"Attached to current tmux session: '{found_session.name}' via TMUX env var.")
            return found_session
        except Exception as e:
            raise Exception(f"Error getting current tmux session: {e}") from e

    def _create_tmux_window(self, window_name: str) -> Window:
        """Create a new, unfocused tmux window named *window_name*."""
        window = self.session.new_window(attach=False, window_name=window_name)
        print(f" Tmux window '{window_name}' created.")
        return window

    def _create_tmux_pane(self, window: Window, pane_index: int, command: str) -> Pane:
        """Return a pane of *window* after sending *command* to it.

        Pane 0 reuses the window's initial pane (cleared first); every other
        index splits off a new pane.
        """
        if pane_index == 0:
            pane = window.active_pane
            pane.send_keys("clear", enter=True)
        else:
            pane = window.split(attach=False)
        pane.send_keys(command, enter=True)
        print(f" Pane {pane_index}: Command sent: '{command}'")
        return pane

    def _setup_windows_and_panes(self):
        """Create one window per directory and one pane per script.

        The scripts of the first directory are started immediately; the panes
        of all later directories only echo a waiting message.
        """
        all_dir_nums = sorted(self.all_tasks_with_meta.keys())
        print("\n--- Initial Tmux Setup: Creating windows and panes ---")
        for window_idx, dir_num in enumerate(all_dir_nums):
            scripts, _ = self.all_tasks_with_meta[dir_num]
            window_name = f"{self.run_name}_{dir_num}"
            window = self._create_tmux_window(window_name)
            self.window_panes[window_idx] = []
            for pane_idx, script_path in enumerate(scripts):
                script_dir = os.path.dirname(script_path)
                script_basename = os.path.basename(script_path)
                if window_idx == 0:
                    # Send cd command first, then the hpy command.
                    cd_command, run_command = self._build_task_commands(script_path)
                    pane = self._create_tmux_pane(window, pane_idx, cd_command)
                    pane.send_keys(run_command, enter=True)
                    print(f" Pane {pane_idx}: Command sent: 'cd {script_dir}' and 'source {HPY_SH_PATH} && hpy {script_basename}'")
                else:
                    waiting_note = shlex.quote(f"{WAITING_MESSAGE} for {script_basename}")
                    pane = self._create_tmux_pane(window, pane_idx, f"echo {waiting_note}")
                self.window_panes[window_idx].append(pane)
            if window_idx == 0:
                print(f" Window '{window_name}' (Directory {dir_num}) tasks started.")
            else:
                print(f" Window '{window_name}' (Directory {dir_num}) panes set to '{WAITING_MESSAGE}'.")

    def _start_directory_tasks(self, dir_idx: int):
        """Start the scripts of directory *dir_idx* in their already created panes."""
        directory = self.dag.directories[dir_idx]
        scripts, _ = self.all_tasks_with_meta[directory.directory_num]
        panes_in_current_window = self.window_panes[dir_idx]
        print(f"\n--- Activating tasks in window '{directory.window_name}' (Directory {directory.directory_num}) ---")
        for pane_idx, script_path in enumerate(scripts):
            script_dir = os.path.dirname(script_path)
            script_basename = os.path.basename(script_path)
            pane = panes_in_current_window[pane_idx]
            self._interrupt_pane(pane)  # Abort anything still typed or running.
            cd_command, run_command = self._build_task_commands(script_path)
            pane.send_keys(cd_command, enter=True)
            pane.send_keys(run_command, enter=True)
            print(f" Pane {pane_idx}: Command sent: 'cd {script_dir}' and 'source {HPY_SH_PATH} && hpy {script_basename}'")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def run(self):
        """Run all directories sequentially and record the outcome in the DAG.

        The first directory is started while the windows are being set up;
        each later directory is started once the previous one has been
        monitored to completion, failure or timeout. A failing directory does
        not stop the remaining directories from being processed.
        """
        print(f"Starting enhanced task orchestration for '{self.run_name}'")
        print(f"Run ID: {self.run_id}")
        print(f"DAG file: {self.dag_file_path}")
        self.dag.state = "RUNNING"
        self._save_dag()

        if not self.all_tasks_with_meta:
            print("No tasks found to execute.")
            # Do not leave the persisted DAG stuck in the RUNNING state.
            self._finalize_dag(True)
            return

        print("Detected tasks:")
        for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items():
            print(f" Directory {dir_num} (Timeout: {metadata.timeout}s):")
            for script in scripts:
                print(f" - {script}")

        self._setup_windows_and_panes()

        overall_success = True
        for dir_idx, directory in enumerate(self.dag.directories):
            print(f"\n--- Processing Directory {directory.directory_num} ---")
            # The first directory was already started during window setup.
            if dir_idx > 0:
                self._start_directory_tasks(dir_idx)
            if not self._monitor_directory_tasks(dir_idx, directory.timeout):
                overall_success = False

        self._finalize_dag(overall_success)
        print(f"\nTask orchestration completed: {self.dag.state}")
        print(f"Total duration: {self.dag.duration_seconds:.2f} seconds")
        print(f"You can attach to the tmux session to review: tmux attach -t {self.session.name}")

    def cleanup(self):
        """Kill every window of the current session whose name starts with ``<run_name>_``."""
        print(f"\n--- Cleaning up tmux windows for run '{self.run_name}' ---")
        print(f" Current session name: '{self.session.name}'")
        all_session_windows = [w.name for w in self.session.windows if w.name]
        print(f" All windows in current session: {all_session_windows}")

        expected_prefix = f"{self.run_name}_"
        print(f" Looking for windows starting with prefix: '{expected_prefix}'")
        windows_to_kill = [
            window for window in self.session.windows
            if window.name and window.name.startswith(expected_prefix)
        ]
        if not windows_to_kill:
            print(f" No windows found to kill with prefix '{expected_prefix}'.")
            print("Cleanup complete.")
            return

        print(f" Identified {len(windows_to_kill)} windows to kill: {[w.name for w in windows_to_kill]}")
        for window in windows_to_kill:
            # Capture the name first; the window is gone after kill().
            window_name = window.name
            try:
                window.kill()
                print(f" Killed window: '{window_name}'")
            except Exception as e:
                print(f" Error killing window '{window_name}': {e}")
        print("Cleanup complete.")

    def reset(self):
        """Kill the run's tmux windows and delete all marker files and the DAG file."""
        print(f"\n--- Resetting run '{self.run_name}' ---")
        self.cleanup()  # First, kill all associated tmux windows

        print(" Removing .done, .error, and .ok files...")
        for scripts, _ in self.all_tasks_with_meta.values():
            for script_path in scripts:
                for suffix in self._MARKER_SUFFIXES:
                    marker_file = f"{script_path}{suffix}"
                    if os.path.exists(marker_file):
                        os.remove(marker_file)
                        print(f" Removed: {marker_file}")

        # Also remove the .dag.toml file if it exists
        if hasattr(self, 'dag_file_path') and self.dag_file_path.exists():
            os.remove(self.dag_file_path)
            print(f" Removed: {self.dag_file_path}")
        print("Reset complete.")