feat: ask_user primitive + replay-based step memoization for resumable flows #28
Summary

Add a built-in ask_user primitive to the Python flow runtime that lets a @flow step pause execution and request input from the user, then exit cleanly with state persisted. When the user answers, the play re-runs from scratch, but every previously completed step replays from its persisted output instead of re-executing, so no work is lost and side effects don't double-fire. The Logic admin UI gains a bottom-bar island (below the flow tree) for live logs and input requests, like a terminal.

This makes ask_user the canonical way for an agent flow to handle ambiguity, missing context, or capability gaps in user prompts: instead of failing or fabricating, it asks. It also generalises the @flow step model: every step gets typed inputs/outputs, a deterministic step key, and persisted output that visualises on the trace.

Motivation
The service_agent today fails ~⅔ of seeded examples. After a deep look at five representative failures (see "Background context" at the bottom of this issue), the failures cluster as:

- The prompt asks for something the selected service does not have, e.g. hero_osis_network (which has Listings, SliceProducts, TokenMigrations; none of those).
- Required fields are missing from the prompt: CodeProject requires 15 fields; the model invents 13 of them, gets one wrong, and fails.

For all of these, the correct answer is to ask the user: "I see you want X but the service has Y, Z, W. Pick one or cancel." Or: "Releases need default_branch. What should it be?" That's also what a real human assistant would do.

Implementing ask_user gets the seeded examples passing without rewriting their prompts; the agent demonstrates the feature instead of hiding the gap.

Design: resume by replay, not by waiting
The user explicitly does NOT want timeouts or long-blocked subprocesses. Constraints:

- Pause is an exit: ask_user(...) exits the Python subprocess cleanly with a special status. The Play is marked awaiting_input server-side; play state (every completed step's output, every span event) is fully persisted on disk.
- Resume is a re-run with a step cache {step_key: output_value} keyed by deterministic step path + arg hash. Every @flow-decorated function checks the cache first and returns the cached value instead of calling the body. Side effects (RPC calls, model_call) replay as no-ops.
- Answers are cached as {question_id: user_answer}. When ask_user(...) is reached and the cache has a matching answer, it returns immediately. If not (a NEW question), it persists the question and exits again.

The model is essentially continuation-by-replay: the same idea as Temporal / Cadence workflow replay, scoped to the @flow decorator boundary.
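The replay model above can be sketched in a few dozen lines. This is a minimal in-process illustration under stated assumptions, not the hero_tracing implementation: the names flow, ask_text, AwaitingInput and run_play are hypothetical stand-ins, the caches live in memory instead of on disk, and the clean subprocess exit is modelled as an exception.

```python
import hashlib
import json

STEP_CACHE = {}    # {step_key: output_value}, survives across re-runs
ANSWER_CACHE = {}  # {question_id: user_answer}
CALLS = []         # records which step bodies actually executed


class AwaitingInput(Exception):
    """Stands in for 'exit the subprocess with the awaiting-input status'."""

    def __init__(self, question_id, prompt):
        super().__init__(prompt)
        self.question_id = question_id
        self.prompt = prompt


def flow(fn):
    """Hypothetical memoizing decorator: replay cached output, else run."""
    def wrapper(*args):
        raw = json.dumps([fn.__name__, args], sort_keys=True)
        key = hashlib.sha256(raw.encode()).hexdigest()
        if key in STEP_CACHE:
            return STEP_CACHE[key]   # replay: the body is not executed
        CALLS.append(fn.__name__)
        out = fn(*args)
        STEP_CACHE[key] = out        # persist before moving on
        return out
    return wrapper


def ask_text(question_id, prompt):
    """Return a cached answer, or pause the play if the question is new."""
    if question_id in ANSWER_CACHE:
        return ANSWER_CACHE[question_id]
    raise AwaitingInput(question_id, prompt)


@flow
def create_record(name):
    return {"created": name}  # imagine a side-effecting RPC here


def play():
    record = create_record("demo")
    branch = ask_text("q1", "Releases need default_branch. What should it be?")
    return record, branch


def run_play():
    try:
        return ("completed", play())
    except AwaitingInput as pending:
        return ("awaiting_input", pending.question_id)


first = run_play()            # pauses at the question
ANSWER_CACHE["q1"] = "main"   # the user answers
second = run_play()           # re-run from scratch; the step replays from cache
```

After the second run, CALLS still holds a single entry, which is the property the design relies on: the side-effecting step body ran once even though the play ran twice.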
API: ask_user primitives

Exposed from hero_tracing alongside flow: ask_user.text, ask_user.number, ask_user.choice, ask_user.multi_choice and ask_user.confirm.

All five primitives:

- Take question: str (the user-facing prompt; always free text, the model writes this).
- Persist a Question { id, kind, prompt, options?, default?, constraints? } record on the Play.
- Exit with EXIT_CODE_AWAITING_INPUT = 75 (matches EX_TEMPFAIL semantically: "operation could not be performed at this time, try again later").
- Leave the Play with Play.status = "awaiting_input".

ask_user is NOT a @flow-decorated step

It's a primitive. Calling it does NOT open a span (it would be a 0-duration span anyway). Instead the question gets attached to the enclosing @flow's span as pending_question metadata, so the UI can render the input box right below the flow step that asked.

Step memoization: the deterministic-replay layer
For replay-by-rerun to work, every @flow call must short-circuit when its output is already cached.

Step key
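A deterministic key over (parent path, function name, input args) could be derived roughly as follows. This is a sketch only: the helper name step_key, the "/"-joined path, the SHA-256 hash and the JSON canonicalisation are assumptions made for illustration, not the format hero_tracing uses.

```python
import hashlib
import json


def step_key(parent_path, func_name, args):
    """Hypothetical key: readable step path plus a hash of the arguments."""
    # sort_keys makes the JSON text independent of dict insertion order.
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    arg_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    path = "/".join([*parent_path, func_name])
    return f"{path}#{arg_hash}"


# Same call under the same parent: identical keys.
k1 = step_key(["service_agent", "attempt 1"], "model_call", {"prompt": "X"})
k2 = step_key(["service_agent", "attempt 1"], "model_call", {"prompt": "X"})
# Same call under a different retry attempt: a different key.
k3 = step_key(["service_agent", "attempt 2"], "model_call", {"prompt": "X"})
```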
Same (parent path, function name, input args) produces the same key. So the same call in two different parts of the flow correctly memoizes separately, and the same step under retry-loop iteration 1 vs iteration 2 produces different keys (because the parent attempt N span path differs).

Cache lookup in the @flow wrapper

The wrapper computes the step key and checks the cache: on a hit it returns the cached output without calling the body; on a miss it runs the body, persists the output, and continues.

Output serialisation

The @flow decorator already accepts inputs={'name': str, ...}. Add outputs={...} so the persisted output validates on the way in and on replay. Outputs that aren't JSON-serializable (objects, file handles) need explicit handling. Proposal: declare them in @flow(outputs=...), and raise at runtime if a step returns something not coercible.

What if a step is non-deterministic? (model_call, OSIS .set)
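Those are exactly the steps we WANT to memoize. The first time model_call(prompt=X) runs, it hits the LLM and spends tokens; on replay we don't want to spend them again. The first time script_execution(code=X) runs, it creates OSIS records; on replay we don't want to double-create. Memoization gives us idempotency for free.

Server-side state changes (hero_logic)

New Play fields

pending_questions, received_answers and step_outputs. (All on the existing Play rootobject: additive, no migration needed for new plays.)

New Question shape

Question { id, kind, prompt, options?, default?, constraints? }, plus answered_at, which stays null until the question is answered.

New RPC methods

| Method | Params | Returns | Effect |
| --- | --- | --- | --- |
| play.answer | play_sid, question_id, answer: Value | { ok: true, resume_started: bool } | Writes the answer and spawns a new subprocess with pending_questions minus the answered one, received_answers updated, and the existing step_outputs cache passed through env. |
| play.pending_questions | play_sid | list[Question] | Returns the questions still awaiting an answer. |
| play.cancel | play_sid | { ok: true } | Marks the Play cancelled so subsequent UI doesn't keep re-asking. |

Play state machine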
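One way to picture the transitions is as a lookup table. The sketch below takes only awaiting_input and cancelled from this issue; the other status names (running, completed, failed) and all event names are assumptions for illustration. It also assumes play.answer always resumes the play, whereas the resume_started flag suggests the real behaviour may be conditional.

```python
from enum import Enum


class PlayStatus(Enum):
    RUNNING = "running"                # assumed name
    AWAITING_INPUT = "awaiting_input"  # from this issue
    CANCELLED = "cancelled"            # from this issue
    COMPLETED = "completed"            # assumed name
    FAILED = "failed"                  # assumed name


# (current status, event) -> next status; event names are illustrative.
TRANSITIONS = {
    (PlayStatus.RUNNING, "exit_awaiting_input"): PlayStatus.AWAITING_INPUT,
    (PlayStatus.RUNNING, "exit_ok"): PlayStatus.COMPLETED,
    (PlayStatus.RUNNING, "exit_error"): PlayStatus.FAILED,
    (PlayStatus.AWAITING_INPUT, "play.answer"): PlayStatus.RUNNING,
    (PlayStatus.AWAITING_INPUT, "play.cancel"): PlayStatus.CANCELLED,
}


def next_status(current, event):
    """Return the next status, or raise if the transition is not allowed."""
    try:
        return TRANSITIONS[(current, event)]
    except KeyError:
        raise ValueError(
            f"illegal transition: {current.value} on {event}"
        ) from None
```

Keeping the table explicit makes terminal states easy to enforce: cancelled has no outgoing entries, so a late play.answer on a cancelled play is rejected instead of triggering a replay.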
Subprocess lifecycle
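To make the lifecycle in this section concrete, the sketch below shows one way the resume hand-off could work: the harness writes both caches to JSON files and passes their paths through the environment, and the boot code loads them. The environment variable names come from this issue; the helper names build_resume_env and load_replay_caches and the file names are hypothetical.

```python
import json
import os
import tempfile


def build_resume_env(flow_input, step_outputs, answers, workdir):
    """Harness side (hypothetical helper): write cache files, return env vars."""
    steps_path = os.path.join(workdir, "step_outputs.json")
    answers_path = os.path.join(workdir, "answers.json")
    with open(steps_path, "w", encoding="utf-8") as fh:
        json.dump(step_outputs, fh)
    with open(answers_path, "w", encoding="utf-8") as fh:
        json.dump(answers, fh)
    return {
        "HERO_FLOW_INPUT": flow_input,
        "HERO_REPLAY_STEP_OUTPUTS_FILE": steps_path,
        "HERO_REPLAY_ANSWERS_FILE": answers_path,
    }


def load_replay_caches(env):
    """Boot side (hypothetical helper): load both caches, empty on a first run."""
    def read(var):
        path = env.get(var)
        if not path:
            return {}
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    return read("HERO_REPLAY_STEP_OUTPUTS_FILE"), read("HERO_REPLAY_ANSWERS_FILE")


with tempfile.TemporaryDirectory() as workdir:
    env = build_resume_env('{"prompt": "hi"}', {"a#1": 42}, {"q1": "main"}, workdir)
    step_cache, answer_cache = load_replay_caches(env)

# A first run has no replay files, so both caches start empty.
first_run = load_replay_caches({"HERO_FLOW_INPUT": "{}"})
```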
- When the subprocess exits, the harness reads the <<HERO_AWAITING_INPUT id="...">>...payload...<</HERO_AWAITING_INPUT>> marker the boot stub writes, parses out the new question(s), and writes them onto the Play. (Use the existing span socket: emit a final awaiting_input event before exit.)
- On play.answer, the harness spawns a new subprocess with env:
  - HERO_FLOW_INPUT: same as before (the original input)
  - HERO_REPLAY_STEP_OUTPUTS_FILE: path to a JSON file with {step_key: output}
  - HERO_REPLAY_ANSWERS_FILE: path to a JSON file with {question_id: answer}
- At boot, the runtime loads those files into _STEP_CACHE and _ANSWER_CACHE.

UI: bottom-bar island
Below the existing flow-tree island in the play detail page (logic_admin), add a new island that takes ~30% of vertical space (resizable). Layout:
- Input tab: questions with answered_at == null. Question count badged on tab. Renders a form per Question.kind. On submit, POST play.answer. The page polls or listens for play status change and updates the flow tree to show the resumed run.
- Logs tab: live logs (Play.spans).

Edge cases

| Case | Behaviour |
| --- | --- |
| Answer doesn't match the question's kind (e.g. text where number expected) | play.answer validates, returns 422 with reason; UI re-renders question. |
| Same question answered twice | answered_at becomes immutable; subsequent answers rejected with "already answered". |
| … play.answer) | … |
| User cancels (play.cancel) | Play is marked cancelled. Replay never re-runs. |
| User wants to change an earlier answer | Could add play.amend_answer later; it would invalidate all step outputs that were computed AFTER that answer. |
| Time-dependent step args (e.g. int(time.time())) | Avoid them in @flow step args; document the pattern of pulling time INSIDE the step body, where it's not part of the cache key. |
| Step output isn't JSON-serializable | Must be declared in @flow(outputs=...). |
| Headless / non-interactive runs | play_run_async accepts prefill_answers: dict[question_pattern, value] so non-interactive runs can pre-script every expected question. If a question isn't pre-filled, the play fails fast instead of awaiting. |

Implementation phases
Phase 1: server state + RPC

- Add pending_questions, received_answers, step_outputs fields to Play schema.
- Add play.answer, play.pending_questions, play.cancel RPC methods.
- Add Play.status = "awaiting_input" state + transitions.
- play.answer writes the answer, spawns a new subprocess with env-var pointers to cache files.

Phase 2: Python runtime

- _STEP_CACHE + _ANSWER_CACHE populated from env-var-pointed files at boot.
- @flow wrapper: compute step_key, check cache, short-circuit or persist+continue.
- ask_user.text / number / choice / multi_choice / confirm primitives.
- On an unanswered question: raise the _AwaitingInput exception, write structured marker, exit 75.

Phase 3: UI

- Per-QuestionKind form rendering.
- Submit calls play.answer and refreshes play state.

Phase 4: agent integration

- When select_services returns a service whose schema clearly doesn't match the user prompt, call ask_user.choice(...) to confirm before generating code. Same for missing required fields the model would otherwise have to invent: surface them as ask_user.text(...) calls.
- Add a validate_capability flow step: LLM-judges whether the picked services can satisfy the prompt; if not, asks user.

Phase 5: visualization polish

- Show typed step outputs on the trace (per the outputs={...} declaration).

Acceptance criteria
- A @flow function can call ask_user.text("What's your name?"), the play exits with status awaiting_input, the UI renders an input box, the user submits "Alice", the play re-runs and the function gets "Alice" back.
- A *_set followed by an ask_user does NOT double-create the OSIS record on resume.
- For a prompt that doesn't match the selected service, the agent asks the user to choose Listing / SliceProduct / TokenMigration / Cancel; once user picks, the agent proceeds and creates records of that type, succeeding on first attempt.
- play_run_async(prefill_answers={...}) lets scripts/run_all_examples.py answer ahead-of-time so the suite stays headless-runnable.

Background context (for the next AI picking this up)
This issue is being filed by Claude Opus 4.7 after several debugging sessions on the service_agent. Useful pointers:
- Service agent flow: crates/hero_logic/src/seed_flows/service_agent.py; sub-flows at service_code_gen.py, model_call.py, optimize_flow.py.
- The @flow decorator + meta-path importer for sub-flows live at crates/hero_logic/sdk/python/hero_tracing.py.
- Boot stub: crates/hero_logic/src/engine/python_executor.rs::BOOT_STUB.
- Span socket: crates/hero_logic/src/engine/span_socket.rs.
- crates/hero_logic/src/logic/....
- Admin UI: crates/hero_logic_admin/ (Axum + server-rendered HTML + a bit of JS in static/js/workflow_editor.js). Templates in templates/.
- Recent PRs: feat(11-D): delete DAG - Python-only schema cutover (#21), feat(11-C2): per-Play span socket listener (#18). Read those PRs for context on the current Play execution model.

Out of scope (for this issue)

- Amending an earlier answer (play.amend_answer)
- … if/elif already gives …

🤖 Generated with Claude Code