Checkpointing
CheckpointStore
Bases: Protocol
Protocol for checkpoint storage backends.
Implementations must provide save, load, and delete.
They may also implement save_typed and load_typed for schema-aware
storage and retrieval.
save(run_id, node_id, state)
Persist the state for a given run/node pair.
load(run_id, node_id)
Load previously saved state, or return None if not found.
delete(run_id)
Delete all checkpoints for a run.
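As a rough illustration of the three required methods, the sketch below defines a stand-in protocol and a toy dict-backed backend that satisfies it structurally. The names CheckpointStoreSketch and DictStore are hypothetical, and the type hints are assumptions; the library's actual signatures may differ.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class CheckpointStoreSketch(Protocol):
    """Hypothetical stand-in listing the three required methods."""

    def save(self, run_id: str, node_id: str, state: Any) -> None: ...
    def load(self, run_id: str, node_id: str) -> Optional[Any]: ...
    def delete(self, run_id: str) -> None: ...


class DictStore:
    """Toy backend: matches the protocol by shape, no inheritance needed."""

    def __init__(self) -> None:
        self._data = {}  # (run_id, node_id) -> state

    def save(self, run_id, node_id, state):
        self._data[(run_id, node_id)] = state

    def load(self, run_id, node_id):
        # Missing checkpoints yield None, as the protocol describes.
        return self._data.get((run_id, node_id))

    def delete(self, run_id):
        # Remove every node checkpoint that belongs to this run.
        for key in [k for k in self._data if k[0] == run_id]:
            del self._data[key]


store = DictStore()
store.save("run-1", "node-1", {"value": 42})
store.save("run-1", "node-2", {"value": 7})
store.save("run-2", "node-1", {"value": 0})
store.delete("run-1")
```

Because the protocol is structural, any object exposing these three methods can be passed where a store is expected.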
AsyncCheckpointStore
Bases: Protocol
Async protocol for checkpoint storage backends.
Mirrors CheckpointStore but with coroutine methods, making it suitable for
non-blocking I/O backends such as redis.asyncio.
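The coroutine variant might look like the sketch below. AsyncCheckpointStoreSketch and AsyncDictStore are hypothetical names, and asyncio.sleep(0) merely stands in for a real network round trip.

```python
import asyncio
from typing import Any, Optional, Protocol


class AsyncCheckpointStoreSketch(Protocol):
    """Hypothetical async counterpart: same methods, but coroutines."""

    async def save(self, run_id: str, node_id: str, state: Any) -> None: ...
    async def load(self, run_id: str, node_id: str) -> Optional[Any]: ...
    async def delete(self, run_id: str) -> None: ...


class AsyncDictStore:
    """Toy async backend used only for illustration."""

    def __init__(self) -> None:
        self._data = {}

    async def save(self, run_id, node_id, state):
        await asyncio.sleep(0)  # placeholder for non-blocking I/O
        self._data[(run_id, node_id)] = state

    async def load(self, run_id, node_id):
        await asyncio.sleep(0)
        return self._data.get((run_id, node_id))

    async def delete(self, run_id):
        await asyncio.sleep(0)
        self._data = {k: v for k, v in self._data.items() if k[0] != run_id}


async def demo():
    store: AsyncCheckpointStoreSketch = AsyncDictStore()
    await store.save("run-1", "node-1", {"value": 42})
    before = await store.load("run-1", "node-1")
    await store.delete("run-1")
    after = await store.load("run-1", "node-1")
    return before, after


result = asyncio.run(demo())
```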
InMemoryStore
In-memory checkpoint store for testing and short-lived pipelines.
State is serialized to JSON internally so that round-trip behavior matches persistent backends.
Example
store = InMemoryStore()
store.save("run-1", "node-1", MyModel(value=42))
loaded = store.load("run-1", "node-1")
save(run_id, node_id, state)
Save state as JSON, recording the schema type for later deserialization.
load(run_id, node_id)
Load state using the schema that was recorded at save time.
delete(run_id)
Delete all checkpoints for a run.
save_typed(run_id, node_id, state, schema)
Save state coerced to a specific schema type.
load_typed(run_id, node_id, schema)
Load state, deserializing with the provided schema regardless of what was saved.
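The interplay between save, load, and the typed variants can be sketched as follows. This is not the library's InMemoryStore: JsonMemoryStore is a hypothetical class, and it uses dataclasses instead of whatever model type MyModel is in the example above, so no validation happens during coercion.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MyModel:
    value: int


@dataclass
class LooseModel:
    value: float


class JsonMemoryStore:
    """Hypothetical sketch: JSON blobs plus the schema recorded at save time."""

    def __init__(self):
        self._blobs = {}    # (run_id, node_id) -> JSON string
        self._schemas = {}  # (run_id, node_id) -> class recorded at save time

    def save(self, run_id, node_id, state):
        # Plain save records the state's own type as the schema.
        self.save_typed(run_id, node_id, state, type(state))

    def save_typed(self, run_id, node_id, state, schema):
        coerced = schema(**asdict(state))  # rebuild as the requested schema
        self._blobs[(run_id, node_id)] = json.dumps(asdict(coerced))
        self._schemas[(run_id, node_id)] = schema

    def load(self, run_id, node_id):
        schema = self._schemas.get((run_id, node_id))
        if schema is None:
            return None
        return self.load_typed(run_id, node_id, schema)

    def load_typed(self, run_id, node_id, schema):
        # The caller's schema wins, whatever was recorded at save time.
        blob = self._blobs.get((run_id, node_id))
        return None if blob is None else schema(**json.loads(blob))

    def delete(self, run_id):
        for key in [k for k in self._blobs if k[0] == run_id]:
            del self._blobs[key]
            del self._schemas[key]


store = JsonMemoryStore()
original = MyModel(value=42)
store.save("run-1", "node-1", original)
loaded = store.load("run-1", "node-1")
as_loose = store.load_typed("run-1", "node-1", LooseModel)
```

Since the state passes through JSON, the loaded object is an equal copy rather than the same instance, which is the round-trip behavior a persistent backend would also show.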
AsyncInMemoryStore
Async in-memory checkpoint store for testing and short-lived pipelines.
Wraps InMemoryStore with an asyncio.Lock so that all operations are
coroutine-safe while keeping serialization behavior identical to the
sync variant.
Example
store = AsyncInMemoryStore()
await store.save("run-1", "node-1", MyModel(value=42))
loaded = await store.load("run-1", "node-1")
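The wrapping pattern described above, a synchronous store guarded by an asyncio.Lock, can be sketched like this. TinySyncStore and LockedAsyncStore are hypothetical names; the real class may structure things differently.

```python
import asyncio


class TinySyncStore:
    """Minimal synchronous store used as the wrapped object."""

    def __init__(self):
        self._data = {}

    def save(self, run_id, node_id, state):
        self._data[(run_id, node_id)] = state

    def load(self, run_id, node_id):
        return self._data.get((run_id, node_id))

    def delete(self, run_id):
        for key in [k for k in self._data if k[0] == run_id]:
            del self._data[key]


class LockedAsyncStore:
    """Hypothetical wrapper: each operation runs while holding one lock."""

    def __init__(self, inner):
        self._inner = inner
        self._lock = asyncio.Lock()

    async def save(self, run_id, node_id, state):
        async with self._lock:
            self._inner.save(run_id, node_id, state)

    async def load(self, run_id, node_id):
        async with self._lock:
            return self._inner.load(run_id, node_id)

    async def delete(self, run_id):
        async with self._lock:
            self._inner.delete(run_id)


async def demo():
    store = LockedAsyncStore(TinySyncStore())
    # Five concurrent saves are serialized by the lock.
    await asyncio.gather(
        *(store.save("run-1", f"node-{i}", {"value": i}) for i in range(5))
    )
    loaded = [await store.load("run-1", f"node-{i}") for i in range(5)]
    await store.delete("run-1")
    return loaded, await store.load("run-1", "node-0")


loaded, after_delete = asyncio.run(demo())
```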
RedisStore
Async Redis-backed checkpoint store for durable, distributed pipelines.
Requires the redis extra: uv add stroma[redis]. Uses
redis.asyncio under the hood so all operations are non-blocking.
Pass a redis_url (e.g. redis://localhost:6379) and an optional
ttl_seconds (defaults to 3600) to control key expiration.
Example
store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)
async save(run_id, node_id, state)
Persist state to Redis with TTL.
async load(run_id, node_id)
Load state from Redis, resolving the schema dynamically.
async delete(run_id)
Delete all checkpoints for a run using async SCAN.
async save_typed(run_id, node_id, state, schema)
Persist state to Redis using an explicit schema reference.
async load_typed(run_id, node_id, schema)
Load state from Redis, deserializing with the provided schema.
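The load method is described as resolving the schema dynamically, but the source does not say how the schema reference is stored. One plausible approach, shown purely as an assumption, is to store a "module:qualname" string next to the data and import it at load time. The helper names and the envelope layout below are hypothetical, and datetime.timedelta stands in for a real schema class.

```python
import importlib
import json
from datetime import timedelta


def schema_ref(schema):
    """Build an importable reference string (assumed format: module:qualname)."""
    return f"{schema.__module__}:{schema.__qualname__}"


def resolve_schema(ref):
    """Import the module, then walk the qualified name to reach the class."""
    module_name, _, qualname = ref.partition(":")
    obj = importlib.import_module(module_name)
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return obj


def encode(schema, fields):
    """Wrap the data in an envelope that remembers which schema produced it."""
    return json.dumps({"schema": schema_ref(schema), "data": fields})


def decode(payload):
    """Rebuild the object using the schema named inside the envelope."""
    envelope = json.loads(payload)
    schema = resolve_schema(envelope["schema"])
    return schema(**envelope["data"])


payload = encode(timedelta, {"seconds": 90})
restored = decode(payload)
```

Importing a class named inside stored data is only reasonable when the stored data is trusted, which is one reason load_typed with an explicit schema can be the safer choice.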
SyncRedisStore
Synchronous Redis-backed checkpoint store for durable, distributed pipelines.
Requires the redis extra: uv add stroma[redis].
Pass a redis_url (e.g. redis://localhost:6379) and an optional
ttl_seconds (defaults to 3600) to control key expiration.
Example
store = SyncRedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)
save(run_id, node_id, state)
Persist state to Redis with TTL.
load(run_id, node_id)
Load state from Redis, resolving the schema dynamically.
delete(run_id)
Delete all checkpoints for a run, using SCAN to find the keys and a pipeline to batch the deletions.
save_typed(run_id, node_id, state, schema)
Persist state to Redis using an explicit schema reference.
load_typed(run_id, node_id, schema)
Load state from Redis, deserializing with the provided schema.
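The delete strategy, SCAN plus a pipeline, can be sketched against a small in-memory stand-in so it runs without a server. The method names scan_iter(match=...), pipeline(), delete(), and execute() mirror the redis-py client; FakeRedis, FakePipeline, delete_run, and the key layout prefix:run_id:node_id are assumptions made for illustration.

```python
from fnmatch import fnmatchcase


class FakePipeline:
    """Queues commands and applies them together on execute()."""

    def __init__(self, data):
        self._data = data
        self._queued = []

    def delete(self, key):
        self._queued.append(key)  # queued only, nothing is removed yet
        return self

    def execute(self):
        results = [
            1 if self._data.pop(key, None) is not None else 0
            for key in self._queued
        ]
        self._queued.clear()
        return results


class FakeRedis:
    """In-memory stand-in exposing the two client methods the sketch needs."""

    def __init__(self):
        self.data = {}

    def scan_iter(self, match=None):
        for key in list(self.data):
            if match is None or fnmatchcase(key, match):
                yield key

    def pipeline(self):
        return FakePipeline(self.data)


def delete_run(client, run_id, prefix="checkpoint"):
    """Find every key for the run incrementally, then delete them in one batch."""
    pattern = f"{prefix}:{run_id}:*"  # assumed key layout
    pipe = client.pipeline()
    queued = 0
    for key in client.scan_iter(match=pattern):
        pipe.delete(key)
        queued += 1
    if queued:
        pipe.execute()  # one round trip for all queued deletions
    return queued


client = FakeRedis()
client.data = {
    "checkpoint:run-1:node-1": "{}",
    "checkpoint:run-1:node-2": "{}",
    "checkpoint:run-2:node-1": "{}",
}
removed = delete_run(client, "run-1")
```

SCAN walks the keyspace in small steps instead of blocking the server the way KEYS does, and the pipeline avoids one network round trip per deleted key.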
CheckpointManager
High-level checkpoint operations: save, resume, and clear.
Wraps a CheckpointStore or AsyncCheckpointStore and handles
schema-aware loading when the underlying store supports it. All
public methods are async. When the wrapped store is synchronous, its
methods are called directly rather than awaited.
Example
manager = CheckpointManager(AsyncInMemoryStore())
await manager.checkpoint("run-1", "node-1", output_state)
resumed = await manager.resume("run-1", "node-1", OutputSchema)
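One way to support both store kinds behind an async API is to call the store method and await the result only when it is awaitable. The sketch below assumes that approach; ManagerSketch and the two toy stores are hypothetical, and the schema argument of resume is omitted to keep the focus on dispatch.

```python
import asyncio
import inspect


class SyncDictStore:
    def __init__(self):
        self._data = {}

    def save(self, run_id, node_id, state):
        self._data[(run_id, node_id)] = state

    def load(self, run_id, node_id):
        return self._data.get((run_id, node_id))

    def delete(self, run_id):
        self._data = {k: v for k, v in self._data.items() if k[0] != run_id}


class AsyncDictStore(SyncDictStore):
    """Same storage, but every method is a coroutine."""

    async def save(self, run_id, node_id, state):
        SyncDictStore.save(self, run_id, node_id, state)

    async def load(self, run_id, node_id):
        return SyncDictStore.load(self, run_id, node_id)

    async def delete(self, run_id):
        SyncDictStore.delete(self, run_id)


class ManagerSketch:
    """Hypothetical manager: async API over either kind of store."""

    def __init__(self, store):
        self._store = store

    async def _call(self, method_name, *args):
        result = getattr(self._store, method_name)(*args)
        if inspect.isawaitable(result):  # async store: await the coroutine
            result = await result
        return result                    # sync store: value is already final

    async def checkpoint(self, run_id, node_id, state):
        await self._call("save", run_id, node_id, state)

    async def resume(self, run_id, node_id):
        return await self._call("load", run_id, node_id)

    async def clear(self, run_id):
        await self._call("delete", run_id)


async def round_trip(store):
    manager = ManagerSketch(store)
    await manager.checkpoint("run-1", "node-1", {"value": 42})
    before = await manager.resume("run-1", "node-1")
    await manager.clear("run-1")
    return before, await manager.resume("run-1", "node-1")


sync_result = asyncio.run(round_trip(SyncDictStore()))
async_result = asyncio.run(round_trip(AsyncDictStore()))
```

Note that a synchronous store called this way still blocks the event loop for the duration of each call, so slow sync backends may be better served by their async counterparts.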
async checkpoint(run_id, node_id, state)
Save a checkpoint for the given run/node pair.
async resume(run_id, node_id, schema)
Load a checkpoint, coercing to schema if the store supports typed loading.
Returns None if no checkpoint exists. Raises TypeError if the
loaded state does not match schema and the store does not support
typed loading.
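The branching that resume describes can be sketched as below. It is kept synchronous to highlight the logic, whereas the documented method is async. PlainStore, TypedStore, the free function resume, and the use of isinstance as the "matches schema" check are all assumptions.

```python
from dataclasses import dataclass


@dataclass
class OutputSchema:
    value: int


class PlainStore:
    """Store without typed loading: returns whatever was saved."""

    def __init__(self):
        self._data = {}

    def save(self, run_id, node_id, state):
        self._data[(run_id, node_id)] = state

    def load(self, run_id, node_id):
        return self._data.get((run_id, node_id))


class TypedStore(PlainStore):
    """Store with typed loading: coerces plain dicts into the schema."""

    def load_typed(self, run_id, node_id, schema):
        state = self.load(run_id, node_id)
        if state is None or isinstance(state, schema):
            return state
        return schema(**state)


def resume(store, run_id, node_id, schema):
    """Prefer typed loading; otherwise load and verify the type."""
    load_typed = getattr(store, "load_typed", None)
    if callable(load_typed):
        return load_typed(run_id, node_id, schema)
    state = store.load(run_id, node_id)
    if state is None:
        return None  # no checkpoint exists
    if not isinstance(state, schema):
        raise TypeError(
            f"expected {schema.__name__}, got {type(state).__name__}"
        )
    return state


plain = PlainStore()
plain.save("run-1", "node-1", OutputSchema(value=42))
plain.save("run-1", "node-2", {"value": 42})
ok = resume(plain, "run-1", "node-1", OutputSchema)
missing = resume(plain, "run-1", "node-9", OutputSchema)
try:
    resume(plain, "run-1", "node-2", OutputSchema)
    mismatch_raised = False
except TypeError:
    mismatch_raised = True

typed = TypedStore()
typed.save("run-1", "node-2", {"value": 42})
coerced = resume(typed, "run-1", "node-2", OutputSchema)
```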
async clear(run_id)
Delete all checkpoints for a run.