Concepts
Stroma is built around core primitives that work together to make agent pipelines reliable. Each can be used independently, but they compose naturally when wired through the StromaRunner.
New to Stroma?
If you're looking for a hands-on introduction, start with the Tutorial. This page covers architecture and design decisions — it's a reference, not a walkthrough.
Contracts
A contract defines the expected input and output schemas for a pipeline node using Pydantic models. Contracts catch data shape issues at node boundaries before they propagate downstream.
```python
from pydantic import BaseModel

from stroma import ContractRegistry, NodeContract

class Query(BaseModel):
    text: str
    max_results: int

class SearchResult(BaseModel):
    urls: list[str]
    scores: list[float]

registry = ContractRegistry()
registry.register(NodeContract(
    node_id="search",
    input_schema=Query,
    output_schema=SearchResult,
))
```
When validation fails, a ContractViolation is raised with the node ID, direction ("input" or "output"), the raw data, and Pydantic's error details. The str() representation includes the first 5 field-level errors so you can diagnose failures without digging into the trace:
Contract violation for 'search' (output): urls: Input should be a valid list; scores: Input should be a valid list
The runner classifies contract violations as terminal failures — no retries, because the data shape won't fix itself.
Why terminal?
A ContractViolation means the node produced structurally invalid data. Retrying with the same input will produce the same bad output. The fix is in the node's logic, not in a retry.
Failure Classification
Not all errors are equal. Stroma categorizes every exception into one of three classes:
| Class | Meaning | Default behavior |
|---|---|---|
| `RECOVERABLE` | Transient failure (timeouts, rate limits) | Retry up to 3 times with jittered backoff |
| `TERMINAL` | Permanent failure (bad data, logic errors) | Stop immediately |
| `AMBIGUOUS` | Unknown — might resolve on retry | Retry once with short backoff |
Built-in rules handle common exceptions:
| Exception | Classification |
|---|---|
| `ContractViolation` | `TERMINAL` |
| `BudgetExceeded` | `RECOVERABLE` |
| `TimeoutError` | `RECOVERABLE` |
| `ValueError` | `AMBIGUOUS` |
| Everything else | `AMBIGUOUS` |
Override with custom classifiers — callables that inspect the exception and context, returning a FailureClass or None to defer:
```python
from stroma import FailureClass, StromaRunner

def classify_rate_limit(exc, ctx):  # (1)!
    if "rate limit" in str(exc).lower():
        return FailureClass.RECOVERABLE
    return None  # (2)!

runner = StromaRunner.quick(
    classifiers=[classify_rate_limit],
)
```
1. `ctx` is a `NodeContext` with `node_id`, `attempt`, and `run_id` — use it to make classification decisions based on where and when the error occurred.
2. Return `None` to pass to the next classifier. If no classifier matches, built-in rules apply.
Custom classifiers are checked first, in order. If none return a result, the built-in rules apply.
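The dispatch order can be sketched in plain Python. This is an illustrative model, not Stroma's implementation; the `BUILTIN_RULES` table and `classify` helper here are hypothetical stand-ins:

```python
from enum import Enum

class FailureClass(Enum):
    RECOVERABLE = "recoverable"
    TERMINAL = "terminal"
    AMBIGUOUS = "ambiguous"

# Stand-in for the built-in rules from the table above.
BUILTIN_RULES = {
    TimeoutError: FailureClass.RECOVERABLE,
    ValueError: FailureClass.AMBIGUOUS,
}

def classify(exc, ctx, classifiers):
    # Custom classifiers run first, in order; the first non-None result wins.
    for clf in classifiers:
        result = clf(exc, ctx)
        if result is not None:
            return result
    # No classifier matched: fall back to built-in rules,
    # treating anything unrecognized as AMBIGUOUS.
    return BUILTIN_RULES.get(type(exc), FailureClass.AMBIGUOUS)
```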
Retry Policies
Each failure class maps to a retry policy controlling:
- `max_retries` — how many times to retry before giving up
- `backoff_seconds` — maximum backoff (actual delay is jittered between 0 and this value)
```python
from stroma import FailureClass, FailurePolicy

custom_policies = {
    FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
    FailureClass.TERMINAL: FailurePolicy(max_retries=0),
    FailureClass.AMBIGUOUS: FailurePolicy(max_retries=2, backoff_seconds=0.5),
}
```
Per-node policy overrides
The global policy_map applies to all nodes. When a specific node needs different retry behavior, use node_policies to override on a per-node basis:
```python
from stroma import RunConfig, FailureClass, FailurePolicy

config = RunConfig(
    policy_map={  # global defaults
        FailureClass.RECOVERABLE: FailurePolicy(max_retries=3, backoff_seconds=1.0),
        FailureClass.TERMINAL: FailurePolicy(max_retries=0),
        FailureClass.AMBIGUOUS: FailurePolicy(max_retries=1, backoff_seconds=0.5),
    },
    node_policies={  # per-node overrides
        "llm_call": {
            FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
        },
        "fast_transform": {
            FailureClass.RECOVERABLE: FailurePolicy(max_retries=0),
        },
    },
)
```
The lookup order is: per-node override → global policy_map → built-in defaults. Unspecified failure classes for a node fall through to the global policy.
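That lookup order can be modeled as a small function. A sketch, with policies represented as plain `(max_retries, backoff_seconds)` tuples rather than real `FailurePolicy` objects:

```python
def resolve_policy(node_id, failure_class, node_policies, policy_map, defaults):
    """Resolve a retry policy: per-node override, then global map, then defaults."""
    override = node_policies.get(node_id, {})
    if failure_class in override:
        return override[failure_class]
    if failure_class in policy_map:
        return policy_map[failure_class]
    return defaults[failure_class]
```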
Jittered backoff
The actual delay before each retry is random.uniform(0, backoff_seconds) — not a fixed sleep. This prevents thundering herd problems when multiple pipelines retry simultaneously against the same API.
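A minimal sketch of such a retry loop, assuming nothing about Stroma's internals:

```python
import asyncio
import random

async def retry_with_jitter(fn, max_retries=3, backoff_seconds=1.0):
    # Each delay is drawn uniformly from [0, backoff_seconds) so that
    # concurrent pipelines spread their retries out instead of
    # hammering the same API in lockstep.
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise
            await asyncio.sleep(random.uniform(0, backoff_seconds))
```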
Cost Tracking
Stroma tracks three resource dimensions across your pipeline:
| Dimension | Field | Unit |
|---|---|---|
| Tokens | `max_tokens_total` | integer |
| Cost | `max_cost_usd` | USD (float) |
| Latency | `max_latency_ms` | milliseconds |
Set budget limits on any dimension. When a limit is exceeded, BudgetExceeded is raised — classified as RECOVERABLE by default.
```python
from stroma import ExecutionBudget

budget = ExecutionBudget(
    max_tokens_total=10_000,
    max_cost_usd=0.50,
    max_latency_ms=30_000,
)
```
Reporting token usage and model
Nodes report usage through their return value. Four shapes are supported:
| Return shape | Fields |
|---|---|
| `dict` | No usage tracking (tokens=0, cost=0) |
| `(dict, tokens)` | Total tokens, no model-based pricing |
| `(dict, input_tokens, model)` | Input tokens + model name for cost estimation |
| `(dict, input_tokens, output_tokens, model)` | Full usage with separate input/output tokens |
When a model string is provided, estimate_cost_usd looks up pricing from KNOWN_MODELS and computes the USD cost automatically:
```python
@stroma_node("summarize", contract)
async def summarize(state: Input) -> tuple:
    result = await llm.generate(state.text)
    return (
        {"summary": result.text},
        result.usage.input_tokens,
        result.usage.output_tokens,
        "gpt-4o",  # (1)!
    )
```
1. Stroma looks up `"gpt-4o"` in `KNOWN_MODELS` and computes cost as `(input_tokens * input_price + output_tokens * output_price) / 1_000_000`. Unknown models default to $0.00.
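The formula is easy to check in isolation. A self-contained sketch with a trimmed-down pricing table (the real `KNOWN_MODELS` ships with Stroma):

```python
# Per-million-token pricing: (input_price, output_price) in USD.
PRICING = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}

def estimate_cost_usd(model, input_tokens, output_tokens):
    # Unknown models price at (0.0, 0.0): no error, just no cost tracking.
    input_price, output_price = PRICING.get(model, (0.0, 0.0))
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000
```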
Built-in pricing
KNOWN_MODELS ships with per-million-token pricing for these models:
| Model | Input ($/1M tokens) | Output ($/1M tokens) |
|---|---|---|
| `gpt-4o` | 2.50 | 10.00 |
| `gpt-4o-mini` | 0.15 | 0.60 |
| `gpt-4-turbo` | 10.00 | 30.00 |
| `claude-opus-4-6` | 15.00 | 75.00 |
| `claude-sonnet-4-6` | 3.00 | 15.00 |
| `claude-haiku-4-5` | 0.80 | 4.00 |
| `gemini-1.5-pro` | 3.50 | 10.50 |
| `gemini-1.5-flash` | 0.35 | 1.05 |
Unknown models default to $0.00 — no error, just no cost tracking. You can inspect or extend KNOWN_MODELS at runtime:
```python
from stroma import KNOWN_MODELS

# Add a custom model
KNOWN_MODELS["my-fine-tune"] = (5.00, 15.00)
```
The max_cost_usd budget enforces actual dollar limits based on model pricing.
Checkpointing
After each successful node execution, the output state is checkpointed. If a pipeline fails partway through, you can resume from the last checkpoint instead of re-running completed nodes:
```python
from stroma import RunConfig

# First run fails at node3
config1 = RunConfig(run_id="run-123")
result = await runner.run([node1, node2, node3], initial_state, config=config1)
# result.status == PARTIAL or FAILED

# Resume from node3 — node1 and node2 are skipped, output loaded from checkpoint
config2 = RunConfig(run_id="run-123", resume_from="node3")  # (1)!
result = await runner.run([node1, node2, node3], initial_state, config=config2)
# result.status == RESUMED
```
1. The `run_id` must match the original run. The runner loads the checkpoint for the node before `resume_from` and uses it as input.
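The skip logic amounts to slicing the node sequence at `resume_from`. A hypothetical sketch of that planning step, working on node IDs only:

```python
def plan_resume(node_ids, resume_from=None):
    """Split a node sequence into (skipped, to_run).

    Nodes before resume_from are skipped; their checkpointed output
    becomes the input to the first resumed node.
    """
    if resume_from is None:
        return [], list(node_ids)
    idx = node_ids.index(resume_from)  # raises ValueError for an unknown node
    return list(node_ids[:idx]), list(node_ids[idx:])
```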
Storage backends
| Backend | Interface | Use case |
|---|---|---|
| `InMemoryStore` | sync | Testing, short-lived pipelines |
| `AsyncInMemoryStore` | async | Testing with async runners (default for `StromaRunner.quick()`) |
| `RedisStore` | async | Production, distributed systems (uses `redis.asyncio`) |
| `SyncRedisStore` | sync | Legacy or sync-only environments |
The CheckpointManager handles both sync and async stores transparently. Implement the CheckpointStore or AsyncCheckpointStore protocol to add your own backend — no subclassing required, just implement save, load, and delete.
Additive with LangGraph
If you're already using LangGraph's checkpointer (MemorySaver, SqliteSaver, etc.), keep it. Stroma's checkpointing is for non-LangGraph pipelines or as a complementary layer.
Execution Tracing
Every node execution attempt is recorded as a TraceEvent capturing:
- Node ID, run ID, and attempt number
- Timestamp and duration
- Input and output state (as dicts)
- Failure class and message (if failed)
The ExecutionTrace provides filtering, diffing, replay, and JSON serialization:
```python
result = await runner.run(nodes, state)

# Inspect failures
for event in result.trace.failures():
    print(f"{event.node_id} attempt {event.attempt}: {event.failure_message}")

# Compare two runs
diffs = trace_a.diff(trace_b)  # (1)!

# Export for monitoring
json_payload = result.trace.to_json()
```
1. `diff()` compares node IDs, attempts, inputs, outputs, and failure states — ignoring timestamps and durations that naturally vary between runs.
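The diff semantics (compare everything except timing fields) can be modeled on plain dicts. A sketch, not Stroma's implementation:

```python
# Fields that naturally vary between runs and are excluded from comparison.
# The field names here are illustrative.
IGNORED_FIELDS = {"timestamp", "duration_ms"}

def diff_events(a, b):
    """Return {field: (value_a, value_b)} for fields that differ, ignoring timing."""
    keys = (set(a) | set(b)) - IGNORED_FIELDS
    return {k: (a.get(k), b.get(k)) for k in keys if a.get(k) != b.get(k)}
```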
Hooks, Context, and Logging
These features are composable and optional — use what your pipeline needs.
Observability hooks — NodeHooks provides async callbacks (on_node_start, on_node_success, on_node_failure) at node boundaries for telemetry. All optional, zero overhead when None. See the hooks tutorial and Extending Stroma for full OTel examples.
Shared context — a mutable dict on RunConfig passed to nodes that accept a second parameter. Use it for HTTP clients, API keys, caches, or accumulated metadata. See the shared context tutorial.
Structured logging — the runner creates a per-run logging.LoggerAdapter with run_id in its extra dict, making it filterable in JSON log pipelines.
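The logging setup is standard library machinery. A sketch of what such a per-run adapter looks like (the logger name `"stroma"` is an assumption):

```python
import logging

def run_logger(run_id):
    # LoggerAdapter injects run_id into every record's extra dict,
    # so JSON log pipelines can filter by run.
    base = logging.getLogger("stroma")
    return logging.LoggerAdapter(base, {"run_id": run_id})
```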
Pipeline Execution
The StromaRunner ties everything together. For each node in the sequence it:
1. Validates input against the contract
2. Fires `on_node_start` hook (if configured)
3. Executes the async node function, applying per-node timeout if configured
4. Validates output against the contract
5. Records resource usage, computes cost from model pricing, and checks the budget
6. Saves a checkpoint
7. Records a trace event
8. Fires `on_node_success` hook (if configured)
On failure, it fires on_node_failure, classifies the exception, looks up the per-node or global retry policy, and either retries with jittered backoff, stops with a terminal status, or gives up after exhausting retries.
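The happy path of the steps above condenses to a few lines of asyncio. The `execute_node` helper and its callback parameters are hypothetical; hooks, usage recording, retries, and tracing are omitted:

```python
import asyncio

async def execute_node(node_fn, state, validate_in, validate_out,
                       save_checkpoint, timeout_ms=None):
    # Validate input, run the node (with an optional timeout),
    # validate output, then checkpoint the result.
    validate_in(state)
    coro = node_fn(state)
    if timeout_ms is not None:
        output = await asyncio.wait_for(coro, timeout_ms / 1000)
    else:
        output = await coro
    validate_out(output)
    save_checkpoint(output)
    return output
```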
Per-node timeouts
Set per-node timeouts to guard against hanging calls. Timeouts are specified in milliseconds and keyed by node ID:
```python
runner = StromaRunner.quick().with_node_timeouts({
    "llm_call": 30_000,   # 30 seconds
    "embedding": 10_000,  # 10 seconds
})
```
When a node exceeds its timeout, asyncio.wait_for raises TimeoutError. This is classified as RECOVERABLE (see the failure classification table), so timed-out nodes are retried automatically. Nodes without a configured timeout run without any time limit. Timeouts apply to both sequential and parallel nodes.
```
Input State → [Hook:start] → [Contract] → Node → [Contract] → [Budget] → [Checkpoint] → [Hook:success] → Output State
                                           ↑                                   │
                                           └── retry (if recoverable) ─────────┘
```
Parallel fan-out
For independent nodes that can run simultaneously, use parallel():
```python
from stroma import parallel

result = await runner.run(
    [node_a, parallel(node_b, node_c), node_d],
    state,
)
```
parallel() wraps multiple nodes into a single pseudo-node that runs them concurrently with asyncio.gather. Child outputs are merged into a single dict (last write wins on key conflicts). Each child's output is validated against its declared contract before merging, so structurally invalid data is caught immediately. On any child failure (including ContractViolation), remaining tasks are cancelled and the exception propagates to the runner's failure handling. Parallel nodes also support retries — transient failures are retried with the same backoff policies as sequential nodes.
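The concurrency and merge semantics can be reproduced with `asyncio.gather` directly. A sketch of the fan-out, ignoring per-child validation and cancellation:

```python
import asyncio

async def run_parallel(children, state):
    # Children run concurrently; gather preserves argument order, so the
    # merge is deterministic and the last child wins on key conflicts.
    results = await asyncio.gather(*(child(state) for child in children))
    merged = {}
    for out in results:
        merged.update(out)
    return merged
```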
Warning
Each child's output is checked against its own output contract, but the merged dict is never re-validated as a whole. Key conflicts between children resolve last-write-wins, so a contract-valid value from one child can silently overwrite another's; the combined state is only checked again by the next sequential node's input contract. If you rely on contracts for correctness of the merged state, this is a gap to be aware of.
Stateless runner
CostTracker, RetryBudget, and ExecutionTrace are created fresh per run() call. Calling runner.run() multiple times on the same instance does not accumulate state between runs.
Stability
Not every primitive is at the same maturity level. Use this as a guide for production planning.
| Status | Primitives |
|---|---|
| Stable | StromaRunner, contracts, failure classification, retry policies, execution tracing |
| Stable | Checkpointing (InMemoryStore, AsyncInMemoryStore, RedisStore) |
| Beta | LangGraph adapter, fluent builder API (.with_budget(), .with_hooks(), etc.), per-node timeouts |
| Experimental | Cost tracking / KNOWN_MODELS pricing, ModelHint / fallback routing |
Stable means the API won't change without a major version bump. Beta means the API is solid but may see minor adjustments. Experimental means useful today but subject to change.