Shared Context
Nodes often need access to shared resources — HTTP clients, database connections, API keys, caches, or configuration. Without a sharing mechanism, you'd resort to module-level globals, which make testing painful and concurrency unsafe.
Stroma provides a context dict on RunConfig that is passed to any node that accepts a second parameter.
Passing context to nodes
import asyncio
import httpx
from pydantic import BaseModel
from stroma import StromaRunner
class Url(BaseModel):
url: str
class Page(BaseModel):
body: str
runner = StromaRunner.quick()
@runner.node("fetch", input=Url, output=Page)
async def fetch(state: Url, ctx: dict) -> dict: # (1)!
client = ctx["http"]
resp = await client.get(state.url)
return {"body": resp.text}
async def main():
async with httpx.AsyncClient() as client:
runner.with_context({"http": client}) # (2)!
result = await runner.run([fetch], Url(url="https://example.com"))
print(len(result.final_state.body))
asyncio.run(main())
- Add a second parameter (any name) to receive the context dict. Stroma detects this automatically via
inspect.signature. - Set the context with
.with_context()before callingrun(). The same dict is passed by reference to every node.
Context detection
Stroma checks the number of parameters on each node function:
| Parameters | Behavior |
|---|---|
1 (state) |
No context passed — backwards compatible |
2+ (state, ctx, ...) |
Context dict passed as the second argument |
Existing single-parameter nodes continue to work even when context is set on the config — they just don't receive it.
Mutations persist across nodes
The context dict is passed by reference. Mutations in one node are visible in subsequent nodes:
@runner.node("step1", input=Input, output=Intermediate)
async def step1(state: Input, ctx: dict) -> dict:
ctx["step1_complete"] = True # (1)!
return {"value": state.value + 1}
@runner.node("step2", input=Intermediate, output=Output)
async def step2(state: Intermediate, ctx: dict) -> dict:
assert ctx["step1_complete"] is True # (2)!
return {"result": state.value * 2}
- Node writes to the context dict.
- The next node sees the mutation.
This is useful for accumulating metadata, passing intermediate state that doesn't fit the pipeline's schema, or flagging conditions for downstream nodes.
Common patterns
Injecting an API client
config = RunConfig(context={"openai": openai_client, "model": "gpt-4o"})
Feature flags
config = RunConfig(context={"enable_summarization": True, "max_tokens": 500})
Accumulating metrics
config = RunConfig(context={"node_timings": {}})
@runner.node("slow_step", input=In, output=Out)
async def slow_step(state: In, ctx: dict) -> dict:
start = time.monotonic()
result = await expensive_call(state)
ctx["node_timings"]["slow_step"] = time.monotonic() - start
return result
Recap
RunConfig.contextis a plaindict[str, Any]passed to nodes by reference- Nodes with two or more parameters receive context automatically
- Nodes with one parameter are unaffected — full backwards compatibility
- Mutations in one node are visible in subsequent nodes
- Use context for HTTP clients, config, feature flags, or accumulated metadata
Next: Parallel Execution — run independent nodes concurrently.