feat: ask_user primitive + replay-based step memoization for resumable flows #28

Closed
opened 2026-05-13 10:33:24 +00:00 by timur · 0 comments

Summary

Add a built-in ask_user primitive to the Python flow runtime that lets a @flow step pause execution and request input from the user, then exit cleanly with state persisted. When the user answers, the play re-runs from scratch — but every previously-completed step replays from its persisted output instead of re-executing, so no work is lost and side-effects don't double-fire. The Logic admin UI gains a bottom-bar island (below the flow tree) for live logs and input requests, like a terminal.

This makes ask_user the canonical way for an agent flow to handle ambiguity, missing context, or capability gaps in user prompts: instead of failing or fabricating, it asks. It also generalises the @flow step model: every step gets typed inputs/outputs, a deterministic step key, and persisted output that visualises on the trace.

Motivation

The service_agent today fails ~⅔ of seeded examples. After a close look at five representative failures (see "Background context" at the bottom of this issue), they fall into three clusters:

  • Schema/prompt mismatch — the user prompt asks for a capability the picked services don't model, e.g. "marketplace tokens with current_price + 24h volume" against hero_osis_network (which has Listings, SliceProducts, TokenMigrations — none of those).
  • Missing test data — RAG and Bot-driven examples assume a pre-seeded collection / bot that doesn't exist.
  • Schema requires N fields, prompt supplies 2: CodeProject requires 15 fields; the model invents 13 of them, gets one wrong, fails.

For all of these, the correct answer is to ask the user. "I see you want X but the service has Y, Z, W — pick one or cancel." Or: "Releases need default_branch — what should it be?". That's also what a real human assistant would do.

Implementing ask_user gets the seeded examples passing without rewriting their prompts; the agent demonstrates the feature instead of hiding the gap.

Design — Resume by replay, not by waiting

The user explicitly does NOT want timeouts or long-blocked subprocesses. Constraints:

  • A flow that calls ask_user(...) exits the Python subprocess cleanly with a special status. The Play is marked awaiting_input server-side; play state (every completed step's output, every span event) is fully persisted on disk.
  • When the user submits an answer via the UI, the server re-runs the play from the top as a fresh subprocess. The boot stub seeds two caches into the runtime:
    1. Step output cache: {step_key: output_value}, keyed by deterministic step path + arg hash. Every @flow-decorated function checks the cache first and returns the cached value instead of calling the body. Side-effects (RPC calls, model_call) replay as no-ops.
    2. Answer cache: {question_id: user_answer}. When ask_user(...) is reached and the cache has a matching answer, it returns immediately. If not (a NEW question), it persists the question and exits again.
  • A flow can ask many questions across many steps — the subprocess exits after each unanswered one and resumes after each answer, with the previous answers all already cached.

The model is essentially continuation-by-replay — same idea as Temporal / Cadence workflow replay, scoped to the @flow decorator boundary.
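To make the loop concrete, here is a toy, in-memory model of continuation-by-replay. It is a sketch under stated assumptions: Runtime, AwaitingInput, my_flow and run_until_done are hypothetical names (not hero_tracing API), an exception stands in for exit code 75, and the "harness" is a plain Python loop rather than the Rust side.

```python
import hashlib
import json


class AwaitingInput(Exception):
    """Raised when a question has no cached answer (stands in for exit code 75)."""

    def __init__(self, question_id: str, prompt: str):
        super().__init__(question_id)
        self.question_id = question_id
        self.prompt = prompt


class Runtime:
    """One subprocess run, primed with the step and answer caches from earlier runs."""

    def __init__(self, step_cache: dict, answer_cache: dict):
        self.step_cache = dict(step_cache)
        self.answer_cache = dict(answer_cache)
        self.persisted: dict = {}  # step outputs completed during this run
        self.executed: list = []   # names of steps whose bodies actually ran

    def step(self, name, fn, **kwargs):
        key = name + ":" + hashlib.sha1(json.dumps(kwargs, sort_keys=True).encode()).hexdigest()
        if key in self.step_cache:
            return self.step_cache[key]  # replay: the body is skipped
        self.executed.append(name)
        result = fn(**kwargs)
        self.persisted[key] = result
        return result

    def ask(self, question_id, prompt):
        if question_id in self.answer_cache:
            return self.answer_cache[question_id]
        raise AwaitingInput(question_id, prompt)


def my_flow(rt: Runtime) -> str:
    services = rt.step("select_services", lambda prompt: ["hero_osis_network"], prompt="tokens")
    picked = rt.ask("q0", f"{services[0]} has Listing, SliceProduct. Pick one:")
    return rt.step("generate", lambda kind: f"created {kind}", kind=picked)


def run_until_done(answers_from_user: dict) -> tuple:
    """Harness loop: rerun from the top after every answer, carrying caches forward."""
    step_outputs, answers, runs = {}, {}, []
    while True:
        rt = Runtime(step_outputs, answers)
        try:
            result = my_flow(rt)
        except AwaitingInput as q:
            step_outputs.update(rt.persisted)   # persist completed steps
            runs.append(rt.executed)
            answers[q.question_id] = answers_from_user[q.question_id]  # the user answers
            continue
        runs.append(rt.executed)
        return result, runs
```

Calling run_until_done({"q0": "Listing"}) finishes after two runs: the first executes only select_services before asking, and the second replays select_services from the cache and executes only generate.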

API — ask_user primitives

Exposed from hero_tracing alongside flow:

```python
from hero_tracing import flow, ask_user

# Free-text
name = ask_user.text("What's the project name?", default=None)

# Number (int or float — pick at the call site)
n = ask_user.number("How many items to seed?", default=10, min=1, max=100)

# Single choice
service = ask_user.choice(
    "hero_osis_network has these rootobjects matching 'marketplace':",
    options=["Listing", "SliceProduct", "TokenMigration", "None — cancel"],
)

# Multiple choice (returns list[str])
fields = ask_user.multi_choice(
    "Which Release fields should we surface?",
    options=["name", "tag_name", "author", "created_at", "is_draft"],
    min_selections=1,
)

# Yes/No
if not ask_user.confirm("Proceed with creating 6 records?", default=False):
    raise flow.Failed("user cancelled")
```

All five primitives:

  • Take a question: str (the user-facing prompt — always free text, the model writes this).
  • Return the typed answer immediately when one is in the answer cache.
  • Otherwise, persist a Question { id, kind, prompt, options?, default?, constraints? } record on the Play and raise _AwaitingInput; the boot stub catches it and exits the subprocess with EXIT_CODE_AWAITING_INPUT = 75 (matches EX_TEMPFAIL semantically: "operation could not be performed at this time, try again later").
  • The Rust harness sees that exit code and flips Play.status = "awaiting_input".

ask_user is NOT a @flow-decorated step

It's a primitive. Calling it does NOT open a span (it would be a 0-duration span anyway). Instead the question gets attached to the enclosing @flow's span as pending_question metadata, so the UI can render the input box right below the flow step that asked.
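One way to attach the question to the enclosing @flow span and keep its id stable across replays, sketched with a hypothetical in-memory SpanStack (the real runtime would report this over the span socket). Because a replay walks the same code path, the n-th question asked directly inside a given span path gets the same id on every run.

```python
from dataclasses import dataclass, field


@dataclass
class Span:
    name: str
    metadata: dict = field(default_factory=dict)
    question_seq: int = 0  # questions asked directly inside this span so far


class SpanStack:
    """Hypothetical stack of open @flow spans."""

    def __init__(self):
        self.stack: list = []

    def path(self) -> str:
        return "/".join(s.name for s in self.stack)

    def push(self, name: str) -> Span:
        span = Span(name)
        self.stack.append(span)
        return span

    def pop(self) -> Span:
        return self.stack.pop()

    def attach_question(self, kind: str, prompt: str) -> dict:
        enclosing = self.stack[-1]  # ask_user opens no span of its own
        qid = f"q_{self.path()}_{enclosing.question_seq}"
        enclosing.question_seq += 1
        question = {"id": qid, "kind": kind, "prompt": prompt,
                    "asked_at_step_path": self.path()}
        enclosing.metadata["pending_question"] = question  # UI renders it under this step
        return question
```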

Step memoization — the deterministic-replay layer

For replay-by-rerun to work, every @flow call must short-circuit when its output is already cached.

Step key

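```python
# hashlib.sha1 takes bytes, so encode the canonical JSON before hashing
step_key = "/".join(parent_span_path) + ":" + fn.__hero_flow__["name"] + ":" + hashlib.sha1(json.dumps(input_kwargs, sort_keys=True, default=str).encode()).hexdigest()
```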

Same (parent path, function name, input args) produces the same key. So the same call in two different parts of the flow correctly memoizes separately, and the same step under retry-loop iteration 1 vs iteration 2 produces different keys (because the parent attempt N span path differs).
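A runnable version of the step key, under two assumptions the issue does not state: positional and keyword arguments are bound to parameter names with inspect.signature before hashing (so f("x") and f(prompt="x") share a key and positional args are not silently ignored), and an optional, hypothetical workflow_version_sid prefix implements the stale-cache edge case listed under "Edge cases".

```python
import hashlib
import inspect
import json
from typing import Callable, Optional, Sequence


def compute_step_key(
    parent_span_path: Sequence[str],
    fn: Callable,
    args: tuple,
    kwargs: dict,
    workflow_version_sid: Optional[str] = None,
) -> str:
    """Deterministic key: parent span path + step name + hash of the bound inputs."""
    # Bind args to parameter names and apply defaults, so call style does not matter.
    bound = inspect.signature(fn).bind(*args, **kwargs)
    bound.apply_defaults()
    payload = json.dumps(dict(bound.arguments), sort_keys=True, default=str)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    # Prefer the @flow-declared name when present, else the Python function name.
    name = getattr(fn, "__hero_flow__", {}).get("name", fn.__name__)
    key = "/".join(parent_span_path) + ":" + name + ":" + digest
    # Optional version prefix: a changed flow never reuses stale outputs.
    if workflow_version_sid is not None:
        key = workflow_version_sid + "@" + key
    return key
```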

Cache lookup in the @flow wrapper

```python
def wrapper(*args, **kwargs):
    # Hash positional args too; otherwise f(1) and f(2) would share a key
    key = compute_step_key(fn, args, kwargs)
    if key in _STEP_CACHE:           # populated from persisted state at boot
        replay_value = _STEP_CACHE[key]
        emit_span("replayed", key, replay_value)   # marks the span as a replay
        return replay_value
    span = flow.start_step(...)
    try:
        result = fn(*args, **kwargs)
    except _AwaitingInput:
        # Bubble up: the boot stub handles persistence + exit
        raise
    except Exception:
        span.tag("status", "failed")
        raise
    span.tag("result", result)
    persist_step_output(key, result)   # written via span socket; server stores it
    return result
```

Output serialisation

The @flow decorator already accepts inputs={'name': str, ...}. Add outputs={...} so the persisted output is validated both when it is written and when it is replayed. Outputs that aren't JSON-serializable (objects, file handles) need explicit handling. Proposal: declare them in @flow(outputs=...) and raise at runtime if a step returns something that can't be coerced.
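A sketch of the proposed outputs={...} check, assuming the declaration mirrors inputs={...} as a name-to-type mapping and that a step returns a dict of named outputs. serialise_outputs, load_outputs and OutputError are hypothetical names.

```python
import json
from typing import Any


class OutputError(TypeError):
    """Raised when a step's return value does not match its outputs={...} declaration."""


def serialise_outputs(step_name: str, declared: dict, result: Any) -> str:
    """Validate a step result against its declared outputs and return the JSON to persist."""
    if not isinstance(result, dict):
        raise OutputError(f"{step_name}: expected a dict of outputs, got {type(result).__name__}")
    missing = sorted(declared.keys() - result.keys())
    if missing:
        raise OutputError(f"{step_name}: missing outputs {missing}")
    for name, typ in declared.items():
        if not isinstance(result[name], typ):
            raise OutputError(
                f"{step_name}: output {name!r} should be {typ.__name__}, "
                f"got {type(result[name]).__name__}"
            )
    try:
        return json.dumps(result, sort_keys=True)  # no default=: refuse silent str() coercion
    except TypeError as exc:
        raise OutputError(f"{step_name}: outputs are not JSON-serializable ({exc})") from exc


def load_outputs(step_name: str, declared: dict, blob: str) -> dict:
    """Replay path: decode the persisted JSON and re-run the same checks."""
    value = json.loads(blob)
    serialise_outputs(step_name, declared, value)
    return value
```

One consequence worth documenting: a JSON round-trip turns tuples into lists, so an output declared as tuple would pass on write but fail on replay.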

What if a step is non-deterministic? (model_call, OSIS .set)

Those are exactly the steps we WANT to memoize. The first time model_call(prompt=X) runs, it hits the LLM and spends tokens; on replay we don't want to spend them again. The first time script_execution(code=X) runs, it creates OSIS records; on replay we don't want to double-create. Memoization gives us idempotency across resumes for free, provided the step's output was persisted before the subprocess exited.

Server-side state changes (hero_logic)

New Play fields

```rust
pending_questions: Vec<Question>     // questions with no answer yet
received_answers: HashMap<String, String>  // question_id -> JSON-serialized answer
step_outputs: HashMap<String, String>      // step_key -> JSON-serialized output
```

(All on the existing Play rootobject — additive, no migration needed for new plays.)

New Question shape

```rust
struct Question {
    id: String,                      // stable id derived from span path + question seq
    kind: QuestionKind,              // Text | Number | Choice | MultiChoice | Confirm
    prompt: String,                  // user-facing text
    options: Option<Vec<String>>,    // for Choice / MultiChoice
    default: Option<Value>,          // type matches kind
    constraints: Option<Constraints>, // {min, max, min_selections, ...}
    asked_at_step_path: String,      // for UI placement
    asked_at: i64,
    answered_at: Option<i64>,
    answer: Option<Value>,           // populated by play.answer
}
```

New RPC methods

| Method | Params | Returns | Notes |
|---|---|---|---|
| play.answer | play_sid, question_id, answer: Value | { ok: true, resume_started: bool } | Validates answer matches question kind. On success, server kicks off a new run of the play subprocess with pending_questions minus the answered one, received_answers updated, and the existing step_outputs cache passed through env. |
| play.pending_questions | play_sid | list[Question] | Read open questions for UI rendering. |
| play.cancel | play_sid | { ok: true } | Marks play cancelled so subsequent UI doesn't keep re-asking. |
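The kind check that play.answer performs could look like the following. This is a Python sketch of server-side logic that would live in Rust, assuming questions arrive as dicts shaped like the Question struct; validate_answer and AnswerRejected are hypothetical, and mapping AnswerRejected to HTTP 422 is left to the RPC layer.

```python
from typing import Any


class AnswerRejected(ValueError):
    """Raised by answer validation; the RPC layer would map it to a 422."""


def validate_answer(question: dict, answer: Any) -> Any:
    """Check that an answer matches the question's kind and constraints."""
    if question.get("answered_at") is not None:
        raise AnswerRejected("already answered")  # first write wins
    kind = question["kind"]
    options = question.get("options") or []
    limits = question.get("constraints") or {}

    if kind == "Text":
        ok = isinstance(answer, str)
    elif kind == "Number":
        # bool is a subclass of int in Python, so exclude it explicitly
        ok = isinstance(answer, (int, float)) and not isinstance(answer, bool)
        if ok and limits.get("min") is not None and answer < limits["min"]:
            raise AnswerRejected(f"must be >= {limits['min']}")
        if ok and limits.get("max") is not None and answer > limits["max"]:
            raise AnswerRejected(f"must be <= {limits['max']}")
    elif kind == "Choice":
        ok = isinstance(answer, str) and answer in options
    elif kind == "MultiChoice":
        ok = (isinstance(answer, list)
              and all(a in options for a in answer)
              and len(set(answer)) == len(answer))
        if ok and len(answer) < limits.get("min_selections", 0):
            raise AnswerRejected(f"pick at least {limits['min_selections']}")
    elif kind == "Confirm":
        ok = isinstance(answer, bool)
    else:
        raise AnswerRejected(f"unknown question kind {kind!r}")

    if not ok:
        raise AnswerRejected(f"answer {answer!r} does not match kind {kind}")
    return answer
```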

Play state machine

```
pending → running → success
                  → failed
                  → awaiting_input → running (on play.answer) → ...
                                   → cancelled (on play.cancel)
```
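The diagram translates directly into a transition table. A hypothetical Python version of the guard (the real one would sit next to Play in hero_logic):

```python
# Allowed Play.status transitions, transcribed from the state machine diagram.
ALLOWED_TRANSITIONS = {
    "pending": {"running"},
    "running": {"success", "failed", "awaiting_input"},
    "awaiting_input": {"running", "cancelled"},  # running on play.answer, cancelled on play.cancel
    "success": set(),    # terminal
    "failed": set(),     # terminal
    "cancelled": set(),  # terminal: replay never re-runs
}


class IllegalTransition(ValueError):
    pass


def transition(current: str, new: str) -> str:
    """Return the new status, or raise if the diagram does not allow the move."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise IllegalTransition(f"{current} -> {new} is not allowed")
    return new


def is_terminal(status: str) -> bool:
    return status in ALLOWED_TRANSITIONS and not ALLOWED_TRANSITIONS[status]
```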

Subprocess lifecycle

  • The Python flow exits with code 75 = "awaiting input". The Rust harness scans stderr for the structured <<HERO_AWAITING_INPUT id="...">>...payload...<</HERO_AWAITING_INPUT>> marker the boot stub writes, parses out the new question(s), and writes them onto the Play. (Alternatively, use the existing span socket: emit a final awaiting_input event before exit.)
  • On play.answer, the harness spawns a new subprocess with env:
    • HERO_FLOW_INPUT — same as before (the original input)
    • HERO_REPLAY_STEP_OUTPUTS_FILE — path to a JSON file with {step_key: output}
    • HERO_REPLAY_ANSWERS_FILE — path to a JSON file with {question_id: answer}
  • The boot stub loads both files at startup and primes _STEP_CACHE and _ANSWER_CACHE.

UI — bottom-bar island

Below the existing flow-tree island in the play detail page (logic_admin), add a new island that takes ~30% of vertical space (resizable). Layout:

```
┌───────────────────────────────────────────────────┐
│  Flow tree (existing)                             │
│  - select_services    ok    399ms                 │
│  - compile_stubs      ok      5ms                 │
│  - attempt 1                                      │
│    - service_code_gen ok   2868ms                 │
│    - script_execution ok     49ms                 │
│  - ⏸ ask_user (Choice)        awaiting input      │
├───────────────────────────────────────────────────┤
│ [Logs] [Inputs needed (1)] [Events]   pin to bot ▼│
│                                                   │
│ ┓ pending_question id=q_attempt1.5_0              │
│ │ hero_osis_network has these rootobjects         │
│ │ matching 'marketplace':                         │
│ │   ◉ Listing                                     │
│ │   ○ SliceProduct                                │
│ │   ○ TokenMigration                              │
│ │   ○ None — cancel                               │
│ │ [ Submit ]                                      │
│ ┛                                                 │
│ 09:14:22  span:select_services  ok                │
│ 09:14:23  span:compile_stubs    ok                │
│ 09:14:24  log:catalog has 36 healthy services     │
└───────────────────────────────────────────────────┘
```
  • Logs tab — live span events as they arrive (uses the existing span socket).
  • Inputs needed tab: every question with answered_at == null, with the open-question count shown as a badge on the tab. Renders a form per Question.kind. On submit, POST play.answer. The page polls or listens for play status changes and updates the flow tree to show the resumed run.
  • Events tab — full event history (already in Play.spans).
  • The island is resizable + collapsible. State persists in localStorage so it stays where the user puts it across navigations.

Edge cases

| Case | Handling |
|---|---|
| User submits answer but it doesn't match the question's kind (e.g. text where number expected) | play.answer validates, returns 422 with reason; UI re-renders question. |
| Two users answer the same question simultaneously | First write wins; answered_at becomes immutable; subsequent answers rejected with "already answered". |
| Same Play resumed twice in parallel (race on play.answer) | Server holds a per-play lock; second resume blocks until first completes its first new exit. |
| Flow asks a question, user cancels (play.cancel) | Play status → cancelled. Replay never re-runs. |
| Flow asks 5 questions in a row (no other steps between) | Each call exits, server collects 5 question records over 5 resumptions. UI can group consecutive pending questions into a single form so the user fills them out together. |
| User edits a previously-answered question | Out of scope for v1. Could be added as play.amend_answer later; it would invalidate all step outputs that were computed AFTER that answer. |
| Step's input args contain non-deterministic values (e.g. int(time.time())) | Step key would change on every replay → cache miss → step re-executes → defeats memoization. Mitigation: discourage time-based inputs in @flow step args; document the pattern of pulling time INSIDE the step body, where it's not part of the cache key. |
| Step's output is non-JSON-serializable | Persistence fails at write time with a clear error; flow needs to declare a serializer in @flow(outputs=...). |
| Answer cache is stale because the flow source changed between exit and resume | Detect by hashing the workflow_version_sid into the step keys; mismatch invalidates the cache and the user gets a "flow updated, restart from scratch?" prompt. |
| Programmatic / batch runs (optimize_flow) need to skip ask_user | play_run_async accepts prefill_answers: dict[question_pattern, value] so non-interactive runs can pre-script every expected question. If a question isn't pre-filled, the play fails fast instead of awaiting. |
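For the batch case, the issue does not define what question_pattern matches. The sketch below assumes shell-style globs (fnmatch.fnmatchcase) tried against the question id and then the prompt, with the first matching pattern winning; resolve_prefill and UnansweredInBatch are hypothetical names.

```python
import fnmatch
from typing import Any


class UnansweredInBatch(RuntimeError):
    """A batch run reached a question that no prefill pattern covers."""


def resolve_prefill(question: dict, prefill_answers: dict) -> Any:
    """Return the first prefill value whose glob matches the question id or prompt.

    Batch runs would call this instead of exiting with 75, so an uncovered
    question fails the play immediately rather than leaving it awaiting input.
    """
    for pattern, value in prefill_answers.items():  # dicts keep insertion order
        if fnmatch.fnmatchcase(question["id"], pattern) or fnmatch.fnmatchcase(
            question["prompt"], pattern
        ):
            return value
    raise UnansweredInBatch(f"no prefill for {question['id']}: {question['prompt']!r}")
```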

Implementation phases

Phase 1 — server state + RPC

  • Add pending_questions, received_answers, step_outputs fields to Play schema.
  • Add play.answer, play.pending_questions, play.cancel RPC methods.
  • Add Play.status = "awaiting_input" state + transitions.
  • Subprocess wrapper: detect exit code 75, parse structured marker, persist questions, flip status.
  • play.answer writes the answer, spawns a new subprocess with env-var pointers to cache files.

Phase 2 — Python runtime

  • _STEP_CACHE + _ANSWER_CACHE populated from env-var-pointed files at boot.
  • @flow wrapper: compute step_key, check cache, short-circuit or persist+continue.
  • ask_user.text/number/choice/multi_choice/confirm primitives.
  • Boot stub: catch _AwaitingInput exception, write structured marker, exit 75.

Phase 3 — UI

  • Bottom-bar island in logic_admin's play-detail page.
  • Three tabs: Logs / Inputs needed / Events.
  • Per-QuestionKind form rendering.
  • POST handler that calls play.answer and refreshes play state.
  • Resizable + collapsible + localStorage state.

Phase 4 — agent integration

  • Update service_agent prompt: when select_services returns a service whose schema clearly doesn't match the user prompt, call ask_user.choice(...) to confirm before generating code. Same for missing required fields the model would otherwise have to invent — surface them as ask_user.text(...) calls.
  • Add a validate_capability flow step that uses an LLM to judge whether the picked services can satisfy the prompt and, if not, asks the user.

Phase 5 — visualization polish

  • Mark replayed steps visually in the flow tree (e.g. dashed border + "↻ replayed from cache" tooltip).
  • Show per-step inputs/outputs in a side panel when a step is selected (uses the new outputs={...} declaration).
  • "Resume timeline" view: shows when each subprocess run started/ended and which questions/answers triggered each resume.

Acceptance criteria

  • A @flow function can call ask_user.text("What's your name?"), the play exits with status awaiting_input, the UI renders an input box, the user submits "Alice", the play re-runs and the function gets "Alice" back.
  • A flow with three sequential steps where step 2 asks a question resumes by replaying step 1 from its cached output, re-entering step 2 (whose ask_user call now returns the cached answer), and executing step 3 onwards fresh.
  • A flow that does an OSIS *_set followed by an ask_user does NOT double-create the OSIS record on resume.
  • The seeded "Marketplace tokens" example, without prompt rewrite, asks the user to pick one of Listing / SliceProduct / TokenMigration / Cancel; once user picks, the agent proceeds and creates records of that type, succeeding on first attempt.
  • The Logic UI shows pending questions in a dedicated bottom-bar tab with form-rendered inputs per question kind.
  • play_run_async(prefill_answers={...}) lets scripts/run_all_examples.py answer ahead-of-time so the suite stays headless-runnable.

Background context (for the next AI picking this up)

This issue is being filed by Claude Opus 4.7 after several debugging sessions on the service_agent. Useful pointers:

  • The service_agent flow lives at crates/hero_logic/src/seed_flows/service_agent.py; sub-flows at service_code_gen.py, model_call.py, optimize_flow.py.
  • The @flow decorator + meta-path importer for sub-flows live at crates/hero_logic/sdk/python/hero_tracing.py.
  • The boot stub lives at crates/hero_logic/src/engine/python_executor.rs::BOOT_STUB.
  • Spans are emitted via UDS to the per-Play span socket — see crates/hero_logic/src/engine/span_socket.rs.
  • Existing Play state machine + RPC surface in crates/hero_logic/src/logic/....
  • The Logic admin UI is crates/hero_logic_admin/ (Axum + server-rendered HTML + a bit of JS in static/js/workflow_editor.js). Templates in templates/.
  • Recent related work: feat(11-D): delete DAG — Python-only schema cutover (#21), feat(11-C2): per-Play span socket listener (#18). Read those PRs for context on the current Play execution model.
  • The user (timur) prefers terse responses, root-cause fixes over workarounds, and "make every example pass" as the north star. He explicitly does NOT want timeouts or long-blocked subprocesses (this is why the design is exit-and-replay rather than wait-and-resume).

Out of scope (for this issue)

  • Editing previously-given answers (play.amend_answer)
  • Branching flows (different paths based on answer) beyond what plain Python if/elif already gives
  • Streaming / chat-style multi-turn within a single question
  • Backfilling step memoization for existing plays predating this feature

🤖 Generated with Claude Code

## Summary Add a built-in `ask_user` primitive to the Python flow runtime that lets a `@flow` step pause execution and request input from the user, then **exit cleanly with state persisted**. When the user answers, the play **re-runs from scratch** — but every previously-completed step replays from its persisted output instead of re-executing, so no work is lost and side-effects don't double-fire. The Logic admin UI gains a bottom-bar island (below the flow tree) for live logs and input requests, like a terminal. This makes `ask_user` the canonical way for an agent flow to handle ambiguity, missing context, or capability gaps in user prompts: instead of failing or fabricating, it asks. It also generalises the `@flow` step model: every step gets typed inputs/outputs, a deterministic step key, and persisted output that visualises on the trace. ## Motivation The service_agent today fails ~⅔ of seeded examples. After a deep look at five representative failures (see "Background context" at the bottom of this issue) the failures cluster as: - **Schema/prompt mismatch** — the user prompt asks for a capability the picked services don't model, e.g. "marketplace tokens with current_price + 24h volume" against `hero_osis_network` (which has `Listings`, `SliceProducts`, `TokenMigrations` — none of those). - **Missing test data** — RAG and Bot-driven examples assume a pre-seeded collection / bot that doesn't exist. - **Schema requires N fields, prompt supplies 2** — `CodeProject` requires 15 fields; the model invents 13 of them, gets one wrong, fails. For all of these, the *correct* answer is to **ask the user**. "I see you want X but the service has Y, Z, W — pick one or cancel." Or: "Releases need `default_branch` — what should it be?". That's also what a real human assistant would do. Implementing `ask_user` gets the seeded examples passing without rewriting their prompts; the agent demonstrates the feature instead of hiding the gap. 
## Design — Resume by replay, not by waiting The user explicitly does NOT want timeouts or long-blocked subprocesses. Constraints: - A flow that calls `ask_user(...)` **exits the Python subprocess cleanly** with a special status. The Play is marked `awaiting_input` server-side; play state (every completed step's output, every span event) is fully persisted on disk. - When the user submits an answer via the UI, the server **re-runs the play from the top** as a fresh subprocess. The boot stub seeds two caches into the runtime: 1. **Step output cache** — `{step_key: output_value}` keyed by deterministic step path + arg hash. Every `@flow`-decorated function checks the cache first and returns the cached value instead of calling the body. Side-effects (RPC calls, model_call) replay as no-ops. 2. **Answer cache** — `{question_id: user_answer}`. When `ask_user(...)` is reached and the cache has a matching answer, it returns immediately. If not (a NEW question), it persists the question and exits again. - A flow can ask many questions across many steps — the subprocess exits after each unanswered one and resumes after each answer, with the previous answers all already cached. The model is essentially **continuation-by-replay** — same idea as Temporal / Cadence workflow replay, scoped to the @flow decorator boundary. 
## API — `ask_user` primitives Exposed from `hero_tracing` alongside `flow`: ```python from hero_tracing import flow, ask_user # Free-text name = ask_user.text("What's the project name?", default=None) # Number (int or float — pick at the call site) n = ask_user.number("How many items to seed?", default=10, min=1, max=100) # Single choice service = ask_user.choice( "hero_osis_network has these rootobjects matching 'marketplace':", options=["Listing", "SliceProduct", "TokenMigration", "None — cancel"], ) # Multiple choice (returns list[str]) fields = ask_user.multi_choice( "Which Release fields should we surface?", options=["name", "tag_name", "author", "created_at", "is_draft"], min_selections=1, ) # Yes/No if not ask_user.confirm("Proceed with creating 6 records?", default=False): raise flow.Failed("user cancelled") ``` All five primitives: - Take a `question: str` (the user-facing prompt — **always free text, the model writes this**). - Persist a `Question { id, kind, prompt, options?, default?, constraints? }` record on the Play. - Return the typed answer when one is in the cache. - Exit the subprocess with `EXIT_CODE_AWAITING_INPUT = 75` (matches `EX_TEMPFAIL` semantically — "operation could not be performed at this time, try again later"). - The boot stub catches that exit code and the Rust side flips `Play.status = "awaiting_input"`. ### `ask_user` is NOT a `@flow`-decorated step It's a primitive. Calling it does NOT open a span (it would be a 0-duration span anyway). Instead the question gets attached to the **enclosing** `@flow`'s span as `pending_question` metadata, so the UI can render the input box right below the flow step that asked. ## Step memoization — the deterministic-replay layer For replay-by-rerun to work, every `@flow` call must short-circuit when its output is already cached. 
### Step key ```python step_key = "/".join(parent_span_path) + ":" + fn.__hero_flow__["name"] + ":" + sha1(json.dumps({**input_kwargs}, sort_keys=True, default=str)) ``` Same `(parent path, function name, input args)` produces the same key. So the same call in two different parts of the flow correctly memoizes separately, and the same step under retry-loop iteration 1 vs iteration 2 produces different keys (because the parent `attempt N` span path differs). ### Cache lookup in the `@flow` wrapper ```python def wrapper(*args, **kwargs): key = compute_step_key(fn, kwargs) if key in _STEP_CACHE: # populated from persisted state at boot replay_value = _STEP_CACHE[key] emit_span("replayed", key, replay_value) # marks the span as a replay return replay_value span = flow.start_step(...) try: result = fn(*args, **kwargs) except _AwaitingInput as e: # Bubble up — the boot stub handles persistence + exit raise except Exception: span.tag("status", "failed") raise span.tag("result", result) persist_step_output(key, result) # written via span socket; server stores it return result ``` ### Output serialisation `@flow` decorator already accepts `inputs={'name': str, ...}`. Add `outputs={...}` so the persisted output validates on the way in and on replay. Outputs that aren't JSON-serializable (objects, file handles) need explicit handling — proposal: declare in `@flow(outputs=...)`, raise at runtime if a step returns something not coercible. ### What if a step is non-deterministic? (model_call, OSIS .set) Those are exactly the steps we WANT to memoize. The first time `model_call(prompt=X)` runs, it hits the LLM and spends tokens; on replay we don't want to spend them again. The first time `script_execution(code=X)` runs, it creates OSIS records; on replay we don't want to double-create. Memoization gives us idempotency for free. 
## Server-side state changes (hero_logic) ### New `Play` fields ```rust pending_questions: Vec<Question> // questions with no answer yet received_answers: HashMap<String, String> // question_id -> JSON-serialized answer step_outputs: HashMap<String, String> // step_key -> JSON-serialized output ``` (All on the existing `Play` rootobject — additive, no migration needed for new plays.) ### New `Question` shape ```rust struct Question { id: String, // stable id derived from span path + question seq kind: QuestionKind, // Text | Number | Choice | MultiChoice | Confirm prompt: String, // user-facing text options: Option<Vec<String>>, // for Choice / MultiChoice default: Option<Value>, // type matches kind constraints: Option<Constraints>, // {min, max, min_selections, ...} asked_at_step_path: String, // for UI placement asked_at: i64, answered_at: Option<i64>, answer: Option<Value>, // populated by play.answer } ``` ### New RPC methods | Method | Params | Returns | Notes | |---|---|---|---| | `play.answer` | `play_sid, question_id, answer: Value` | `{ ok: true, resume_started: bool }` | Validates answer matches question kind. On success, server kicks off a new run of the play subprocess with `pending_questions` minus the answered one, `received_answers` updated, and the existing `step_outputs` cache passed through env. | | `play.pending_questions` | `play_sid` | `list[Question]` | Read open questions for UI rendering. | | `play.cancel` | `play_sid` | `{ ok: true }` | Marks play `cancelled` so subsequent UI doesn't keep re-asking. | ### Play state machine ``` pending → running → success → failed → awaiting_input → running (on play.answer) → ... → cancelled (on play.cancel) ``` ### Subprocess lifecycle - The Python flow exits with code 75 = "awaiting input". The Rust harness reads stderr for the structured `<<HERO_AWAITING_INPUT id="...">>...payload...<</HERO_AWAITING_INPUT>>` marker the boot stub writes, parses out the new question(s), and writes them onto the Play. 
(Use the existing span socket — emit a final `awaiting_input` event before exit.) - On `play.answer`, the harness spawns a new subprocess with env: - `HERO_FLOW_INPUT` — same as before (the original input) - `HERO_REPLAY_STEP_OUTPUTS_FILE` — path to a JSON file with `{step_key: output}` - `HERO_REPLAY_ANSWERS_FILE` — path to a JSON file with `{question_id: answer}` - The boot stub loads both files at startup and primes `_STEP_CACHE` and `_ANSWER_CACHE`. ## UI — bottom-bar island Below the existing flow-tree island in the play detail page (logic_admin), add a new island that takes ~30% of vertical space (resizable). Layout: ``` ┌───────────────────────────────────────────────────┐ │ Flow tree (existing) │ │ - select_services ok 399ms │ │ - compile_stubs ok 5ms │ │ - attempt 1 │ │ - service_code_gen ok 2868ms │ │ - script_execution ok 49ms │ │ - ⏸ ask_user (Choice) awaiting input │ ├───────────────────────────────────────────────────┤ │ [Logs] [Inputs needed (1)] [Events] pin to bot ▼│ │ │ │ ┓ pending_question id=q_attempt1.5_0 │ │ │ hero_osis_network has these rootobjects │ │ │ matching 'marketplace': │ │ │ ◉ Listing │ │ │ ○ SliceProduct │ │ │ ○ TokenMigration │ │ │ ○ None — cancel │ │ │ [ Submit ] │ │ ┛ │ │ 09:14:22 span:select_services ok │ │ 09:14:23 span:compile_stubs ok │ │ 09:14:24 log:catalog has 36 healthy services │ └───────────────────────────────────────────────────┘ ``` - **Logs tab** — live span events as they arrive (uses the existing span socket). - **Inputs needed tab** — every question with `answered_at == null`. Question count badged on tab. Renders form per `Question.kind`. On submit, POST `play.answer`. The page polls or listens for play status change and updates the flow tree to show the resumed run. - **Events tab** — full event history (already in `Play.spans`). - The island is resizable + collapsible. State persists in localStorage so it stays where the user puts it across navigations. 
## Edge cases | Case | Handling | |---|---| | User submits answer but it doesn't match the question's `kind` (e.g. text where number expected) | `play.answer` validates, returns 422 with reason; UI re-renders question. | | Two users answer the same question simultaneously | First write wins; `answered_at` becomes immutable; subsequent answers rejected with "already answered". | | Same Play resumed twice in parallel (race on `play.answer`) | Server holds a per-play lock; second resume blocks until first completes its first new exit. | | Flow asks a question, user cancels (`play.cancel`) | Play status → `cancelled`. Replay never re-runs. | | Flow asks 5 questions in a row (no other steps between) | Each call exits, server collects 5 question records over 5 resumptions. UI can group consecutive pending questions into a single form so the user fills them out together. | | User edits a previously-answered question | Out of scope for v1. Could be added as `play.amend_answer` later — would invalidate all step outputs that were computed AFTER that answer. | | Step's input args contain non-deterministic values (e.g. `int(time.time())`) | Step key would change on every replay → cache miss → step re-executes → defeats memoization. Mitigation: discourage time-based inputs in `@flow` step args; document the pattern of pulling time INSIDE the step body, where it's not part of the cache key. | | Step's output is non-JSON-serializable | Persistence fails at write time with a clear error; flow needs to declare a serializer in `@flow(outputs=...)`. | | Answer cache is stale because the flow source changed between exit and resume | Detect by hashing the workflow_version_sid into the step keys; mismatch invalidates the cache and the user gets a "flow updated, restart from scratch?" prompt. 
| | Programmatic / batch runs (optimize_flow) need to skip ask_user | `play_run_async` accepts `prefill_answers: dict[question_pattern, value]` so non-interactive runs can pre-script every expected question. If a question isn't pre-filled, the play fails fast instead of awaiting. | ## Implementation phases ### Phase 1 — server state + RPC - Add `pending_questions`, `received_answers`, `step_outputs` fields to `Play` schema. - Add `play.answer`, `play.pending_questions`, `play.cancel` RPC methods. - Add `Play.status = "awaiting_input"` state + transitions. - Subprocess wrapper: detect exit code 75, parse structured marker, persist questions, flip status. - `play.answer` writes the answer, spawns a new subprocess with env-var pointers to cache files. ### Phase 2 — Python runtime - `_STEP_CACHE` + `_ANSWER_CACHE` populated from env-var-pointed files at boot. - `@flow` wrapper: compute step_key, check cache, short-circuit or persist+continue. - `ask_user.text/number/choice/multi_choice/confirm` primitives. - Boot stub: catch `_AwaitingInput` exception, write structured marker, exit 75. ### Phase 3 — UI - Bottom-bar island in logic_admin's play-detail page. - Three tabs: Logs / Inputs needed / Events. - Per-`QuestionKind` form rendering. - POST handler that calls `play.answer` and refreshes play state. - Resizable + collapsible + localStorage state. ### Phase 4 — agent integration - Update service_agent prompt: when `select_services` returns a service whose schema clearly doesn't match the user prompt, call `ask_user.choice(...)` to confirm before generating code. Same for missing required fields the model would otherwise have to invent — surface them as `ask_user.text(...)` calls. - Add a `validate_capability` flow step: LLM-judges whether the picked services can satisfy the prompt; if not, asks user. ### Phase 5 — visualization polish - Mark replayed steps visually in the flow tree (e.g. dashed border + "↻ replayed from cache" tooltip). 
- Show per-step inputs/outputs in a side panel when a step is selected (uses the new `outputs={...}` declaration).
- "Resume timeline" view: shows when each subprocess run started and ended, and which questions/answers triggered each resume.

## Acceptance criteria

- A `@flow` function can call `ask_user.text("What's your name?")`; the play exits with status `awaiting_input`, the UI renders an input box, the user submits "Alice", the play re-runs, and the function gets `"Alice"` back.
- In a flow with three sequential steps where step 2 asks a question, steps 1 and 2 replay from their cached outputs and only step 3 onward executes fresh.
- A flow that does an OSIS `*_set` followed by an `ask_user` does NOT double-create the OSIS record on resume.
- The seeded "Marketplace tokens" example, without a prompt rewrite, asks the user to pick one of `Listing` / `SliceProduct` / `TokenMigration` / Cancel; once the user picks, the agent proceeds, creates records of that type, and succeeds on the first attempt.
- The Logic UI shows pending questions in a dedicated bottom-bar tab, with a form input rendered per question kind.
- `play_run_async(prefill_answers={...})` lets `scripts/run_all_examples.py` answer ahead of time, so the suite stays runnable headless.

## Background context (for the next AI picking this up)

This issue is being filed by Claude Opus 4.7 after several debugging sessions on the service_agent. Useful pointers:

- The service_agent flow lives at `crates/hero_logic/src/seed_flows/service_agent.py`; its sub-flows are in `service_code_gen.py`, `model_call.py`, and `optimize_flow.py`.
- The `@flow` decorator and the meta-path importer for sub-flows live in `crates/hero_logic/sdk/python/hero_tracing.py`.
- The boot stub lives at `crates/hero_logic/src/engine/python_executor.rs::BOOT_STUB`.
- Spans are emitted via UDS to the per-Play span socket; see `crates/hero_logic/src/engine/span_socket.rs`.
- The existing Play state machine and RPC surface are in `crates/hero_logic/src/logic/...`.
- The Logic admin UI is `crates/hero_logic_admin/` (Axum + server-rendered HTML + a bit of JS in `static/js/workflow_editor.js`). Templates are in `templates/`.
- Recent related work: `feat(11-D): delete DAG — Python-only schema cutover (#21)` and `feat(11-C2): per-Play span socket listener (#18)`. Read those PRs for context on the current Play execution model.
- The user (timur) prefers terse responses, root-cause fixes over workarounds, and "make every example pass" as the north star. He explicitly does NOT want timeouts or long-blocked subprocesses, which is why the design is exit-and-replay rather than wait-and-resume.

## Out of scope (for this issue)

- Editing previously given answers (`play.amend_answer`)
- Branching flows (different paths based on an answer) beyond what plain Python `if`/`elif` already gives
- Streaming / chat-style multi-turn within a single question
- Backfilling step memoization for existing plays that predate this feature

🤖 Generated with [Claude Code](https://claude.com/claude-code)
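The Phase 1 status handling (exit code 75 flips the play to `awaiting_input`; `play.cancel` ends it for good) can be sketched as a small state machine. This is written in Python for consistency with the other sketches even though the server side is Rust. Only `awaiting_input`, `cancelled`, and exit code 75 come from this issue; the other status names, the transition table, and the helper names `transition` / `status_after_exit` are hypothetical.

```python
from enum import Enum

EXIT_AWAITING_INPUT = 75  # exit code from the proposal (same value as EX_TEMPFAIL in sysexits.h)


class PlayStatus(str, Enum):
    RUNNING = "running"                # hypothetical
    AWAITING_INPUT = "awaiting_input"  # from the proposal
    SUCCEEDED = "succeeded"            # hypothetical
    FAILED = "failed"                  # hypothetical
    CANCELLED = "cancelled"            # from the proposal


# Hypothetical transition table; terminal states allow no further moves.
ALLOWED_TRANSITIONS: dict[PlayStatus, set[PlayStatus]] = {
    PlayStatus.RUNNING: {
        PlayStatus.AWAITING_INPUT,
        PlayStatus.SUCCEEDED,
        PlayStatus.FAILED,
        PlayStatus.CANCELLED,
    },
    # play.answer resumes the play (back to RUNNING); play.cancel ends it.
    PlayStatus.AWAITING_INPUT: {PlayStatus.RUNNING, PlayStatus.CANCELLED},
    PlayStatus.SUCCEEDED: set(),
    PlayStatus.FAILED: set(),
    PlayStatus.CANCELLED: set(),
}


def transition(current: PlayStatus, new: PlayStatus) -> PlayStatus:
    """Return the new status, or raise if the move is not in the table."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


def status_after_exit(exit_code: int) -> PlayStatus:
    """Map a finished subprocess's exit code to the next Play status."""
    if exit_code == 0:
        return PlayStatus.SUCCEEDED
    if exit_code == EXIT_AWAITING_INPUT:
        return PlayStatus.AWAITING_INPUT
    return PlayStatus.FAILED
```

Making `cancelled` terminal in the table is what enforces the cancel edge case: once a play is cancelled, no resume can move it back to running.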
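Phases 1 and 2 hand the caches from one run to the next through files named in environment variables. A minimal sketch of both halves, assuming plain JSON files and the hypothetical variable names `HERO_STEP_CACHE_FILE` and `HERO_ANSWER_CACHE_FILE` (the issue does not name them); `write_cache_files` and `load_cache` are illustrative helpers, not existing functions.

```python
import json
import os
import tempfile
from typing import Any, Mapping, Optional

STEP_CACHE_ENV = "HERO_STEP_CACHE_FILE"      # hypothetical variable name
ANSWER_CACHE_ENV = "HERO_ANSWER_CACHE_FILE"  # hypothetical variable name


def write_cache_files(
    step_outputs: Mapping[str, Any],
    answers: Mapping[str, Any],
    directory: Optional[str] = None,
) -> dict[str, str]:
    """Server side: persist both caches as JSON and return env vars for the next run."""
    if directory is None:
        directory = tempfile.mkdtemp(prefix="play-cache-")
    env: dict[str, str] = {}
    for var, data, filename in (
        (STEP_CACHE_ENV, step_outputs, "step_outputs.json"),
        (ANSWER_CACHE_ENV, answers, "answers.json"),
    ):
        path = os.path.join(directory, filename)
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(dict(data), fh)
        env[var] = path
    return env


def load_cache(var: str, environ: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Runtime side: read one cache at boot; an unset variable means a first run."""
    path = environ.get(var)
    if not path:
        return {}
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
```

The server would merge the returned mapping into the child's environment, e.g. `subprocess.run(cmd, env={**os.environ, **env})`, and the runtime would call `load_cache` once per variable at boot to seed `_STEP_CACHE` and `_ANSWER_CACHE`.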
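The Phase 2 `@flow` wrapper is what makes replay safe: a completed step returns its persisted output instead of re-running. A minimal sketch of that idea, assuming step arguments and outputs are JSON-serializable and that the step key is a SHA-256 over the workflow version, the step's qualified name, and its arguments. `memoized_step`, `step_key`, and `WORKFLOW_VERSION_SID` are hypothetical names; the real decorator in `hero_tracing.py` also emits spans and accepts `outputs=...`, which this sketch omits, and a version mismatch here simply misses the cache rather than showing the "restart from scratch?" prompt.

```python
import functools
import hashlib
import json
from typing import Any, Callable

# Hypothetical stand-ins: the real runtime seeds _STEP_CACHE from a file at boot
# and receives the workflow version from the server.
_STEP_CACHE: dict[str, Any] = {}
WORKFLOW_VERSION_SID = "wv-1"


def step_key(name: str, args: tuple, kwargs: dict, version_sid: str) -> str:
    """Same flow version + same step + same inputs -> same key, across processes."""
    payload = json.dumps(
        {"version": version_sid, "step": name, "args": args, "kwargs": kwargs},
        sort_keys=True,  # kwargs order must not change the key
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def memoized_step(fn: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        key = step_key(fn.__qualname__, args, kwargs, WORKFLOW_VERSION_SID)
        if key in _STEP_CACHE:
            # Replay: return the persisted output; the body and its side effects are skipped.
            return _STEP_CACHE[key]
        result = fn(*args, **kwargs)
        # Round-trip through JSON so a non-serializable output fails here, at write time,
        # and a fresh run returns exactly what a later replay would.
        stored = json.loads(json.dumps(result))
        _STEP_CACHE[key] = stored
        return stored

    return wrapper
```

Because the arguments are part of the key, passing `int(time.time())` into a step yields a different key on every replay, which is the cache-miss edge case from the table; reading the clock inside the step body keeps it out of the key.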
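The first two edge cases (kind validation with a 422, and first-write-wins on `answered_at`) can be sketched as below. It is Python for consistency with the other sketches, although `play.answer` is a server RPC. The `Question` shape, the per-kind coercion rules, the 409 status for "already answered" (the issue only specifies the message), and the names `AnswerRejected` and `submit_answer` are all assumptions.

```python
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


class AnswerRejected(Exception):
    def __init__(self, status: int, reason: str) -> None:
        super().__init__(f"{status}: {reason}")
        self.status = status
        self.reason = reason


@dataclass
class Question:
    id: str
    kind: str  # "text" | "number" | "choice" | "multi_choice" | "confirm"
    prompt: str
    options: list[str] = field(default_factory=list)
    answer: Any = None
    answered_at: Optional[datetime] = None
    lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)


def _coerce(q: Question, raw: Any) -> Any:
    """Return the typed answer or raise a 422 explaining the mismatch."""
    if q.kind == "text" and isinstance(raw, str):
        return raw
    if q.kind == "number" and not isinstance(raw, bool):
        try:
            return float(raw)  # form posts arrive as strings; numbers normalize to float
        except (TypeError, ValueError):
            pass
    if q.kind == "choice" and raw in q.options:
        return raw
    if q.kind == "multi_choice" and isinstance(raw, list) and all(x in q.options for x in raw):
        return raw
    if q.kind == "confirm" and isinstance(raw, bool):
        return raw
    raise AnswerRejected(422, f"expected a {q.kind} answer, got {raw!r}")


def submit_answer(q: Question, raw: Any) -> Any:
    value = _coerce(q, raw)  # validate before taking the lock
    with q.lock:
        if q.answered_at is not None:
            # First write wins; answered_at is never overwritten.
            raise AnswerRejected(409, "already answered")
        q.answer = value
        q.answered_at = datetime.now(timezone.utc)
    return value
```

The lock makes the check-then-set atomic within one process; a multi-process server would need the same guarantee from its storage layer.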
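For headless runs, the `prefill_answers` lookup in `play_run_async` might work like this. The issue does not define what a `question_pattern` is; this sketch assumes a regular expression searched against the question prompt, with the first matching pattern (in insertion order) winning. `resolve_prefilled` and `UnansweredQuestion` are hypothetical names.

```python
import re
from typing import Any, Mapping


class UnansweredQuestion(RuntimeError):
    """A non-interactive play reached a question nobody pre-scripted: fail fast."""


def resolve_prefilled(prompt: str, prefill_answers: Mapping[str, Any]) -> Any:
    # Dicts keep insertion order, so earlier (more specific) patterns take priority.
    for pattern, value in prefill_answers.items():
        if re.search(pattern, prompt):
            return value
    raise UnansweredQuestion(f"no prefilled answer matches {prompt!r}")
```

Raising instead of parking the play in `awaiting_input` is what keeps `scripts/run_all_examples.py` from stalling on a question it did not anticipate.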
timur closed this issue 2026-05-13 15:37:04 +00:00
Reference
lhumina_code/hero_logic#28