pregel checkpoint
目录
langgraph 中的 Checkpointer 用于在流程中持久化执行进度,使得可以恢复中断或支持长时间运行。
1. Checkpointer
Langgraph 中定义了三个类用于实现对 Checkpointer 的定义: 2. Checkpoint
- CheckpointTuple
- CheckpointMetadata
序号 | 名称 | 角色 | 说明 |
---|---|---|---|
1️⃣ | CheckpointTuple |
📦 顶层容器 | 封装了 checkpoint 本体、其元数据、相关配置等 |
2️⃣ | Checkpoint |
🧠 核心数据(状态快照) | 图执行中保存的值、状态、调度上下文等 |
3️⃣ | CheckpointMetadata |
🏷️ 元数据标签 | checkpoint 的非功能信息(时间戳、ID、tags 等) |
1.1 Checkpoint
|
|
属性名 | 含义 |
---|---|
v |
检查点格式的版本号,目前为 1 。用于向前兼容。 |
id |
检查点的唯一标识符,单调递增(可用于排序)。 |
ts |
检查点的时间戳,ISO 8601 格式字符串,例如 "2025-08-04T10:32:00.000Z" 。 |
channel_values |
当前检查点时所有 channel 的值,键为 channel 名,值为反序列化后的数据。 |
channel_versions |
每个 channel 当前的版本号,键为 channel 名,值为字符串、整数或浮点数等单调递增值。 |
versions_seen |
用于追踪每个 node 最近“看见”的 channel 版本。键为 node 名,值为该 node 最近读到的 {channel: version} 映射。可用于判断哪些节点需要重新执行。 |
下面是一个示例:
|
|
1.2 CheckpointMetadata
|
|
属性名 | 说明 |
---|---|
source |
检查点的来源类型。可选值为: • "input" :由 .invoke() / .stream() / .batch() 触发时创建的初始检查点。• "loop" :由 Pregel 主循环中自动创建的检查点。• "update" :由手动更新状态触发的检查点。• "fork" :复制自其他检查点。 |
step |
当前检查点所在的步骤编号。 • -1 表示最初的 "input" 检查点。• 0 表示 "loop" 开始的第一步。• 正整数表示之后的每一步。 |
parents |
记录当前检查点的直接“父”检查点 ID。 键为检查点命名空间(namespace),值为父检查点的 ID。用于构建检查点的“血缘关系图”或分支管理。 |
下面是一个值示例:
|
|
1.3 CheckpointTuple
|
|
属性名 | 说明 |
---|---|
config |
当前执行上下文的配置(RunnableConfig 类型),包含调用参数、Tracing、Callbacks 等信息。 |
checkpoint |
当前检查点的核心内容(类型为 Checkpoint ),记录了状态快照、channel 值和版本等。 |
metadata |
与该检查点相关的元信息(CheckpointMetadata ),例如来源、step 序号、父检查点等。 |
parent_config |
(可选)父检查点的配置。如果当前检查点是从另一个分支或复制而来的,记录其原始配置。 |
pending_writes |
(可选)尚未执行但计划写入的 PendingWrite 列表,通常用于延迟写入或静态分析。 |
2. BaseCheckpointSaver
Checkpointer 如何保存由 BaseCheckpointSaver 抽象基类约定。
2.1 BaseCheckpointSaver 属性
BaseCheckpointSaver 属性只有一个序列化器。
|
|
2.2 BaseCheckpointSaver 方法
BaseCheckpointSaver 定义了如下方法,我们意义来看具体实现。
方法名 | 作用 | 返回值类型 | |
---|---|---|---|
config_specs |
定义检查点存储器支持的配置字段(用于参数展示或 UI) | list |
|
get(config) |
获取指定配置的检查点(简化形式,仅取 checkpoint 部分) |
`Checkpoint | None` |
get_tuple(config) |
获取指定配置的检查点元组(包含 checkpoint、metadata、version) | `CheckpointTuple | None` |
list(config, ...) |
列出符合条件的检查点(可加过滤器、上限) | Iterator[CheckpointTuple] |
|
put(config, checkpoint, metadata, new_versions) |
存储一个检查点及其元数据 | RunnableConfig |
|
put_writes(config, writes, task_id, task_path) |
存储与任务关联的中间产出(write events) | None |
|
delete_thread(thread_id) |
删除与特定 thread_id 相关的所有 checkpoint 和 write 数据 | None |
|
aget(config) |
异步获取检查点(简化) | `Awaitable[Checkpoint | None]` |
aget_tuple(config) |
异步获取检查点元组 | `Awaitable[CheckpointTuple | None]` |
alist(config, ...) |
异步列出检查点列表 | AsyncIterator[CheckpointTuple] |
|
aput(config, checkpoint, metadata, new_versions) |
异步存储检查点及其版本信息 | Awaitable[RunnableConfig] |
|
aput_writes(config, writes, task_id, task_path) |
异步存储中间写入产物 | Awaitable[None] |
|
adelete_thread(thread_id) |
异步删除与某 thread 相关的所有 checkpoint 和 write | Awaitable[None] |
|
get_next_version(current, channel) |
生成下一版本号(默认:数字自增) | V (泛型,可为 int/float/str ) |
3. InMemorySaver
我们来看 Langgraph 提供的 BaseCheckpointSaver 的一个具体实现:InMemorySaver。位于 langgraph\checkpoint\memory\__init__.py
下面是 InMemorySaver 的使用示例:
|
|
3.1 InMemorySaver 属性
|
|
3.2 InMemorySaver 方法
put 相关方法
|
|
get 相关方法
|
|