Checkpointing

CheckpointStore

Bases: Protocol

Protocol for checkpoint storage backends.

Implementations must provide save, load, and delete. They may also implement save_typed and load_typed for schema-aware storage and retrieval.

save(run_id, node_id, state)

Persist the state for a given run/node pair.

load(run_id, node_id)

Load previously saved state, or return None if not found.

delete(run_id)

Delete all checkpoints for a run.
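
As a rough illustration of what a conforming backend might look like, the sketch below declares a protocol with the three required methods and a dict-backed class that satisfies it. The type hints, the runtime_checkable decorator, and the names CheckpointStoreSketch and DictStore are assumptions made for this example; none of them are taken from the library.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class CheckpointStoreSketch(Protocol):
    """Hypothetical stand-in for the CheckpointStore protocol."""

    def save(self, run_id: str, node_id: str, state: Any) -> None: ...
    def load(self, run_id: str, node_id: str) -> Optional[Any]: ...
    def delete(self, run_id: str) -> None: ...


class DictStore:
    """Hypothetical backend: keeps state in a dict keyed by (run_id, node_id)."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def save(self, run_id: str, node_id: str, state: Any) -> None:
        self._data[(run_id, node_id)] = state

    def load(self, run_id: str, node_id: str) -> Optional[Any]:
        # Return None when nothing was saved for this run/node pair.
        return self._data.get((run_id, node_id))

    def delete(self, run_id: str) -> None:
        # Drop every checkpoint that belongs to the run, across all nodes.
        for key in [k for k in self._data if k[0] == run_id]:
            del self._data[key]


store = DictStore()
store.save("run-1", "node-1", {"value": 42})
store.save("run-1", "node-2", {"value": 7})
store.save("run-2", "node-1", {"value": 1})
store.delete("run-1")
```

Because the protocol is structural, DictStore does not need to inherit from anything; having the three methods is enough for isinstance(store, CheckpointStoreSketch) to succeed in this sketch.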

AsyncCheckpointStore

Bases: Protocol

Async protocol for checkpoint storage backends.

Mirrors CheckpointStore but with coroutine methods, which makes it suitable for non-blocking I/O backends such as redis.asyncio.

InMemoryStore

In-memory checkpoint store for testing and short-lived pipelines.

State is serialized to JSON internally so that round-trip behavior matches persistent backends.

Example

store = InMemoryStore()
store.save("run-1", "node-1", MyModel(value=42))
loaded = store.load("run-1", "node-1")

save(run_id, node_id, state)

Save state as JSON, recording the schema type for later deserialization.

load(run_id, node_id)

Load state using the schema that was recorded at save time.

delete(run_id)

Delete all checkpoints for a run.

save_typed(run_id, node_id, state, schema)

Save state coerced to a specific schema type.

load_typed(run_id, node_id, schema)

Load state, deserializing with the provided schema regardless of what was saved.
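
The difference between load and load_typed can be sketched with the standard library alone. The example below is an assumption-laden illustration, not the library's implementation: it uses dataclasses in place of whatever schema type the library expects, and JsonRoundTripStore, MyModel, and MyModelV2 are hypothetical names.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class MyModel:
    """Hypothetical state schema used only for this sketch."""
    value: int


@dataclass
class MyModelV2:
    """Hypothetical newer schema with an extra defaulted field."""
    value: int
    label: str = "unset"


class JsonRoundTripStore:
    """Hypothetical store: keeps a JSON string plus the schema class per checkpoint."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], tuple[str, type]] = {}

    def save(self, run_id: str, node_id: str, state: Any) -> None:
        # Serialize to JSON and remember the type so load() can rebuild it.
        self._data[(run_id, node_id)] = (json.dumps(asdict(state)), type(state))

    def load(self, run_id: str, node_id: str) -> Optional[Any]:
        entry = self._data.get((run_id, node_id))
        if entry is None:
            return None
        payload, schema = entry
        return schema(**json.loads(payload))

    def load_typed(self, run_id: str, node_id: str, schema: type) -> Optional[Any]:
        # Ignore the recorded type and deserialize with the caller's schema.
        entry = self._data.get((run_id, node_id))
        if entry is None:
            return None
        return schema(**json.loads(entry[0]))


store = JsonRoundTripStore()
original = MyModel(value=42)
store.save("run-1", "node-1", original)

loaded = store.load("run-1", "node-1")                     # rebuilt as MyModel
retyped = store.load_typed("run-1", "node-1", MyModelV2)   # rebuilt as MyModelV2
```

Going through JSON means the loaded object is an equal copy rather than the same instance, which is the round-trip behavior a persistent backend would also show.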

AsyncInMemoryStore

Async in-memory checkpoint store for testing and short-lived pipelines.

Wraps InMemoryStore with an asyncio.Lock so that all operations are coroutine-safe while keeping serialization behavior identical to the sync variant.

Example

store = AsyncInMemoryStore()
await store.save("run-1", "node-1", MyModel(value=42))
loaded = await store.load("run-1", "node-1")
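
The lock-wrapping approach described above might look roughly like the following. This is a minimal sketch under stated assumptions: SyncDictStore stands in for InMemoryStore, and LockedAsyncStore is a hypothetical name for the wrapper; the library's actual class may be structured differently.

```python
import asyncio
from typing import Any, Optional


class SyncDictStore:
    """Hypothetical synchronous store standing in for InMemoryStore."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def save(self, run_id: str, node_id: str, state: Any) -> None:
        self._data[(run_id, node_id)] = state

    def load(self, run_id: str, node_id: str) -> Optional[Any]:
        return self._data.get((run_id, node_id))

    def delete(self, run_id: str) -> None:
        for key in [k for k in self._data if k[0] == run_id]:
            del self._data[key]


class LockedAsyncStore:
    """Hypothetical async wrapper: one asyncio.Lock guards every operation."""

    def __init__(self, inner: SyncDictStore) -> None:
        self._inner = inner
        self._lock = asyncio.Lock()

    async def save(self, run_id: str, node_id: str, state: Any) -> None:
        async with self._lock:
            self._inner.save(run_id, node_id, state)

    async def load(self, run_id: str, node_id: str) -> Optional[Any]:
        async with self._lock:
            return self._inner.load(run_id, node_id)

    async def delete(self, run_id: str) -> None:
        async with self._lock:
            self._inner.delete(run_id)


async def main() -> Optional[Any]:
    # Build the wrapper inside the running loop so the lock binds to it.
    store = LockedAsyncStore(SyncDictStore())
    # Concurrent saves are serialized by the lock, one at a time.
    await asyncio.gather(
        *(store.save("run-1", f"node-{i}", {"value": i}) for i in range(3))
    )
    return await store.load("run-1", "node-2")


result = asyncio.run(main())
```

Delegating to the synchronous store keeps a single code path for serialization, so the async variant cannot drift from the sync one.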

RedisStore

Async Redis-backed checkpoint store for durable, distributed pipelines.

Requires the redis extra: uv add stroma[redis]. Uses redis.asyncio under the hood so all operations are non-blocking.

Pass a redis_url (e.g. redis://localhost:6379) and an optional ttl_seconds (defaults to 3600) to control key expiration.

Example

store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)

save(run_id, node_id, state) async

Persist state to Redis with TTL.

load(run_id, node_id) async

Load state from Redis, resolving the schema dynamically.

delete(run_id) async

Delete all checkpoints for a run using async SCAN.

save_typed(run_id, node_id, state, schema) async

Persist state to Redis using an explicit schema reference.

load_typed(run_id, node_id, schema) async

Load state from Redis, deserializing with the provided schema.

SyncRedisStore

Synchronous Redis-backed checkpoint store for durable, distributed pipelines.

Requires the redis extra: uv add stroma[redis].

Pass a redis_url (e.g. redis://localhost:6379) and an optional ttl_seconds (defaults to 3600) to control key expiration.

Example

store = SyncRedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)

save(run_id, node_id, state)

Persist state to Redis with TTL.

load(run_id, node_id)

Load state from Redis, resolving the schema dynamically.

delete(run_id)

Delete all checkpoints for a run using SCAN + pipeline for efficiency.
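
To make the TTL and SCAN + pipeline behavior concrete, here is a hedged sketch written against the redis-py client calls set(..., ex=...), scan_iter(match=...), and pipeline(). The key layout checkpoint:{run_id}:{node_id} and the helper names are assumptions for illustration; the store's real key format is not documented here. FakeRedis is an in-process stand-in so the sketch runs without a server.

```python
import fnmatch
import json
from typing import Any


def checkpoint_key(run_id: str, node_id: str) -> str:
    # Assumed key layout; the real store may name keys differently.
    return f"checkpoint:{run_id}:{node_id}"


def save_with_ttl(client: Any, run_id: str, node_id: str,
                  state: dict, ttl_seconds: int = 3600) -> None:
    # SET key value EX ttl: the key expires after ttl_seconds.
    client.set(checkpoint_key(run_id, node_id), json.dumps(state), ex=ttl_seconds)


def delete_run(client: Any, run_id: str) -> int:
    # SCAN walks the keyspace incrementally instead of blocking like KEYS.
    pipe = client.pipeline()
    queued = 0
    for key in client.scan_iter(match=f"checkpoint:{run_id}:*"):
        pipe.delete(key)
        queued += 1
    if queued:
        pipe.execute()  # send all queued DEL commands together
    return queued


class FakePipeline:
    """Stand-in pipeline: queues deletes and applies them on execute()."""

    def __init__(self, data: dict) -> None:
        self._data = data
        self._keys: list[str] = []

    def delete(self, key: str) -> None:
        self._keys.append(key)

    def execute(self) -> None:
        for key in self._keys:
            self._data.pop(key, None)


class FakeRedis:
    """In-process stand-in exposing only the client calls used above."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}
        self.ttls: dict[str, Any] = {}

    def set(self, key: str, value: str, ex: Any = None) -> None:
        self.data[key] = value
        self.ttls[key] = ex

    def scan_iter(self, match: str = "*"):
        return iter([k for k in list(self.data) if fnmatch.fnmatchcase(k, match)])

    def pipeline(self) -> FakePipeline:
        return FakePipeline(self.data)


client = FakeRedis()
save_with_ttl(client, "run-1", "node-a", {"value": 1}, ttl_seconds=7200)
save_with_ttl(client, "run-1", "node-b", {"value": 2}, ttl_seconds=7200)
save_with_ttl(client, "run-2", "node-a", {"value": 3})
deleted = delete_run(client, "run-1")
```

With a real server, the same two helpers could be passed a client built with redis.Redis.from_url("redis://localhost:6379"). The sketch does not escape glob characters in run_id, which a production implementation would likely need to handle.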

save_typed(run_id, node_id, state, schema)

Persist state to Redis using an explicit schema reference.

load_typed(run_id, node_id, schema)

Load state from Redis, deserializing with the provided schema.

CheckpointManager

High-level checkpoint operations: save, resume, and clear.

Wraps a CheckpointStore or AsyncCheckpointStore and handles schema-aware loading when the underlying store supports it. All public methods are async. When the wrapped store is synchronous, its methods are called directly, without await.

Example

manager = CheckpointManager(AsyncInMemoryStore())
await manager.checkpoint("run-1", "node-1", output_state)
resumed = await manager.resume("run-1", "node-1", OutputSchema)
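
One way a manager could expose async methods over either kind of store is to await the store's return value only when it is awaitable. The sketch below illustrates that idea; how the library actually detects an async store is not stated here, and ManagerSketch, SyncStore, and AsyncStore are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, Optional


class SyncStore:
    """Hypothetical synchronous store."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def save(self, run_id: str, node_id: str, state: Any) -> None:
        self._data[(run_id, node_id)] = state

    def load(self, run_id: str, node_id: str) -> Optional[Any]:
        return self._data.get((run_id, node_id))


class AsyncStore:
    """Hypothetical asynchronous store with the same method names."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    async def save(self, run_id: str, node_id: str, state: Any) -> None:
        self._data[(run_id, node_id)] = state

    async def load(self, run_id: str, node_id: str) -> Optional[Any]:
        return self._data.get((run_id, node_id))


class ManagerSketch:
    """Hypothetical manager: async API over a sync or async store."""

    def __init__(self, store: Any) -> None:
        self._store = store

    async def _call(self, method_name: str, *args: Any) -> Any:
        result = getattr(self._store, method_name)(*args)
        # Await only when the store handed back an awaitable (async store).
        if inspect.isawaitable(result):
            return await result
        return result

    async def checkpoint(self, run_id: str, node_id: str, state: Any) -> None:
        await self._call("save", run_id, node_id, state)

    async def resume(self, run_id: str, node_id: str) -> Optional[Any]:
        return await self._call("load", run_id, node_id)


async def demo(store: Any) -> Optional[Any]:
    manager = ManagerSketch(store)
    await manager.checkpoint("run-1", "node-1", {"value": 42})
    return await manager.resume("run-1", "node-1")


sync_result = asyncio.run(demo(SyncStore()))
async_result = asyncio.run(demo(AsyncStore()))
```

Calling a synchronous store directly means its I/O blocks the event loop for the duration of the call, which is a trade-off to keep in mind when pairing the manager with a blocking backend.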

checkpoint(run_id, node_id, state) async

Save a checkpoint for the given run/node pair.

resume(run_id, node_id, schema) async

Load a checkpoint, coercing to schema if the store supports typed loading.

Returns None if no checkpoint exists. Raises TypeError if the loaded state does not match schema and the store does not support typed loading.
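
The three outcomes described above (typed load, None, TypeError) can be sketched as follows. For brevity the function is synchronous, whereas the documented method is async; resume_sketch, PlainStore, and the dataclass-based OutputSchema are assumptions made for this example.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class OutputSchema:
    """Hypothetical schema used only for this sketch."""
    value: int


class PlainStore:
    """Hypothetical store with no load_typed method."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def save(self, run_id: str, node_id: str, state: Any) -> None:
        self._data[(run_id, node_id)] = state

    def load(self, run_id: str, node_id: str) -> Optional[Any]:
        return self._data.get((run_id, node_id))


def resume_sketch(store: Any, run_id: str, node_id: str, schema: type) -> Optional[Any]:
    load_typed = getattr(store, "load_typed", None)
    if callable(load_typed):
        # Typed path: the store deserializes straight into the schema.
        return load_typed(run_id, node_id, schema)
    state = store.load(run_id, node_id)
    if state is None:
        return None  # no checkpoint for this run/node pair
    if not isinstance(state, schema):
        # Untyped path: the manager can only verify, not coerce.
        raise TypeError(f"expected {schema.__name__}, got {type(state).__name__}")
    return state


store = PlainStore()
store.save("run-1", "node-1", OutputSchema(value=42))
store.save("run-1", "node-2", {"value": 42})  # wrong type on purpose

resumed = resume_sketch(store, "run-1", "node-1", OutputSchema)
missing = resume_sketch(store, "run-1", "node-9", OutputSchema)
try:
    resume_sketch(store, "run-1", "node-2", OutputSchema)
    mismatch_raised = False
except TypeError:
    mismatch_raised = True
```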

clear(run_id) async

Delete all checkpoints for a run.