
Runner

NodeHooks dataclass

Optional async callbacks fired at node execution boundaries.

  • on_node_start(run_id, node_id, input_state_dict)
  • on_node_success(run_id, node_id, output_state_dict, tokens_used)
  • on_node_failure(run_id, node_id, exc, failure_class)
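As a rough illustration of how optional async callbacks like these can be wired around a node call, here is a minimal self-contained sketch. HooksSketch and run_one are hypothetical stand-ins written for this example, not Stroma's actual NodeHooks or runner internals; only the three callback names and their argument order come from the list above. The token count of 0 and the "unclassified" failure class are placeholders.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


# Hypothetical stand-in for NodeHooks: every callback is optional and async.
@dataclass
class HooksSketch:
    on_node_start: Optional[Callable[[str, str, dict], Awaitable[None]]] = None
    on_node_success: Optional[Callable[[str, str, dict, int], Awaitable[None]]] = None
    on_node_failure: Optional[Callable[[str, str, BaseException, Any], Awaitable[None]]] = None


events = []  # collected hook calls, for inspection


async def log_start(run_id, node_id, input_state_dict):
    events.append(("start", run_id, node_id, input_state_dict))


async def log_success(run_id, node_id, output_state_dict, tokens_used):
    events.append(("success", run_id, node_id, output_state_dict, tokens_used))


async def run_one(hooks, run_id, node_id, fn, state):
    """Fire whichever hooks are set around a single node call (assumed flow)."""
    if hooks.on_node_start:
        await hooks.on_node_start(run_id, node_id, state)
    try:
        out = await fn(state)
    except Exception as exc:
        if hooks.on_node_failure:
            # "unclassified" is a placeholder for the real failure class.
            await hooks.on_node_failure(run_id, node_id, exc, "unclassified")
        raise
    if hooks.on_node_success:
        # 0 is a placeholder token count; a real runner would report usage.
        await hooks.on_node_success(run_id, node_id, out, 0)
    return out


async def double(state):
    return {"result": state["value"] * 2}


hooks = HooksSketch(on_node_start=log_start, on_node_success=log_success)
output = asyncio.run(run_one(hooks, "run-1", "double", double, {"value": 5}))
```

Leaving a callback unset simply skips it, which is why on_node_failure can be omitted here.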

RunConfig

Bases: BaseModel

Configuration for a single pipeline run.

A run_id is auto-generated if not provided. Set budget, policy_map, classifiers, or resume_from to customize behavior.

StromaRunner

Orchestrates sequential node execution with reliability primitives.

Runs a sequence of async node functions, applying contract validation, retry policies, cost tracking, and checkpointing at each step. Requires a ContractRegistry for input/output validation, a CheckpointManager for persistence, and a RunConfig with budget, policies, and classifiers.

Example

runner = StromaRunner(registry, checkpoint_manager, config)
result = await runner.run([node1, node2], initial_state)
print(result.status, result.final_state)

quick(*, store=None, budget=None, policy_map=None, classifiers=None, hooks=None, **config_kwargs) classmethod

Create a runner with sensible defaults — no boilerplate required.

Sets up an InMemoryStore, unlimited ExecutionBudget, and default_policy_map() retry policies. Pass store, budget, policy_map, classifiers, or hooks to override any of these. Additional keyword arguments are forwarded to RunConfig.

Returns a fully configured StromaRunner ready to use.

Example
runner = StromaRunner.quick()

@runner.node("double", input=InputState, output=OutputState)
async def double(state: InputState) -> dict:
    return {"result": state.value * 2}

result = await runner.run([double], InputState(value=5))

with_redis(redis_url, ttl_seconds=3600)

Replace the checkpoint backend with a Redis-backed store.

Requires the redis extra (uv add stroma[redis]).

with_budget(*, tokens=None, cost_usd=None, latency_ms=None)

Set execution budget limits for token count, cost, or latency.

with_model_fallback(model, *, to, at_budget_pct=0.8)

Append a model fallback rule that activates when spend crosses at_budget_pct.

Once cumulative cost reaches at_budget_pct of the budget, nodes that would use model instead receive the fallback model (the to argument) as the model signal in ctx["_stroma_model"], and cost accounting prices their usage with the fallback model.
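The switching rule can be sketched as a pure function. This is an illustration only: pick_model, the model names, and the dollar figures are hypothetical, and it assumes at_budget_pct is a fraction (0.8 meaning 80%) that triggers as soon as spend reaches it.

```python
def pick_model(requested, spent_usd, budget_usd, *, model, to, at_budget_pct=0.8):
    """Return the model a node should be signalled to use (hypothetical helper)."""
    if budget_usd is None or budget_usd <= 0:
        # Without a cost budget the threshold can never be crossed.
        return requested
    crossed = spent_usd / budget_usd >= at_budget_pct
    if crossed and requested == model:
        return to  # fall back to the cheaper model
    return requested


# Below the threshold the requested model is kept.
before = pick_model("big-model", 0.50, 1.00, model="big-model", to="small-model")
# At 80% of the budget the fallback takes over.
after = pick_model("big-model", 0.80, 1.00, model="big-model", to="small-model")
# Nodes that use a different model are unaffected by this rule.
other = pick_model("other-model", 0.95, 1.00, model="big-model", to="small-model")
```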

with_classifiers(classifiers)

Set custom failure classifiers for error handling.

with_hooks(hooks)

Set node lifecycle hooks for observability.

with_context(context)

Set the shared context dict passed to nodes that accept a second argument.
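One plausible way to pass the context only to nodes that accept a second argument is to inspect the node's signature. The sketch below is an assumption about the mechanism, not Stroma's implementation; call_node and the example nodes are hypothetical.

```python
import asyncio
import inspect


async def call_node(fn, state, context):
    """Pass the shared context only to nodes that declare a second parameter."""
    accepts_context = len(inspect.signature(fn).parameters) >= 2
    if accepts_context:
        return await fn(state, context)
    return await fn(state)


async def plain(state):
    # One-argument node: never sees the context.
    return {"seen_context": False}


async def aware(state, ctx):
    # Two-argument node: receives the shared context dict.
    return {"seen_context": True, "tenant": ctx["tenant"]}


shared = {"tenant": "acme"}
plain_out = asyncio.run(call_node(plain, {}, shared))
aware_out = asyncio.run(call_node(aware, {}, shared))
```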

with_policy_map(policy_map)

Set the global failure policy map.

with_node_policies(node_policies)

Set per-node failure policy overrides.

with_node_timeouts(node_timeouts)

Set per-node execution timeouts in milliseconds.

Nodes that exceed their timeout raise TimeoutError, which is classified as RECOVERABLE by default and retried according to the applicable retry policy.
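The timeout-then-retry behavior described above can be approximated with asyncio.wait_for. This sketch is illustrative: call_with_timeout and its max_attempts parameter are hypothetical and stand in for whatever retry policy applies, and the real runner may enforce timeouts differently.

```python
import asyncio


async def call_with_timeout(fn, state, timeout_ms, max_attempts=2):
    """Run fn(state) under a millisecond timeout, retrying on timeout."""
    last_exc = None
    for _ in range(max_attempts):
        try:
            # wait_for takes seconds, so convert from milliseconds.
            return await asyncio.wait_for(fn(state), timeout=timeout_ms / 1000)
        except asyncio.TimeoutError as exc:
            last_exc = exc  # treated as recoverable: try again
    raise TimeoutError(f"node timed out after {max_attempts} attempts") from last_exc


calls = {"count": 0}


async def flaky(state):
    calls["count"] += 1
    if calls["count"] == 1:
        await asyncio.sleep(0.2)  # first call exceeds the 50 ms timeout
    return {"ok": True}


result = asyncio.run(call_with_timeout(flaky, {}, timeout_ms=50))
```

Here the first attempt times out and is cancelled, and the second attempt succeeds.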

node(node_id, *, input, output)

Decorator that creates a contract, registers it, and marks the function as a node.

Combines stroma_node, NodeContract creation, and ContractRegistry.register into a single step. Pass the node_id, input schema, and output schema, and the returned decorator handles the rest.

Example
runner = StromaRunner.quick()

@runner.node("extract", input=Document, output=Entities)
async def extract(state: Document) -> dict:
    return {"entities": ["Python", "Pydantic"]}

run(node_sequence, initial_state) async

Execute a sequence of nodes with full reliability instrumentation.

Runs each node in node_sequence starting from initial_state, returning an ExecutionResult with the final state, trace, and cost info.

Node decorators

Stroma provides two ways to declare a pipeline node:

Decorator                                | When to use
@runner.node("id", input=In, output=Out) | You already have a StromaRunner instance and want the contract registered automatically.
@stroma_node("id", contract)             | You're building the registry manually, testing in isolation, or sharing node definitions across runners.

Both attach the same _stroma_node_id and _stroma_contract metadata to the function. @runner.node is syntactic sugar that also calls registry.register(...) for you.
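To make the relationship concrete, the following sketch shows a decorator that attaches the two metadata attributes and optionally registers the contract. ContractSketch, node_sketch, and the plain dict used as a registry are hypothetical stand-ins; only the attribute names _stroma_node_id and _stroma_contract come from the text above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ContractSketch:  # hypothetical stand-in for NodeContract
    node_id: str
    input_schema: type
    output_schema: type


def node_sketch(node_id, contract, registry=None):
    """Attach node metadata; also register the contract if a registry is given."""
    def decorator(fn):
        fn._stroma_node_id = node_id
        fn._stroma_contract = contract
        if registry is not None:
            # Mirrors the extra registration step that @runner.node performs.
            registry[node_id] = contract
        return fn
    return decorator


registry = {}
contract = ContractSketch("extract", dict, dict)


@node_sketch("extract", contract, registry)
async def extract(state):
    return {"entities": ["Python", "Pydantic"]}
```

Calling node_sketch without a registry corresponds to the @stroma_node style, where registration is left to the caller.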

stroma_node(node_id, contract)

Decorator that attaches contract metadata to an async node function.

Returns a decorator that binds node_id and contract as attributes on the decorated function, marking it as a Stroma pipeline node.

Example

@stroma_node("extract", NodeContract(node_id="extract", input_schema=In, output_schema=Out))
async def extract(state: In) -> dict:
    return {"url": state.url, "content": "..."}

parallel(*nodes)

Wrap multiple nodes to run concurrently with asyncio.gather.

The returned pseudo-node fans out the current pipeline state to all child nodes, runs them concurrently, then merges their output dicts (last write wins on key conflicts). All child nodes must accept the same input schema.

Each child's output is validated against its declared contract (if decorated with @stroma_node or @runner.node) before merging. Cost tuples returned by children are accumulated and returned as a 4-tuple so the runner can track token usage.

On any child failure (including ContractViolation), remaining tasks are cancelled and the exception propagates to the runner's failure handling.
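The fan-out, merge, and cancel-on-failure behavior can be sketched with plain asyncio. This is a simplified illustration, not the library's code: parallel_sketch is a hypothetical name, and contract validation and cost accumulation are deliberately left out.

```python
import asyncio


def parallel_sketch(*nodes):
    """Return a pseudo-node that runs all child nodes on the same state."""
    async def fan_out(state):
        tasks = [asyncio.create_task(node(state)) for node in nodes]
        try:
            outputs = await asyncio.gather(*tasks)
        except BaseException:
            # One child failed: cancel the rest and wait for them to finish.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
        merged = {}
        for out in outputs:  # outputs follow argument order
            merged.update(out)  # last write wins on key conflicts
        return merged
    return fan_out


async def node_b(state):
    return {"x": 1, "shared": "b"}


async def node_c(state):
    return {"y": 2, "shared": "c"}


merged = asyncio.run(parallel_sketch(node_b, node_c)({"value": 0}))

cancelled = []


async def slow(state):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        cancelled.append("slow")  # record that cancellation reached this child
        raise


async def boom(state):
    raise ValueError("bad output")


try:
    asyncio.run(parallel_sketch(slow, boom)({}))
    failed = False
except ValueError:
    failed = True
```

Because asyncio.gather returns results in argument order, "last write wins" here means the later argument's value is kept, regardless of which child finishes first.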

Example

result = await runner.run([node_a, parallel(node_b, node_c), node_d], state)