Middleware

ReliabilityContext dataclass

Per-run bundle of every reliability primitive the execution loop needs.

Decouples the reliability instrumentation from any particular execution strategy (linear runner, parallel fan-out, agentic loop, etc.). Created once per pipeline run via for_run(), then threaded through whichever execution strategy drives the steps.
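The idea of one per-run context threaded through different execution strategies can be illustrated with a toy stand-in. This is a minimal sketch, not Stroma's implementation: `ToyContext`, its `run_id` argument, `run_linear`, and `run_fan_out` are hypothetical names introduced only for illustration, and the real `for_run()` takes `config`, `registry`, and `checkpoint_manager` instead.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyContext:
    """Hypothetical stand-in for a per-run reliability bundle (not Stroma's class)."""

    run_id: str
    trace: list = field(default_factory=list)

    @classmethod
    def for_run(cls, run_id: str) -> "ToyContext":
        # One fresh context per pipeline run, so runs never share trace state.
        return cls(run_id=run_id)

    async def execute_step(self, step_id, func, state):
        # Record the step in the run's trace, then run it.
        self.trace.append(step_id)
        return await func(state)


async def double(x: int) -> int:
    return x * 2


async def run_linear(ctx: ToyContext, state: int) -> int:
    # Strategy 1: steps run one after another.
    for step_id in ("a", "b"):
        state = await ctx.execute_step(step_id, double, state)
    return state


async def run_fan_out(ctx: ToyContext, states: list) -> list:
    # Strategy 2: the same context instruments concurrent branches.
    tasks = [ctx.execute_step(f"branch-{i}", double, s) for i, s in enumerate(states)]
    return await asyncio.gather(*tasks)
```

Both strategies call the same `execute_step` on the context, so the instrumentation lives in one place regardless of how the steps are scheduled.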

for_run(config, registry, checkpoint_manager) classmethod

Create a fresh context for a single pipeline run.

build_result(status, final_state)

Construct an ExecutionResult from the current tracker/trace state.

execute_step(step_id, func, input_state, ctx, *, contract=None) async

Execute a single step with full reliability instrumentation.

Runs func with retry logic, contract validation, cost tracking, budget enforcement, model fallback, checkpointing, trace recording, and failure classification. It works independently of any orchestration framework: this is the freestanding reliability loop that StromaRunner delegates to internally.

If contract is provided, input and output are validated against it. Otherwise the step runs without schema validation.
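The interplay of retries and optional contract validation can be sketched as follows. This is a simplified illustration under stated assumptions, not the library's code: `ToyContract`, `toy_execute_step`, the `max_attempts` parameter, and the choice to treat `RuntimeError` as transient are all hypothetical, and the sketch leaves out cost tracking, budgets, fallback, checkpointing, and tracing.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToyContract:
    """Hypothetical contract: the types a step must accept and return."""

    input_type: type
    output_type: type


async def toy_execute_step(
    step_id: str,
    func: Callable[[Any], Awaitable[Any]],
    input_state: Any,
    *,
    contract: Optional[ToyContract] = None,
    max_attempts: int = 3,  # assumed retry budget, not a Stroma default
) -> Any:
    # Validate the input only when a contract was supplied.
    if contract is not None and not isinstance(input_state, contract.input_type):
        raise TypeError(f"{step_id}: bad input type")

    last_error: Optional[Exception] = None
    for _attempt in range(max_attempts):
        try:
            output = await func(input_state)
        except RuntimeError as exc:  # treated as transient in this sketch
            last_error = exc
            continue
        # Validate the output the same way before handing it back.
        if contract is not None and not isinstance(output, contract.output_type):
            raise TypeError(f"{step_id}: bad output type")
        return output
    raise RuntimeError(f"{step_id}: failed after {max_attempts} attempts") from last_error


calls = {"n": 0}


async def flaky(x: Any) -> str:
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return str(x)
```

Calling `asyncio.run(toy_execute_step("s", flaky, 7, contract=ToyContract(int, str)))` retries past the two transient failures and returns the validated output. Omitting `contract` skips both type checks.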

StromaStep

Wraps any async callable with Stroma's full reliability suite.

Provides a paradigm-agnostic way to apply contract validation, cost tracking, retry logic, checkpointing, and tracing to individual execution steps, without requiring StromaRunner or any framework adapter.

Use wrap() to get back a decorated async callable, or use the instance as a decorator factory via __call__().
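The relationship between the two entry points can be sketched with a toy class in which the decorator-factory form simply delegates to `wrap()`. This is an assumption about the general pattern, not a description of StromaStep's internals: `ToyStep` and its `calls` list are hypothetical, and the toy omits contracts and the reliability context entirely.

```python
import asyncio
import functools


class ToyStep:
    """Hypothetical wrapper showing the two entry points (not StromaStep itself)."""

    def __init__(self):
        self.calls = []  # step ids, recorded in call order

    def wrap(self, step_id, func):
        # Return a new async callable that instruments func.
        @functools.wraps(func)
        async def wrapper(state):
            self.calls.append(step_id)
            return await func(state)

        return wrapper

    def __call__(self, step_id):
        # Decorator factory: step("id") returns a decorator that delegates to wrap().
        def decorator(func):
            return self.wrap(step_id, func)

        return decorator


step = ToyStep()


async def upper(text: str) -> str:
    return text.upper()


# Entry point 1: wrap an existing function explicitly.
wrapped_upper = step.wrap("upper", upper)


# Entry point 2: use the instance as a decorator factory.
@step("reverse")
async def reverse(text: str) -> str:
    return text[::-1]
```

The explicit form suits functions defined elsewhere, while the decorator form keeps the step id next to the function definition.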

Example

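# config, registry, and manager are assumed to be created elsewhere.
ctx = ReliabilityContext.for_run(config, registry, manager)
step = StromaStep(ctx)

# Calling the instance creates a contract from the input/output types,
# registers it, and wraps the function.
@step("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> dict:
    return {"text": "..."}

# Must be awaited from inside a coroutine.
result = await summarize(DocInput(text="hello"))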

wrap(step_id, func, *, contract=None)

Wrap func so that calling it runs execute_step with full instrumentation.

__call__(step_id, *, input, output)

Decorator that creates a contract, registers it, and wraps the function.

Paradigm-agnostic equivalent of StromaRunner.node().