Middleware
ReliabilityContext
dataclass
Per-run bundle of every reliability primitive the execution loop needs.
Decouples the reliability instrumentation from any particular execution
strategy (linear runner, parallel fan-out, agentic loop, etc.). Created
once per pipeline run via for_run(), then threaded through whichever
execution strategy drives the steps.
for_run(config, registry, checkpoint_manager)
classmethod
Create a fresh context for a single pipeline run.
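The per-run factory pattern described here can be sketched in a few lines. This is a minimal illustration only, not Stroma's implementation: ToyContext, run_id, max_retries, trace, and total_cost are hypothetical names chosen for the example, and the real for_run() takes config, registry, and checkpoint_manager instead.

```python
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical stand-in for a per-run bundle; not Stroma's class."""

    run_id: str
    max_retries: int
    trace: list = field(default_factory=list)  # per-run event log
    total_cost: float = 0.0                    # per-run cost accumulator

    @classmethod
    def for_run(cls, run_id: str, max_retries: int = 3) -> "ToyContext":
        # Each call builds new mutable state, so two runs never share a trace.
        return cls(run_id=run_id, max_retries=max_retries)


first = ToyContext.for_run("run-1")
second = ToyContext.for_run("run-2")
first.trace.append("step-a started")
```

Because the mutable fields use default_factory, appending to one context's trace leaves the other context's trace empty, which is the property a "fresh context for a single pipeline run" relies on.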
build_result(status, final_state)
Construct an ExecutionResult from the current tracker/trace state.
execute_step(step_id, func, input_state, ctx, *, contract=None)
async
Execute a single step with full reliability instrumentation.
Runs func with retry logic, contract validation, cost tracking, budget
enforcement, model fallback, checkpointing, trace recording, and failure
classification. It works independently of any orchestration framework:
this is the freestanding reliability loop that StromaRunner delegates
to internally.
If contract is provided, input and output are validated against it. Otherwise the step runs without schema validation.
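To make the shape of such a loop concrete, here is a rough sketch of retry plus optional validation plus trace recording around an async callable. It assumes nothing about Stroma's internals: run_step, ContractError, input_type, output_type, and max_attempts are hypothetical names, plain isinstance checks stand in for contract validation, and cost tracking, budgets, fallback, and checkpointing are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class ContractError(Exception):
    """Hypothetical error raised when a value fails the type check."""


async def run_step(
    step_id: str,
    func: Callable[[Any], Awaitable[Any]],
    input_state: Any,
    trace: list,
    *,
    input_type: Optional[type] = None,
    output_type: Optional[type] = None,
    max_attempts: int = 3,
) -> Any:
    # Validate the input only when a contract type was supplied.
    if input_type is not None and not isinstance(input_state, input_type):
        raise ContractError(f"{step_id}: unexpected input type")
    for attempt in range(1, max_attempts + 1):
        try:
            result = await func(input_state)
        except Exception as exc:
            # Record the failure, then retry unless attempts are exhausted.
            trace.append((step_id, attempt, f"error: {exc}"))
            if attempt == max_attempts:
                raise
            continue
        # Output validation happens outside the try, so it is never retried.
        if output_type is not None and not isinstance(result, output_type):
            raise ContractError(f"{step_id}: unexpected output type")
        trace.append((step_id, attempt, "ok"))
        return result
    raise ValueError("max_attempts must be at least 1")


calls = {"n": 0}


async def flaky(text: str) -> str:
    # Fails on the first call only, to exercise the retry path.
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient")
    return text.upper()


trace: list = []
result = asyncio.run(
    run_step("shout", flaky, "hello", trace, input_type=str, output_type=str)
)
```

In this sketch the first attempt fails and is recorded, the second succeeds, and omitting input_type and output_type skips validation entirely, mirroring the optional contract argument.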
StromaStep
Wraps any async callable with Stroma's full reliability suite.
Provides a paradigm-agnostic way to apply contract validation, cost
tracking, retry logic, checkpointing, and tracing to individual
execution steps, without requiring StromaRunner or any framework
adapter.
Use wrap() to get back a decorated async callable, or use the
instance as a decorator factory via __call__().
Example
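ctx = ReliabilityContext.for_run(config, registry, manager)
step = StromaStep(ctx)

# Creates and registers a contract for "summarize", then wraps the coroutine.
@step("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> dict:
    return {"text": "..."}

# Must be awaited from inside a running event loop.
result = await summarize(DocInput(text="hello"))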
wrap(step_id, func, *, contract=None)
Wrap func so that calling it runs execute_step with full instrumentation.
__call__(step_id, *, input, output)
Decorator factory that creates a contract from input and output, registers it under step_id, and wraps the decorated function.
Paradigm-agnostic equivalent of StromaRunner.node().
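The relationship between wrap() and __call__() is the usual "decorator factory delegating to a plain wrapper" arrangement. The sketch below shows that arrangement with a toy class. ToyStep, its contracts dict, and its calls list are hypothetical, and isinstance checks stand in for real contract validation, so treat it as an illustration of the pattern rather than StromaStep's code.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

AsyncFn = Callable[[Any], Awaitable[Any]]


class ToyStep:
    """Hypothetical miniature of the wrap / __call__ split; not StromaStep."""

    def __init__(self) -> None:
        self.contracts: dict = {}  # step_id -> (input type, output type)
        self.calls: list = []      # step ids in the order they were invoked

    def wrap(self, step_id: str, func: AsyncFn) -> AsyncFn:
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        async def wrapper(state: Any) -> Any:
            self.calls.append(step_id)
            # Steps without a registered contract accept and return anything.
            in_type, out_type = self.contracts.get(step_id, (object, object))
            if not isinstance(state, in_type):
                raise TypeError(f"{step_id}: unexpected input type")
            result = await func(state)
            if not isinstance(result, out_type):
                raise TypeError(f"{step_id}: unexpected output type")
            return result

        return wrapper

    def __call__(
        self, step_id: str, *, input: type, output: type
    ) -> Callable[[AsyncFn], AsyncFn]:
        def decorator(func: AsyncFn) -> AsyncFn:
            # Register the contract first, then reuse wrap() for the wrapping.
            self.contracts[step_id] = (input, output)
            return self.wrap(step_id, func)

        return decorator


step = ToyStep()


@step("count_words", input=str, output=int)
async def count_words(text: str) -> int:
    return len(text.split())


word_count = asyncio.run(count_words("one two three"))
```

Keeping the wrapping logic in wrap() means the decorator form adds only contract registration, so both entry points produce identically instrumented callables.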