Implement hero_logic: General-Purpose DAG Control Flow Engine #1

Open
opened 2026-04-12 09:24:50 +00:00 by timur · 2 comments
Owner

hero_logic — General-Purpose DAG Control Flow Engine

Overview

hero_logic is a new Hero RPC service that provides a general-purpose directed acyclic graph (DAG) execution engine for defining, storing, and running multi-step workflows. It replaces hardcoded control flows (like the hero_router service agent pipeline) with a configurable, observable, and reusable system.

Originating issue: hero_router#34 — Hero service agent improvements


Architecture

Core Concepts

| Concept | Description |
|---------|-------------|
| Workflow | A DAG definition: a set of nodes and directed edges. Reusable blueprint. |
| Node | A single step in a workflow with a type, configuration, inputs, and outputs. |
| Edge | A directed connection from one node to another, carrying data mappings. |
| Template | A stored workflow definition in the templates/ directory, loadable by name. |
| Play | A single execution instance of a workflow — the runtime record. |
| NodeRun | The execution record for a single node within a play, linked to a hero_proc job. |

System Interactions

hero_router (user-facing agent)          hero_logic_ui (admin dashboard)
       |                                        |
       v                                        v
   hero_logic (DAG engine, JSON-RPC over Unix socket)
       |
       +---> hero_proc (action/job execution for ALL nodes)
       |        +---> python3/uv (script execution)
       |        +---> rhai (script execution)
       |
       +---> hero_aibroker (AI model calls, invoked via hero_proc actions)

Key principle: ALL node executions go through hero_proc as actions/jobs, providing uniform logging, process management, timeouts, and observability. hero_logic creates hero_proc actions for each node type and tracks the resulting job IDs.


Data Models (OSchema)

These should be defined in .oschema files and processed by the hero_rpc generator.

Enums

# Node types determine execution behavior
NodeType = "ai_call" | "script_execution" | "conditional" | "transform" | "parallel_group" | "wait" | "human_input"

# Execution status for plays and node runs
ExecutionStatus = "pending" | "queued" | "running" | "success" | "failed" | "skipped" | "cancelled" | "timed_out"

# Edge condition types
EdgeCondition = "always" | "on_success" | "on_failure" | "conditional"

# Script languages supported by script_execution nodes
ScriptLanguage = "python" | "rhai" | "bash"

Core Types

# NodeConfig — type-specific configuration for a node
# This is a flexible key-value structure; interpretation depends on NodeType
NodeConfig = {
    node_type: NodeType
    # For ai_call nodes:
    model: str              # e.g. "gpt-4o-mini", "claude-sonnet-4-20250514"
    system_prompt: str      # system message template (supports {{variable}} interpolation)
    user_prompt: str        # user message template
    temperature: f64        # 0.0 - 2.0
    max_tokens: u32
    # For script_execution nodes:
    language: ScriptLanguage
    script_template: str    # script code template (supports {{variable}} interpolation)
    timeout_secs: u32       # execution timeout
    # For conditional nodes:
    condition_expr: str     # expression evaluated against input data
    # For transform nodes:
    transform_expr: str     # data transformation expression
    # General:
    retry_count: u32        # max retries on failure (0 = no retry)
    retry_delay_secs: u32   # delay between retries
}
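The {{variable}} interpolation used by the prompt and script templates above can be sketched with a simple substitution pass. This is illustrative only; `render_template` is a hypothetical helper, not part of the schema:

```python
import re

def render_template(template: str, variables: dict) -> str:
    """Replace {{name}} placeholders with values from `variables`.
    Unknown placeholders are left intact so missing inputs stay visible."""
    def substitute(match: re.Match) -> str:
        key = match.group(1)
        return str(variables[key]) if key in variables else match.group(0)
    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", substitute, template)

prompt = render_template(
    "Summarize {{service}} for {{user}}.",
    {"service": "hero_book", "user": "timur"},
)
```

Leaving unknown placeholders untouched (rather than substituting an empty string) makes a missing workflow input obvious in the rendered prompt or script.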

# DataMapping — maps output fields from source node to input fields of target node
DataMapping = {
    source_field: str       # field path in source node's output
    target_field: str       # field path in target node's input
}

# Edge — directed connection between nodes
Edge = {
    from_node: str          # source node ID (within workflow)
    to_node: str            # target node ID (within workflow)
    condition: EdgeCondition
    condition_expr: str     # expression when condition == "conditional"
    data_mappings: [DataMapping]  # how data flows from source to target
}
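Taken together, an edge's condition gate and its data mappings decide both whether a downstream node activates and what input it receives. A minimal sketch, assuming dot-separated field paths (the path syntax is not fixed by the spec):

```python
def edge_fires(condition: str, source_status: str,
               condition_result: bool = False) -> bool:
    """Decide whether a downstream node activates, per EdgeCondition."""
    if condition == "always":
        return True  # fires once the source node completed, whatever the outcome
    if condition == "on_success":
        return source_status == "success"
    if condition == "on_failure":
        return source_status == "failed"
    if condition == "conditional":
        return condition_result  # result of evaluating condition_expr
    return False

def apply_mappings(output: dict, mappings: list[dict]) -> dict:
    """Copy mapped fields from a source node's output into a target node's
    input. Paths are assumed dot-separated, e.g. "result.code"."""
    def get(data, path):
        for part in path.split("."):
            data = data[part]
        return data
    target: dict = {}
    for m in mappings:
        parts = m["target_field"].split(".")
        cursor = target
        for part in parts[:-1]:
            cursor = cursor.setdefault(part, {})
        cursor[parts[-1]] = get(output, m["source_field"])
    return target
```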

# Node — a single step in a workflow
Node = {
    node_id: str            # unique within the workflow (e.g. "select_services")
    name: str               # human-readable name
    description: str
    config: NodeConfig
    input_schema: str       # JSON schema for expected inputs (optional, for validation)
    output_schema: str      # JSON schema for expected outputs (optional)
}

# Workflow [rootobject]
# A reusable DAG definition
Workflow = {
    sid: str
    name: str @index
    description: str @index
    version: str
    nodes: [Node]
    edges: [Edge]
    input_schema: str       # JSON schema for workflow-level inputs
    output_schema: str      # JSON schema for workflow-level outputs
    tags: [str] @index
    created_at: otime
    updated_at: otime
}

# NodeRun — execution record for a single node within a play
NodeRun = {
    node_id: str            # references Node.node_id in the workflow
    status: ExecutionStatus
    hero_proc_action_id: str  # hero_proc action SID
    hero_proc_job_id: str     # hero_proc job SID for this execution
    input_data: str         # JSON: actual input data for this execution
    output_data: str        # JSON: actual output data
    error_message: str
    attempt: u32            # which attempt (0-based, increments on retry)
    started_at: otime
    completed_at: otime
    duration_ms: u64
    # AI-specific metrics (populated for ai_call nodes)
    tokens_prompt: u32
    tokens_completion: u32
    model_used: str
}

# Play [rootobject]
# A single execution instance of a workflow
Play = {
    sid: str
    workflow_sid: str       # references the Workflow this play executes
    name: str @index
    status: ExecutionStatus
    input_data: str         # JSON: workflow-level input data
    output_data: str        # JSON: workflow-level output data
    node_runs: [NodeRun]
    error_message: str
    started_at: otime
    completed_at: otime
    duration_ms: u64
    total_tokens_prompt: u32
    total_tokens_completion: u32
    tags: [str] @index
    created_at: otime
}

RPC Service API

service LogicService {
    version: "1.0.0"

    # --- Workflow Management ---
    # Standard CRUD (auto-generated): workflow.new, workflow.get, workflow.set, workflow.delete, workflow.list, workflow.find

    # Load a workflow from a template by name
    workflow_from_template(template_name: str, overrides: str) -> Workflow

    # Validate a workflow DAG (check for cycles, missing edges, schema mismatches)
    workflow_validate(workflow_sid: str) -> str

    # List available templates
    template_list() -> [str]

    # --- Play Execution ---
    # Standard CRUD (auto-generated): play.new, play.get, play.set, play.delete, play.list, play.find

    # Start executing a workflow (creates a Play and begins DAG traversal)
    play_start(workflow_sid: str, input_data: str, name: str) -> Play

    # Cancel a running play
    play_cancel(play_sid: str) -> Play

    # Retry a failed play from the failed node
    play_retry(play_sid: str) -> Play

    # Get detailed status of a play including all node runs
    play_status(play_sid: str) -> Play

    # --- Node Operations ---

    # Get logs for a specific node run (fetches from hero_proc)
    node_logs(play_sid: str, node_id: str) -> str

    # Retry a specific failed node within a play
    node_retry(play_sid: str, node_id: str) -> NodeRun
}
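A client would invoke these methods as JSON-RPC 2.0 over the service's Unix socket. A minimal sketch of such a call, assuming newline-delimited framing (the generated SDK's actual transport framing may differ):

```python
import json
import socket

def build_request(method: str, params: dict, req_id: int = 1) -> bytes:
    """Frame one JSON-RPC 2.0 request as a newline-delimited payload
    (the framing is an assumption, not confirmed by the spec)."""
    payload = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
    return (json.dumps(payload) + "\n").encode()

def rpc_call(socket_path: str, method: str, params: dict) -> dict:
    """Send one request over the service's Unix socket and decode the reply."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(socket_path)
        sock.sendall(build_request(method, params))
        buf = b""
        while not buf.endswith(b"\n"):
            chunk = sock.recv(4096)
            if not chunk:
                break
            buf += chunk
    reply = json.loads(buf)
    if "error" in reply:
        raise RuntimeError(reply["error"])
    return reply["result"]
```

For example, starting a play might look like `rpc_call(path, "play_start", {"workflow_sid": sid, "input_data": "{}", "name": "demo"})`, with `path` resolved from `$HERO_SOCKET_DIR/hero_logic/rpc.sock`.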

Templates & Examples

Directory Structure

hero_logic/
├── templates/                    # Reusable workflow templates
│   ├── service_agent.toml        # The service agent workflow
│   ├── code_review.toml          # Example: AI code review pipeline
│   └── data_pipeline.toml        # Example: ETL-style data pipeline
├── examples/                     # Templates configured with real specs
│   ├── service_agent_hero_book.toml  # Service agent configured for hero_book
│   └── service_agent_multi.toml      # Multi-service agent example

Service Agent Template (the primary use case)

The service_agent template encodes the current hero_router agent pipeline as a DAG:

[service_selection] --on_success--> [code_generation] --on_success--> [script_execution]
                                                                          |
                                                                     on_failure
                                                                          |
                                                                          v
                                                                    [error_debug] --on_success--> [script_execution]

Nodes:

  1. service_selection (ai_call) — Given a prompt and service catalog, select relevant services
  2. code_generation (ai_call) — Given selected services' interfaces, generate Python code
  3. script_execution (script_execution) — Execute the generated Python script
  4. error_debug (ai_call) — On script failure, analyze error and generate fixed script
  5. result_summary (ai_call) — Summarize execution output for the user

The retry loop (steps 3→4→3) is modeled as a conditional retry edge with retry_count on the script_execution node.


Integration with hero_proc

All node executions are delegated to hero_proc:

  1. On workflow node execution, hero_logic:

    • Creates a hero_proc action for the node type if one doesn't exist (e.g., logic_ai_call, logic_script_python)
    • Creates a hero_proc job from that action with the node's input data
    • Monitors the job for completion
    • Reads job output/logs via hero_proc SDK
    • Records results in the NodeRun
  2. Action types by node:

    • ai_call → hero_proc action that calls hero_aibroker via Unix socket JSON-RPC
    • script_execution → hero_proc action that runs the script with the configured interpreter
    • conditional → hero_proc action that evaluates the condition expression
    • transform → hero_proc action that applies the data transformation
  3. Benefits:

    • Uniform log collection (hero_proc captures stdout/stderr)
    • Process timeouts and cleanup
    • Job history and rerunnability
    • Links from hero_logic UI → hero_proc UI for deep inspection
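The delegation loop described in step 1 can be sketched as follows. The `proc` client and its method names (`ensure_action`, `start_job`, `get_job`) are hypothetical stand-ins for the hero_proc SDK, whose real API may differ:

```python
import json
import time

def run_node(proc, node_run: dict, action_name: str, input_data: dict,
             poll_secs: float = 1.0) -> dict:
    """Delegate one node execution to hero_proc and record the outcome.

    `proc` stands in for a hero_proc SDK client; all of its method
    names here are illustrative placeholders.
    """
    action = proc.ensure_action(action_name)       # create action if missing
    job = proc.start_job(action["sid"], json.dumps(input_data))
    node_run["hero_proc_action_id"] = action["sid"]
    node_run["hero_proc_job_id"] = job["sid"]
    while job["status"] in ("queued", "running"):  # monitor to completion
        time.sleep(poll_secs)
        job = proc.get_job(job["sid"])
    node_run["status"] = "success" if job["status"] == "success" else "failed"
    node_run["output_data"] = job.get("output", "")
    return node_run
```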

Integration with hero_router

After hero_logic is implemented, hero_router's agent changes:

  1. Agent request arrives at hero_router
  2. hero_router calls hero_logic.workflow_from_template("service_agent", overrides) to instantiate the workflow with the appropriate model, service catalog, etc.
  3. hero_router calls hero_logic.play_start(workflow_sid, input_data) to execute
  4. hero_router polls hero_logic.play_status(play_sid) or subscribes to updates
  5. When complete, hero_router extracts the final output and returns AgentResponse
  6. AgentResponse includes a play_sid and link to hero_logic UI for detailed inspection

hero_router remains the user-friendly HTTP API; hero_logic is the execution engine.
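Steps 2 through 5 can be sketched as a polling loop. The `logic` object stands in for the generated hero_logic SDK client; only the method names defined in the RPC API above are assumed, and the timeout handling is an illustrative choice:

```python
import time

def run_agent_request(logic, input_data: str, overrides: str = "{}",
                      poll_secs: float = 1.0, timeout_secs: int = 300) -> dict:
    """Drive one agent request through hero_logic and wait for the result."""
    workflow = logic.workflow_from_template("service_agent", overrides)
    play = logic.play_start(workflow["sid"], input_data, "agent-request")
    deadline = time.monotonic() + timeout_secs
    while play["status"] in ("pending", "queued", "running"):
        if time.monotonic() > deadline:
            play = logic.play_cancel(play["sid"])
            break
        time.sleep(poll_secs)
        play = logic.play_status(play["sid"])
    # The play_sid lets the AgentResponse link back to the hero_logic UI.
    return {"play_sid": play["sid"], "status": play["status"],
            "output": play.get("output_data", "")}
```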


Crate Structure

Following hero RPC service conventions:

hero_logic/
├── Cargo.toml                    # Workspace root
├── Makefile
├── templates/                    # Workflow template definitions
├── examples/                     # Configured template examples
├── crates/
│   ├── hero_logic/               # CLI client
│   ├── hero_logic_server/        # RPC server (JSON-RPC 2.0 over Unix socket)
│   │   ├── openrpc.json
│   │   └── src/
│   │       └── rpc/
│   │           ├── workflow.rs   # Workflow CRUD + template loading
│   │           ├── play.rs       # Play execution engine (DAG traversal)
│   │           └── node.rs       # Node execution (hero_proc delegation)
│   ├── hero_logic_sdk/           # Generated client SDK
│   ├── hero_logic_ui/            # Web admin dashboard
│   │   └── (Play visualization, node-level logs, workflow editor)
│   └── hero_logic_lib/           # Shared types and DAG utilities
├── schemas/
│   └── logic/
│       └── logic.oschema         # OSIS schema definitions
└── docs/

DAG Execution Engine

The play engine in play.rs implements:

  1. Topological sort of the workflow nodes based on edges
  2. Ready queue — nodes whose dependencies are all satisfied
  3. Parallel execution — nodes with no inter-dependency can run concurrently
  4. Data flow — output from completed nodes is mapped to inputs of downstream nodes via DataMapping
  5. Conditional edges — on_success, on_failure, and expression-based conditions determine which downstream nodes activate
  6. Retry logic — nodes with retry_count > 0 are re-queued on failure up to the configured limit
  7. Cancellation — propagates cancel to running hero_proc jobs
  8. Timeout — per-node timeouts enforced via hero_proc job timeouts
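Items 1 through 3 can be sketched with Kahn's algorithm, grouping nodes into "waves" whose members have all dependencies satisfied and could run in parallel. This is an illustration, not the actual play.rs implementation:

```python
from collections import deque

def execution_order(nodes: list[str],
                    edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kahn-style scheduling: each inner list is one wave of ready nodes."""
    indegree = {n: 0 for n in nodes}
    downstream = {n: [] for n in nodes}
    for src, dst in edges:
        indegree[dst] += 1
        downstream[src].append(dst)
    ready = deque(n for n in nodes if indegree[n] == 0)  # the ready queue
    waves = []
    while ready:
        wave = list(ready)
        ready.clear()
        waves.append(wave)
        for node in wave:                # "completing" a node unlocks successors
            for dep in downstream[node]:
                indegree[dep] -= 1
                if indegree[dep] == 0:
                    ready.append(dep)
    if sum(len(w) for w in waves) != len(nodes):
        raise ValueError("workflow contains a cycle")
    return waves
```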

UI Requirements (hero_logic_ui)

The admin dashboard should provide:

  1. Workflow list — browse, search, create workflows
  2. Workflow editor — visual DAG editor showing nodes and edges
  3. Play list — browse all plays with status filtering
  4. Play detail view — visual DAG with node status overlay (green=success, red=failed, blue=running, gray=pending)
  5. Node run detail — shows input/output data, logs (from hero_proc), timing, token usage for AI nodes, error messages
  6. Links to hero_proc — each node run links to the corresponding hero_proc job for deep log inspection
  7. Template browser — view and instantiate workflow templates

Implementation Phases

Phase 1: Foundation

  • Scaffold workspace using hero_rpc conventions
  • Define OSIS schemas (Workflow, Play, NodeRun, etc.)
  • Generate types, server, SDK from schemas
  • Implement Workflow CRUD and template loading from templates/ directory
  • Implement basic DAG validation (cycle detection, edge consistency)
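The basic DAG validation called for here (cycle detection plus edge consistency) can be sketched as follows; this is an illustration, not the planned Rust implementation:

```python
def validate_workflow(nodes: list[dict], edges: list[dict]) -> list[str]:
    """Return a list of validation errors: edges that reference unknown
    nodes, and cycles found by an iterative depth-first search."""
    errors = []
    ids = {n["node_id"] for n in nodes}
    adj = {i: [] for i in ids}
    for e in edges:
        for key in ("from_node", "to_node"):
            if e[key] not in ids:
                errors.append(f"edge references unknown node: {e[key]}")
        if e["from_node"] in ids and e["to_node"] in ids:
            adj[e["from_node"]].append(e["to_node"])
    color = {i: "white" for i in ids}  # white=unseen, gray=on stack, black=done
    def dfs(start):
        stack = [(start, iter(adj[start]))]
        color[start] = "gray"
        while stack:
            node, it = stack[-1]
            nxt = next(it, None)
            if nxt is None:
                color[node] = "black"
                stack.pop()
            elif color[nxt] == "gray":   # back edge -> cycle
                errors.append(f"cycle detected at node: {nxt}")
            elif color[nxt] == "white":
                color[nxt] = "gray"
                stack.append((nxt, iter(adj[nxt])))
    for i in ids:
        if color[i] == "white":
            dfs(i)
    return errors
```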

Phase 2: Execution Engine

  • Implement Play execution engine with topological sort
  • Implement hero_proc integration (action creation, job management)
  • Implement ai_call node executor (hero_aibroker via hero_proc)
  • Implement script_execution node executor (Python/Rhai via hero_proc)
  • Implement conditional and transform node executors
  • Implement data flow between nodes (DataMapping)
  • Implement retry logic and error handling

Phase 3: Service Agent Template

  • Create service_agent workflow template
  • Create example configurations
  • Integrate with hero_router (replace hardcoded agent pipeline)
  • Verify end-to-end: hero_router → hero_logic → hero_proc → hero_aibroker

Phase 4: UI & Observability

  • Implement hero_logic_ui admin dashboard
  • Play visualization with DAG rendering
  • Node-level log viewing (via hero_proc)
  • Token usage reporting for AI nodes
  • Links between hero_logic UI and hero_proc UI

Phase 5: Advanced Features

  • Parallel node execution within DAGs
  • human_input node type (pause workflow for user input)
  • wait node type (time-based delays)
  • Workflow versioning
  • Play comparison (diff two runs)

Configuration & Settings

Workflow-level and node-level settings enable the configurability requested in hero_router#34:

  • Model selection per node — each ai_call node specifies its model
  • Temperature, max_tokens — per-node AI parameters
  • Script language/timeout — per-node execution settings
  • Retry policies — per-node retry count and delay
  • Observability — every node run records duration, token usage, and links to hero_proc logs

Socket & Port Conventions

| Component | Binding | Path |
|-----------|---------|------|
| hero_logic_server | Unix socket | $HERO_SOCKET_DIR/hero_logic/rpc.sock |
| hero_logic_ui | Unix socket | $HERO_SOCKET_DIR/hero_logic/ui.sock |
| hero_logic_ui | TCP (dev) | Port TBD |

Dependencies

  • hero_rpc — RPC framework and OSIS persistence
  • hero_proc_sdk — hero_proc client for action/job management
  • hero_aibroker — AI model access (invoked indirectly via hero_proc)
  • herolib — shared utilities

Phase 1+2 Implementation Complete

The initial implementation has been pushed to the development branch:

Commit: 1e077e9 — feat: implement hero_logic DAG control flow engine

What's implemented:

  • OSchema data models — Workflow and Play as root objects with full CRUD; Node, Edge, NodeConfig, NodeRun, DataMapping as embedded types; enums for NodeType, ExecutionStatus, EdgeCondition, ScriptLanguage
  • OSIS persistence — Auto-generated CRUD for Workflow and Play, filesystem-based with SmartID
  • DAG execution engine — Topological sort, conditional edge evaluation (on_success, on_failure, always, conditional), data flow between nodes via DataMapping, per-node retry logic
  • Node executors — ai_call (hero_aibroker via hero_proc), script_execution (Python/Rhai/Bash via hero_proc), conditional (expression evaluation), transform (data extraction), wait (delay)
  • Template system — Load workflow templates from templates/ directory as JSON
  • Service agent template — templates/service_agent.json encoding the current hero_router agent pipeline as a 5-node DAG
  • Example — examples/service_agent_books.json showing the template configured for hero_books
  • RPC API — All 9 custom service methods: workflow_from_template, workflow_validate, template_list, play_start, play_cancel, play_retry, play_status, node_logs, node_retry
  • Server entry point — Standard hero RPC pattern with HeroLifecycle and OServer

Tests

  • 6 auto-generated CRUD tests passing
  • Compiles clean with cargo build

### Remaining phases

- [ ] Phase 3: Integrate with hero_router (replace hardcoded agent pipeline)
- [ ] Phase 4: hero_logic_ui admin dashboard
- [ ] Phase 5: Parallel execution, human_input nodes, workflow versioning

## Phase 3-5 Complete + Architectural Refactor

### Key change: hero_proc is now the one place execution lives

hero_logic no longer embeds AI / Python / Bash dispatch logic. The `NodeType` enum shrank to purely *orchestration* kinds: `action | conditional | transform | wait | human_input`. Every node that does real work points at a **hero_proc action by name** (`config.action_name` + `config.action_context`) and lets hero_proc handle the interpreter, env, timeout, logs, and job id.

To make that work, AI became a first-class hero_proc interpreter (`Interpreter::Ai`) that calls hero_aibroker via its Unix-socket RPC instead of spawning a child process. Model/system_prompt/temperature/max_tokens live on the action spec as an `ai_config` struct — the same way other interpreters use `script` and `env`. Benefit: no more duplicated AI plumbing; the hero_proc UI gets a model dropdown (populated from `models.list`) whenever you pick the `ai` interpreter.

### Cross-repo changes

**hero_proc**

- `hero_proc_lib`: added `Interpreter::Ai` + `AiConfig { model, system_prompt, temperature, max_tokens }` on `ActionSpec` (`src/db/actions/model.rs`).
- `hero_proc_server/src/supervisor/executor.rs`: new `run_job_ai` branch — POSTs `ai.chat` to `$HERO_SOCKET_DIR/hero_aibroker/rpc.sock` via raw HTTP-over-UDS, writes the model response to job logs, updates job status. Uses `Interpreter::is_in_process()` to route.
- `openrpc.json`: added `AiConfig` schema; extended `ActionSpec.interpreter` enum with `ai`; added `ai_config` field.
- `hero_proc_ui/static/js/dashboard.js`: added `ai` to the interpreter dropdown; conditionally reveals model/system_prompt/temperature/max_tokens fields; fetches models via a new `/api/aibroker/models` proxy route added in `routes.rs`.
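
The raw HTTP-over-UDS pattern used by `run_job_ai` can be sketched with the standard library alone. The `post_over_uds` helper is hypothetical, and the JSON-RPC params shape for `ai.chat` is assumed — only the socket path convention and method name come from the notes above:

```rust
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;

/// Illustrative helper (not the actual hero_proc code): send one HTTP/1.1 POST
/// over a Unix domain socket and return the raw response as a String.
fn post_over_uds(socket_path: &str, body: &str) -> std::io::Result<String> {
    let mut stream = UnixStream::connect(socket_path)?;
    // Minimal HTTP/1.1 request; the Host value is a placeholder since UDS has no host.
    let request = format!(
        "POST / HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
        body.len(),
        body
    );
    stream.write_all(request.as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?; // read until the server closes
    Ok(response)
}

fn main() {
    // JSON-RPC envelope for `ai.chat`; the params shape here is an assumption.
    let body = r#"{"jsonrpc":"2.0","id":1,"method":"ai.chat","params":{"model":"gpt-4o","messages":[]}}"#;
    let socket = std::env::var("HERO_SOCKET_DIR").unwrap_or_else(|_| "/tmp".into())
        + "/hero_aibroker/rpc.sock";
    match post_over_uds(&socket, body) {
        Ok(resp) => println!("{resp}"),
        Err(e) => eprintln!("aibroker not reachable: {e}"),
    }
}
```

Because the transport is a plain stream, no async runtime or HTTP client crate is strictly required for this call shape.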

**hero_logic**

- Schema (`schemas/logic/logic.oschema`): `NodeType` → `action | conditional | transform | wait | human_input`; added `ExecutionStatus::AwaitingInput`; added `play_resume(play_sid, node_id, input_data)` RPC method; simplified `NodeConfig` to `{ node_type, action_name, action_context, condition_expr, transform_expr, timeout_secs, retry_count, retry_delay_secs }`.
- `engine/node_executors.rs`: rewritten. `execute_action` calls `action.get` on hero_proc, interpolates `{{var}}` placeholders in the action script + `ai_config.system_prompt` with the node's input JSON, then `job.create` + waits for terminal phase. Orchestration nodes (conditional/transform/wait/human_input) stay in-engine.
- `engine/executor.rs`: rewritten as a ready-queue model. Groups nodes with satisfied dependencies per iteration; pauses with `status = AwaitingInput` on hitting a HumanInput node; resumption via `play_resume` re-enters `execute()` and picks up where it left off.
- `engine/template_loader.rs`: templates gained an `actions[]` array of hero_proc action specs (name, interpreter, script, timeout_ms, ai_config, env). `workflow_from_template` upserts them into hero_proc via `action.set` before persisting the Workflow record.
- `server/rpc.rs`: implemented `play_resume`; `play_cancel` also handles `AwaitingInput`; `workflow_from_template` provisions declared actions.
- `templates/service_agent.json` + `examples/service_agent_books.json`: rewritten to the action-based format (5 actions, 5 nodes, 4 edges).
- `hero_logic_ui`:
  - Workflow list: client-side search box (`workflows.html`).
  - Play detail (`play_detail.html` + `routes.rs`): Cytoscape.js DAG canvas with node status color overlay (success=green, failed=red, running=blue, pending=grey, awaiting_input=amber); `dag_json` payload built server-side from `workflow.get`.
  - hero_proc job IDs rendered as clickable links (`/ui/hero_proc/#jobs/:id`).
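
The `{{var}}` placeholder interpolation mentioned for `execute_action` can be sketched as a simple substitution pass. This is an illustrative sketch, not the actual hero_logic implementation, and the script body and variable names in `main` are assumed:

```rust
/// Illustrative `{{var}}` interpolation: replace each `{{key}}` placeholder in
/// the template with the matching value from the node's input map.
fn interpolate(template: &str, inputs: &[(&str, &str)]) -> String {
    let mut out = template.to_string();
    for (key, value) in inputs {
        // Build the literal "{{key}}" pattern and substitute its value.
        out = out.replace(&format!("{{{{{}}}}}", key), value);
    }
    out
}

fn main() {
    let script = "echo 'routing {{request}} to {{service}}'";
    let inputs = [("request", "list my books"), ("service", "hero_books")];
    println!("{}", interpolate(script, &inputs));
    // → echo 'routing list my books to hero_books'
}
```

Per the notes above, the same pass would apply to both the action script and `ai_config.system_prompt` before `job.create` is called.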

**hero_router** — no code changes required. Already delegates to `hero_logic.workflow_from_template("service_agent")` + `play_start(...)`; node_id lookups (`"script_execution"` etc.) are unchanged.

### Build status

- `cargo check --workspace` green on hero_proc and hero_logic.
- `cargo test -p hero_logic --no-run` green.
- hero_proc_app (Dioxus) still shows interpreter as a free-text input — users can type `ai` there today; a proper dropdown matching dashboard.js is left as a follow-up.

All five phases of the original spec are now addressed:

- [x] Phase 1: Foundation
- [x] Phase 2: Execution engine
- [x] Phase 3: hero_router integration
- [x] Phase 4: UI (workflow list search, play detail DAG viz, hero_proc job links; workflow editor stayed read-only — out of scope)
- [x] Phase 5: Parallel node execution (via ready-queue) + human_input + play_resume. Workflow versioning deferred.
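
The ready-queue model behind Phase 5's parallel execution can be sketched as repeated "waves" of nodes whose dependencies are all complete. This is an illustrative sketch, not the `engine/executor.rs` code:

```rust
use std::collections::{HashMap, HashSet};

/// Illustrative ready-queue scheduler: each iteration selects every node whose
/// dependencies are all done, so independent nodes run in the same "wave".
fn execution_waves(nodes: &[&str], edges: &[(&str, &str)]) -> Vec<Vec<String>> {
    let mut deps: HashMap<&str, HashSet<&str>> =
        nodes.iter().map(|n| (*n, HashSet::new())).collect();
    for (from, to) in edges {
        deps.get_mut(*to).unwrap().insert(*from);
    }
    let mut done: HashSet<&str> = HashSet::new();
    let mut waves = Vec::new();
    while done.len() < nodes.len() {
        // Ready = not yet done, and every dependency already done.
        let ready: Vec<&str> = nodes
            .iter()
            .copied()
            .filter(|n| !done.contains(n) && deps[n].iter().all(|d| done.contains(d)))
            .collect();
        if ready.is_empty() {
            break; // cycle or unreachable nodes; a real engine would fail the play
        }
        done.extend(ready.iter().copied());
        waves.push(ready.iter().map(|n| n.to_string()).collect());
    }
    waves
}

fn main() {
    // A diamond: b and c share no edge, so they land in the same wave.
    let nodes = ["a", "b", "c", "d"];
    let edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")];
    println!("{:?}", execution_waves(&nodes, &edges));
    // → [["a"], ["b", "c"], ["d"]]
}
```

A HumanInput node would simply stop the loop mid-wave with `AwaitingInput`; `play_resume` re-enters the same loop with the paused node marked done.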
**Reference:** lhumina_code/hero_logic#1