Runner
NodeHooks
dataclass
Optional async callbacks fired at node execution boundaries.
on_node_start(run_id, node_id, input_state_dict)
on_node_success(run_id, node_id, output_state_dict, tokens_used)
on_node_failure(run_id, node_id, exc, failure_class)
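To make the callback shapes concrete, here is a minimal sketch of how optional async hooks with these three signatures could be declared and fired. SketchHooks, fire, and the recorder functions are hypothetical stand-ins written for this illustration; they are not Stroma's NodeHooks implementation, and the field types are assumptions.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


# Hypothetical stand-in mirroring the three documented callback names.
# This is NOT Stroma's NodeHooks class; the type annotations are assumptions.
@dataclass
class SketchHooks:
    on_node_start: Optional[Callable[[str, str, dict], Awaitable[None]]] = None
    on_node_success: Optional[Callable[[str, str, dict, int], Awaitable[None]]] = None
    on_node_failure: Optional[Callable[[str, str, BaseException, Any], Awaitable[None]]] = None


events = []


async def record_start(run_id, node_id, input_state_dict):
    events.append(("start", run_id, node_id))


async def record_success(run_id, node_id, output_state_dict, tokens_used):
    events.append(("success", run_id, node_id, tokens_used))


async def fire(hook, *args):
    # Hooks are optional: do nothing when a callback was not supplied.
    if hook is not None:
        await hook(*args)


async def demo():
    hooks = SketchHooks(on_node_start=record_start, on_node_success=record_success)
    await fire(hooks.on_node_start, "run-1", "double", {"value": 5})
    await fire(hooks.on_node_success, "run-1", "double", {"result": 10}, 0)
    # on_node_failure was left unset, so this call is a no-op.
    await fire(hooks.on_node_failure, "run-1", "double", ValueError("boom"), None)


asyncio.run(demo())
```

Only the callbacks that were supplied are invoked, which is the behavior the word "optional" suggests.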
RunConfig
Bases: BaseModel
Configuration for a single pipeline run.
A run_id is auto-generated if not provided. Set budget, policy_map,
classifiers, or resume_from to customize behavior.
StromaRunner
Orchestrates sequential node execution with reliability primitives.
Runs a sequence of async node functions, applying contract validation,
retry policies, cost tracking, and checkpointing at each step. Requires
a ContractRegistry for input/output validation, a CheckpointManager
for persistence, and a RunConfig with budget, policies, and classifiers.
Example
runner = StromaRunner(registry, checkpoint_manager, config)
result = await runner.run([node1, node2], initial_state)
print(result.status, result.final_state)
quick(*, store=None, budget=None, policy_map=None, classifiers=None, hooks=None, **config_kwargs)
classmethod
Create a runner with sensible defaults — no boilerplate required.
Sets up an InMemoryStore, unlimited ExecutionBudget, and
default_policy_map() retry policies. Pass store, budget,
policy_map, classifiers, or hooks to override any of these.
Additional keyword arguments are forwarded to RunConfig.
Returns a fully configured StromaRunner ready to use.
Example
runner = StromaRunner.quick()
@runner.node("double", input=InputState, output=OutputState)
async def double(state: InputState) -> dict:
    return {"result": state.value * 2}
result = await runner.run([double], InputState(value=5))
with_redis(redis_url, ttl_seconds=3600)
Replace the checkpoint backend with a Redis-backed store.
Requires the redis extra (uv add stroma[redis]).
with_budget(*, tokens=None, cost_usd=None, latency_ms=None)
Set execution budget limits for token count, cost, or latency.
with_model_fallback(model, *, to, at_budget_pct=0.8)
Append a model fallback rule that activates when spend crosses at_budget_pct.
When cumulative cost reaches at_budget_pct of the budget, nodes
that would use model receive to as the model signal in
ctx["_stroma_model"] instead, and cost accounting uses to for
pricing.
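The threshold rule described above can be sketched as a small pure function. This is an illustration of the documented behavior under stated assumptions, not Stroma's code: pick_model, the rule tuple layout, and the model names "large-model" and "small-model" are all hypothetical.

```python
def pick_model(requested, spent_usd, budget_usd, rules):
    """Return the model a node should use given the spend so far.

    rules is a list of (model, to, at_budget_pct) tuples, a hypothetical
    layout chosen for this sketch.
    """
    if not budget_usd:
        # No cost budget configured: nothing to compare against.
        return requested
    fraction_spent = spent_usd / budget_usd
    for model, to, at_budget_pct in rules:
        # The fallback activates once spend reaches the threshold fraction.
        if model == requested and fraction_spent >= at_budget_pct:
            return to
    return requested


# Hypothetical rule: swap "large-model" for "small-model" at 80% of budget.
rules = [("large-model", "small-model", 0.8)]

below = pick_model("large-model", spent_usd=0.5, budget_usd=1.0, rules=rules)
at_threshold = pick_model("large-model", spent_usd=0.8, budget_usd=1.0, rules=rules)
other = pick_model("tiny-model", spent_usd=0.9, budget_usd=1.0, rules=rules)
```

Here below stays "large-model", at_threshold becomes "small-model", and other is untouched because no rule names it.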
with_classifiers(classifiers)
Set custom failure classifiers for error handling.
with_hooks(hooks)
Set node lifecycle hooks for observability.
with_context(context)
Set the shared context dict passed to nodes that accept a second argument.
with_policy_map(policy_map)
Set the global failure policy map.
with_node_policies(node_policies)
Set per-node failure policy overrides.
with_node_timeouts(node_timeouts)
Set per-node execution timeouts in milliseconds.
Nodes that exceed their timeout raise TimeoutError, which is
classified as RECOVERABLE by default and retried according to
the applicable retry policy.
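As a rough picture of what a millisecond timeout on an async node means, the sketch below wraps a node call in asyncio.wait_for. It is an assumption about the general mechanism, not Stroma's implementation; call_with_timeout, fast_node, and slow_node are hypothetical names, and retry handling is left out.

```python
import asyncio


async def fast_node(state: dict) -> dict:
    return {"ok": True}


async def slow_node(state: dict) -> dict:
    await asyncio.sleep(0.2)  # deliberately slower than the 50 ms limit below
    return {"ok": True}


async def call_with_timeout(node, state, timeout_ms):
    # asyncio.wait_for takes seconds, so convert from milliseconds.
    try:
        return await asyncio.wait_for(node(state), timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        # Normalize to the built-in TimeoutError (already the same class on 3.11+).
        raise TimeoutError(f"node exceeded {timeout_ms} ms") from None


async def demo():
    outcomes = {}
    for name, node in (("fast", fast_node), ("slow", slow_node)):
        try:
            await call_with_timeout(node, {}, timeout_ms=50)
            outcomes[name] = "ok"
        except TimeoutError:
            outcomes[name] = "timeout"
    return outcomes


outcomes = asyncio.run(demo())
```

In a real run the TimeoutError would go to the failure classifiers instead of being caught inline as it is here.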
node(node_id, *, input, output)
Decorator that creates a contract, registers it, and marks the function as a node.
Combines stroma_node, NodeContract creation, and
ContractRegistry.register into a single step. Pass the node_id,
input schema, and output schema and the returned decorator handles
the rest.
Example
runner = StromaRunner.quick()
@runner.node("extract", input=Document, output=Entities)
async def extract(state: Document) -> dict:
    return {"entities": ["Python", "Pydantic"]}
run(node_sequence, initial_state)
async
Execute a sequence of nodes with full reliability instrumentation.
Runs each node in node_sequence starting from initial_state,
returning an ExecutionResult with the final state, trace, and
cost info.
Node decorators
Stroma provides two ways to declare a pipeline node:
| Decorator | When to use |
|---|---|
| @runner.node("id", input=In, output=Out) | You already have a StromaRunner instance and want the contract registered automatically. |
| @stroma_node("id", contract) | You're building the registry manually, testing in isolation, or sharing node definitions across runners. |
Both attach the same _stroma_node_id and _stroma_contract metadata to the
function. @runner.node is syntactic sugar that also calls
registry.register(...) for you.
stroma_node(node_id, contract)
Decorator that attaches contract metadata to an async node function.
Returns a decorator that binds node_id and contract as attributes on the decorated function, marking it as a Stroma pipeline node.
Example
@stroma_node("extract", NodeContract(node_id="extract", input_schema=In, output_schema=Out))
async def extract(state: In) -> dict:
    return {"url": state.url, "content": "..."}
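The metadata-attaching pattern can be sketched without the library. The attribute names _stroma_node_id and _stroma_contract come from the text above; everything else (sketch_node, SketchContract, the plain dict state) is a hypothetical stand-in and may differ from how Stroma does it.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-in for a contract object; not Stroma's NodeContract.
@dataclass(frozen=True)
class SketchContract:
    node_id: str
    input_schema: type
    output_schema: type


def sketch_node(node_id, contract):
    """Return a decorator that tags an async function with node metadata."""
    def decorator(fn):
        # Attribute names taken from the documentation above.
        fn._stroma_node_id = node_id
        fn._stroma_contract = contract
        return fn  # the function itself is returned unchanged
    return decorator


contract = SketchContract(node_id="extract", input_schema=dict, output_schema=dict)


@sketch_node("extract", contract)
async def extract(state: dict) -> dict:
    return {"url": state["url"], "content": "..."}


output = asyncio.run(extract({"url": "https://example.com"}))
```

Because the decorator only sets attributes, the node stays directly awaitable, which is what makes testing it in isolation straightforward.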
parallel(*nodes)
Wrap multiple nodes to run concurrently with asyncio.gather.
The returned pseudo-node fans out the current pipeline state to all child nodes, runs them concurrently, then merges their output dicts (last write wins on key conflicts). All child nodes must accept the same input schema.
Each child's output is validated against its declared contract (if decorated
with @stroma_node or @runner.node) before merging. Cost tuples returned
by children are accumulated and returned as a 4-tuple so the runner can
track token usage.
On any child failure (including ContractViolation), remaining tasks are
cancelled and the exception propagates to the runner's failure handling.
Example
result = await runner.run([node_a, parallel(node_b, node_c), node_d], state)
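The fan-out and merge step can be illustrated with plain asyncio.gather. This sketch covers only the "last write wins" merge, assuming later arguments override earlier ones; it omits contract validation, cost accumulation, and cancellation of sibling tasks on failure. sketch_parallel and the two nodes are hypothetical and are not Stroma's parallel.

```python
import asyncio


async def node_b(state: dict) -> dict:
    return {"b": state["x"] + 1, "shared": "from_b"}


async def node_c(state: dict) -> dict:
    return {"c": state["x"] * 2, "shared": "from_c"}


def sketch_parallel(*nodes):
    async def fan_out(state: dict) -> dict:
        # Each child gets its own copy of the same input state.
        outputs = await asyncio.gather(*(node(dict(state)) for node in nodes))
        merged = {}
        # gather preserves argument order, so later nodes overwrite earlier
        # ones on key conflicts (assumed meaning of "last write wins").
        for out in outputs:
            merged.update(out)
        return merged
    return fan_out


merged = asyncio.run(sketch_parallel(node_b, node_c)({"x": 3}))
```

Both children write the key "shared", and the value from node_c survives because it is listed last.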