pregel loop
1. PregelLoop
|
|
1.1 PregelLoop 语义
PregelLoop
是 LangGraph 中实现 Pregel 模式的核心类之一,承担了图执行调度器的角色。它负责以超步(superstep)方式逐步调度图中的节点(PregelNode
),并在每个 tick 中处理任务的生成、执行、中断判断、检查点记录、流写入以及缓存等一系列关键操作。PregelLoop 应该是 LangGraph 中最复杂的类,也是最核心的类。
角色 | 说明 |
---|---|
图调度器 | 管理 Pregel 模式下的图执行生命周期(构建任务 → 执行任务 → 收集结果 → 写入 checkpoint)。 |
状态管理者 | 保存当前步骤、节点状态、通道值、缓存等,用于恢复或继续执行。 |
检查点协作者 | 协调持久化逻辑(通过 checkpointer )将中间状态保存至外部系统(如 DB、文件系统)。 |
中断机制协调者 | 提供 interrupt_before 与 interrupt_after 中断控制机制。 |
输出协调者 | 控制哪些任务的输出将通过 stream 发出,以及是否触发结果值输出。 |
1.3 PregelLoop 属性
属性名 | 类型 | 说明 |
---|---|---|
config |
RunnableConfig |
当前执行配置(包含命名空间、恢复标记等元数据)。 |
store |
BaseStore | None |
存储层(用于读取通道值等)。 |
stream |
StreamProtocol | None |
流系统(用于发送中间状态,如 task、values)。 |
step , stop |
int |
当前执行步数、最多步数。 |
input |
Any | None |
初始输入值。 |
cache |
BaseCache[WritesT] | None |
缓存系统。 |
checkpointer |
BaseCheckpointSaver | None |
检查点保存器。 |
nodes |
Mapping[str, PregelNode] |
图中所有节点定义。 |
specs |
Mapping[str, BaseChannel | ManagedValueSpec] |
通道定义集合。 |
input_keys , output_keys , stream_keys |
str | list[str] |
指定输入、输出、流通道 key。 |
skip_done_tasks |
bool |
是否跳过已完成任务(用于恢复场景)。 |
is_nested |
bool |
是否为子图执行(由 config 中是否含 task_id 判定)。 |
manager |
AsyncParentRunManager | ParentRunManager | None |
执行管理器。 |
interrupt_before , interrupt_after |
All | Sequence[str] |
中断点配置。 |
durability |
Durability |
是否启用持久化机制。 |
retry_policy |
Sequence[RetryPolicy] |
重试策略。 |
cache_policy |
CachePolicy | None |
缓存策略。 |
checkpointer_get_next_version |
GetNextVersion |
获取通道版本号的函数。 |
checkpointer_put_writes |
Callable | None |
写入 checkpoint 的方法。 |
checkpointer_put_writes_accepts_task_path |
bool |
是否支持传入 task.path 参数。 |
_checkpointer_put_after_previous |
Callable | None |
checkpoint 之后的异步提交钩子。 |
_migrate_checkpoint |
Callable | None |
检查点迁移逻辑(通常用于版本迁移)。 |
submit |
Submit |
异步提交方法(通常是线程池或事件循环封装)。 |
channels |
Mapping[str, BaseChannel] |
所有运行中的通道状态。 |
managed |
ManagedValueMapping |
管理的通道值(含版本)。 |
checkpoint |
Checkpoint |
当前执行中的检查点状态。 |
checkpoint_id_saved |
str |
最近保存的 checkpoint ID。 |
checkpoint_ns |
tuple[str, ...] |
当前命名空间(嵌套图结构用)。 |
checkpoint_config |
RunnableConfig |
用于 checkpoint 的配置快照。 |
checkpoint_metadata |
CheckpointMetadata |
checkpoint 的元信息。 |
checkpoint_pending_writes |
list[PendingWrite] |
待保存的写入项。 |
checkpoint_previous_versions |
dict[str, str | float | int] |
上一个 checkpoint 的通道版本信息。 |
prev_checkpoint_config |
RunnableConfig | None |
上一次的 config 快照(用于调试对比)。 |
status |
Literal[...] |
当前状态标识:input、pending、done、interrupt、等。 |
tasks |
dict[str, PregelExecutableTask] |
当前调度周期生成的所有任务。 |
output |
dict[str, Any] | None |
最终输出。 |
updated_channels |
set[str] | None |
最新被写入的通道集合。 |
trigger_to_nodes |
Mapping[str, Sequence[str]] |
通道变更 -> 触发节点映射关系。 |
下面是 PregelLoop 初始化的代码,代码比较长:
|
|
很大一部分是从 RunnableConfig 中提取相关配置。这里对相关的配置项做一个整理:
|
|
StreamProtocol
StreamProtocol 是一个带有“输出模式”的流处理协议接口,它包装了一个可调用函数,用于处理流数据(如日志、模型输出、状态更新等)。
|
|
1.4 PregelLoop 方法
PregelLoop 类的方法都比较复杂,接下来我们一一讲解。
方法名 | 作用 | 是否私有 | 关键点说明 |
---|---|---|---|
__init__ |
初始化所有运行环境、检查点、通道等 | ❌ | 初始化复杂,含 scratchpad、resume、namespace 重写等逻辑 |
tick |
执行一次迭代:构建任务、判断中断、执行缓存 | ❌ | Pregel 超步逻辑的关键入口 |
after_tick |
完成迭代后的尾处理:写入、emit、更新状态 | ❌ | 会在每次 tick 成功后调用 |
_first |
初始化输入阶段的 apply 逻辑 | ✅ | 在图起点或恢复时触发 |
_put_checkpoint |
执行 checkpoint 写入 | ✅ | 控制 step 增长、metadata 更新 |
_put_pending_writes |
执行所有缓存写入提交 | ✅ | 提交到 checkpointer |
put_writes |
写入一个 task 的 writes 数据 | ❌ | 包含 checkpoint 保存逻辑 |
_match_writes |
将写入匹配到已有任务 | ✅ | 用于 skip_done_task 时将缓存应用到 task |
accept_push |
接收 PUSH 操作并构建新的任务 | ❌ | 从某个 task 中激发出一个新的 task(如 subgraph) |
match_cached_writes / amatch_cached_writes |
匹配缓存中的 writes(未实现) | ❌ | 接口预留 |
_suppress_interrupt |
处理 GraphInterrupt 时的行为控制 | ✅ | 主要用于嵌套图控制 |
output_writes |
发出任务写入的 stream 事件 | ❌ | 控制是否输出、是否过滤 hidden |
_emit |
执行 stream 写入 | ✅ | 支持 debug 模式映射 |
2. SyncPregelLoop ContextManager 实现
PregelLoop 一部分参数是在上下文管理器中实现的。上下文管理器实现在 PregelLoop 子类上,这里我们以 SyncPregelLoop 为例。
2.1 SyncPregelLoop 初始化
初始化参数完成 checkpointer 相关参数的初始化,并设置 self.stack
|
|
2.2 ExitStack
ExitStack
是 contextlib 提供的 可组合的上下文管理器堆栈,我们通过一个示例来了解他的用法:
|
|
这在“资源数量不确定”时非常有用,例如:
|
|
2.3 ContextManager
讲解完 SyncPregelLoop 初始化,我们来讲解 SyncPregelLoop 的上下文管理器的实现。
__enter__
主要完成以下几个任务:
- 加载 checkpoint,load 其参数
- 初始化 self.submit 参数
- channels_from_checkpoint 从checkpoint 中恢复 channel 的值
|
|
self.submit
|
|
这段代码会调用 BackgroundExecutor 的 __enter__
方法,BackgroundExecutor.__enter__
返回的是其 submit 方法。
channels_from_checkpoint
channels_from_checkpoint 用于从 checkpoint 中恢复 channel 的值。
|
|
至此我们基本上讲解完了 PregelLoop 的初始化,接下来我们来讲解 PregelLoop 的方法。前面我们讲解 Pregel 代码时已经分析了 stream 方法的基本流程。我们将按照下面的顺序介绍 PregelLoop 中的方法:
- tick
- match_cached_writes
- output_writes
- after_tick
- _put_checkpoint_fut
3. tick
|
|
tick 方法中,会调用 prepare_next_tasks 方法生成当前 step 的 task,prepare_next_tasks 中的入参来源如下:
- self.checkpoint:
Loop.__enter__
方法中,会使用 configurable 中保存的 CONFIG_KEY_CHECKPOINT_ID 重新load checkpoint。- 如果没有 checkpoint,使用 empty_checkpoint() 生成一个空的 checkpoint
- self.checkpoint_pending_writes:
- 从恢复的 checkpoint 读取,默认为空
- self.nodes: Loop 初始化传入,
Mapping[str, PregelNode]
- self.channels:
- Loop 初始化传入
spec=Mapping[str, BaseChannel | ManagedValueSpec]
Loop.__enter__
方法中,会调用 channels_from_checkpoint 方法从 checkpoint 中恢复 channel 的值,并提取出 ManagedValueSpec
- Loop 初始化传入
- self.managed 同 self.channels
- self.config: Loop 初始化传入,并在调用过程中更新
- self.step: Loop 初始化为 0,
Loop.__enter__
中重置为self.checkpoint_metadata["step"] + 1
- self.stop: Loop 初始化为 0,
Loop.__enter__
中重置为self.step + self.config["recursion_limit"] + 1
- self.manager: Loop 初始化传入
- self.store: Loop 初始化传入
- self.checkpointer: Loop 初始化传入
- self.trigger_to_nodes: Loop 初始化传入
Mapping[str, Sequence[str]]
channel 更新要触发的节点映射 - self.updated_channels
Loop.__enter__
方法中,会调用self._first(input_keys=self.input_keys)
对 updated_channels 做初始化after_tick
方法中,会调用self.updated_channels = apply_writes)
完成 channel 的更新,并返回更新过的 channel
- self.retry_policy: Loop 初始化传入
- self.cache_policy: Loop 初始化传入
现在我们来看与 updated_channels 相关的 loop._first
方法,
4. _first 方法
_first 是 Pregel 图首次调度(或恢复)的核心入口。
4.1 _first 方法签名说明
|
|
input_keys
: 当前节点接收的输入通道名(channel key),用于解析输入。- 返回值是
set[str]
或None
,表示有哪些通道被更新(用于后续调度)。
4.2 _first 执行逻辑
按照执行逻辑,函数大致分为 6 步:
判断是否处于恢复状态(is_resuming)
➡️ 作用:决定是否要跳过输入写入,直接恢复。
|
|
恢复状态的判断条件为:
-
检查点存在通道版本(说明确实有中断)
-
并且配置中
resuming
为 True 或逻辑判断为恢复状态:- 输入为 None(说明可能是主图首次运行或重启)
- 输入是
Command
(说明可能是子图恢复) - 或者在非嵌套图中,
run_id
一致(说明是上次运行)
处理输入为 Command
的情况
➡️ 作用:将 command 中显式指定的写入应用到状态中。
|
|
self.put_writes
我们后面详述。
应用 null writes
➡️ 作用:恢复一些全局层面(非 task 级别)的写入。
NULL_TASK_ID
表示“当前不存在任务(Task)”或“这是一个无任务 ID 的占位符”。
-
在图的执行过程中,LangGraph 使用
task_id
来标识每一个任务节点(node)或 loop 中的迭代任务。 -
但是有些情况下,例如:
- 初始化阶段(未开始执行时)
- 跳过某些节点或任务
- 中断或终止
- 空的分支
都可能需要用一个合法但“无含义”的 ID 来表示当前没有实际任务,此时就会用 NULL_TASK_ID
。
|
|
如果是恢复运行,直接跳到恢复流程
➡️ 作用:跳过输入处理,直接进入恢复后的调度。
|
|
self._emit 方法我们后面再详述。
否则处理正常输入写入流程
➡️ 作用:标准运行流程,处理新一轮输入。
|
|
这里比较难理解的部分是 discard_tasks。处理新一轮输入,不一定是这条图有史以来第一次运行:
- 可能上一次运行一半被 kill、失败或中断;
- 系统重启或调度器崩溃了;
- Checkpoint 已保存,但部分任务写入未应用;
传给 apply_writes 有两个部分
- discard_tasks: 其
writes=dequeue
,discard_tasks 还没有被执行,所以 writes 为空 - PregelTaskWrites: 其
writes=deque(map_input(input_keys, self.input))
这样经过 apply_writes,未被处理的 channel 就会直接被 update 为空:
|
|
channels[TASKS]
是如何恢复的
这里还有个问题需要注意,prepare_next_tasks 在生成 PUSH 任务时,会先做如下判断:
|
|
channels[TASKS]
的值是在 Pregel 初始化时添加的,此时
|
|
在 Loop __enter__
方法中,会调用 channels_from_checkpoint,尝试从 checkpoint 恢复 channel,如果 checkpoint 中存在 channels[TASKS]
就会恢复,此时 channels[TASKS]
就有值。
最后 _put_checkpoint
方法,我们后面再详述。
更新配置、状态
|
|
- 将
resuming
信息注入配置 - 设置当前节点状态为
"pending"
,准备调度
✅ 返回
返回的是 apply_writes(...)
中得到的 被写入的通道名集合(set[str]
),用于触发下游节点。
🧠 总结
_first
是 Pregel 图首次调度(或恢复)的核心入口:
- 如果是
Command
,执行恢复流程 - 如果是中断恢复,跳过输入处理
- 如果是正常输入,解析写入并更新通道
- 若都不是,抛出异常
- 最终返回写入了哪些通道,供调度器判断哪些节点应被触发
5. after_tick
与 tick 方法对应的还有一个 after_tick 方法,代码不长我们直接来看代码:
|
|