Skip to content

Quickstart

Stroma's value is clearest when things go wrong. This page leads with failure — a pipeline that crashes, retries, gets caught at the contract boundary, and resumes from checkpoint — then shows what success looks like.

Install

uv add stroma

Define your schemas

from pydantic import BaseModel


class Document(BaseModel):
    text: str


class Summary(BaseModel):
    summary: str
    word_count: int

Build the pipeline

from stroma import StromaRunner

runner = StromaRunner.quick()


@runner.node("summarize", input=Document, output=Summary)
async def summarize(state: Document) -> dict:
    words = state.text.split()
    return {"summary": " ".join(words[:10]) + "...", "word_count": len(words)}


@runner.node("validate", input=Summary, output=Summary)
async def validate(state: Summary) -> dict:
    if state.word_count == 0:
        raise ValueError("Empty summary")
    return state.model_dump()

What happens when things go wrong

Bad output — caught at the boundary

When a node returns data that doesn't match its output schema, Stroma raises ContractViolation immediately at that boundary — classified as terminal, so there's no retry. The error surfaces at the node that caused it, not three steps downstream where it would otherwise corrupt state silently.

import asyncio


@runner.node("bad_summarize", input=Document, output=Summary)
async def bad_summarize(state: Document) -> dict:
    return {"wrong_field": "this will be caught"}  # missing summary and word_count


async def main():
    result = await runner.run([bad_summarize], Document(text="hello"))
    print(result.status)  # FAILED

    for event in result.trace.failures():
        print(f"{event.node_id}: {event.failure_message}")
    # summarize (output): summary: Field required; word_count: Field required


asyncio.run(main())

Transient failure — automatic retry

TimeoutError is classified as recoverable. Stroma retries with jittered backoff, up to the configured limit, without any code from you.

attempt = {"count": 0}


@runner.node("flaky_summarize", input=Document, output=Summary)
async def flaky_summarize(state: Document) -> dict:
    attempt["count"] += 1
    if attempt["count"] < 3:
        raise TimeoutError("upstream API timed out")
    words = state.text.split()
    return {"summary": " ".join(words[:10]) + "...", "word_count": len(words)}


async def main():
    result = await runner.run([flaky_summarize], Document(text="hello world from stroma"))
    print(result.status)       # COMPLETED
    print(attempt["count"])    # 3 — failed twice, succeeded on third attempt


asyncio.run(main())

Crash mid-pipeline — resume from checkpoint

This is the scenario that separates Stroma from a raw execution loop. After each node succeeds, its output is checkpointed. If a later node fails, you resume from where you left off — the completed nodes are skipped entirely, their outputs loaded from the store.

from stroma import (
    AsyncInMemoryStore,
    CheckpointManager,
    ContractRegistry,
    NodeContract,
    RunConfig,
    StromaRunner,
    stroma_node,
)


registry = ContractRegistry()
store = AsyncInMemoryStore()
manager = CheckpointManager(store)

c1 = NodeContract(node_id="summarize", input_schema=Document, output_schema=Summary)
c2 = NodeContract(node_id="validate", input_schema=Summary, output_schema=Summary)
registry.register(c1)
registry.register(c2)

summarize_run_count = {"n": 0}


@stroma_node("summarize", c1)
async def summarize_checkpointed(state: Document) -> dict:
    summarize_run_count["n"] += 1
    words = state.text.split()
    return {"summary": " ".join(words[:10]) + "...", "word_count": len(words)}


@stroma_node("validate", c2)
async def validate_failing(state: Summary) -> dict:
    raise RuntimeError("downstream service down")


@stroma_node("validate", c2)
async def validate_fixed(state: Summary) -> dict:
    return state.model_dump()


async def main():
    # First run — summarize succeeds and is checkpointed, validate crashes
    config1 = RunConfig(run_id="qs-run-1")
    runner1 = StromaRunner(registry, manager, config1)
    result1 = await runner1.run(
        [summarize_checkpointed, validate_failing],
        Document(text="Stroma adds reliability to async agent pipelines"),
    )
    print(result1.status)              # FAILED
    print(summarize_run_count["n"])    # 1

    # Resume — summarize is skipped, its checkpoint is loaded
    config2 = RunConfig(run_id="qs-run-1", resume_from="validate")
    runner2 = StromaRunner(registry, manager, config2)
    result2 = await runner2.run(
        [summarize_checkpointed, validate_fixed],
        Document(text="Stroma adds reliability to async agent pipelines"),
    )
    print(result2.status)              # RESUMED
    print(result2.final_state)         # summary='Stroma adds reliability to async...' word_count=7
    print(summarize_run_count["n"])    # still 1 — never ran again

    # Diff the two traces to see exactly what changed
    diffs = result1.trace.diff(result2.trace)
    for d in diffs:
        print(d)


asyncio.run(main())

When everything works

async def main():
    result = await runner.run(
        [summarize, validate],
        Document(text="Stroma adds reliability to async agent pipelines"),
    )
    print(result.status)       # COMPLETED
    print(result.final_state)  # summary='Stroma adds reliability to async...' word_count=7

asyncio.run(main())

Next steps

  • Tutorial — Step-by-step walkthrough of every feature.
  • Concepts — Architecture and design decisions.
  • Extending Stroma — Custom backends, classifiers, and OTel integration.