pregel algo - 2
前面我们介绍了 _algo.py
中关联的对象,这一节我们来介绍 _algo.py
的这几个核心函数:
prepare_single_task
prepare_next_tasks
local_read
apply_writes
1. prepare_next_tasks
1.1 prepare_next_tasks 入参
prepare_next_tasks 函数的定义如下:
|
|
下面是入参说明列表:
参数名 | 类型 | 说明 |
---|---|---|
checkpoint |
Checkpoint |
当前步骤的系统快照,包含图状态、消息、管理值等 |
pending_writes |
list[PendingWrite] |
等待写入状态的数据,用于节点状态更新、触发 channel 等 |
processes |
Mapping[str, PregelNode] |
当前图中所有节点的映射,每个节点一个 PregelNode 实例 |
channels |
Mapping[str, BaseChannel] |
所有通道(channel)的映射,例如边、消息队列、流数据 |
managed |
ManagedValueMapping |
管理变量的定义(类似于全局状态变量),如 counters、RAG 存储等 |
config |
RunnableConfig |
运行时配置,如 stream 模式、 tracing、stream_writer 等 |
step |
int |
当前执行步数,表示 pregel loop 的当前 tick |
stop |
int |
最多允许的步数(pregel loop 的终止条件之一) |
for_execution |
bool |
是否为真正执行任务(True )还是仅用于 dry-run、准备 |
store |
BaseStore | None |
可选的状态存储系统,用于 task 执行时读取全局数据 |
checkpointer |
BaseCheckpointSaver | None |
检查点保存器,用于在任务中触发 checkpoint |
manager |
ParentRunManager | AsyncParentRunManager | None |
LangChain 的 tracing 上下文管理器,用于日志与可视化 |
trigger_to_nodes |
Mapping[str, Sequence[str]] | None |
可选:哪些通道触发哪些节点,用于优化计算路径 |
updated_channels |
set[str] | None |
可选:上一步被更新的通道集合,与 trigger_to_nodes 配合用于优化触发范围 |
retry_policy |
Sequence[RetryPolicy] |
可选:任务失败时的重试策略配置 |
cache_policy |
CachePolicy | None |
可选:是否启用缓存,比如对某些节点结果复用缓存结果 |
1.2 prepare_next_tasks 执行逻辑
prepare_next_tasks 中把任务分为了两种类型:
- Push 表示边(channel)的行为:把数据推入边中
- Pull 表示节点(node)的行为:处理输入并产出输出
任务类型 | task_path 形式 | 含义 |
---|---|---|
PUSH 任务 | (PUSH, index) |
index 是 TASKS channel 中某个 Send 的索引 |
PULL 任务 | (PULL, node_id) |
node_id 是被触发的节点名称 |
下面是整个函数执行的概览:
|
|
|
|
2. prepare_single_task
构造 task 主要在 prepare_single_task 内。
2.1 prepare_single_task 入参
|
|
下面是 prepare_single_task
函数的参数列表,按照功能归类
任务标识相关参数
参数名 | 类型 | 说明 |
---|---|---|
task_path |
tuple[Any, ...] |
当前任务路径(决定任务类型,如 PUSH / PULL) |
task_id_checksum |
str | None |
任务 ID 的校验用 checksum,用于构造唯一 ID |
step |
int |
当前执行步数 |
stop |
int |
最大允许执行步数 |
图状态 & Checkpoint 相关
参数名 | 类型 | 说明 |
---|---|---|
checkpoint |
Checkpoint |
当前图的检查点,包含状态和快照信息 |
checkpoint_id_bytes |
bytes |
当前检查点 ID 的二进制形式(用于唯一标识) |
checkpoint_null_version |
V | None |
当前检查点的初始版本,通常用于判定是否为新状态 |
节点、通道、输入输出相关
参数名 | 类型 | 说明 |
---|---|---|
processes |
Mapping[str, PregelNode] |
图中所有节点定义(name -> PregelNode) |
channels |
Mapping[str, BaseChannel] |
通道名称与通道实例映射 |
managed |
ManagedValueMapping |
由调度器托管的中间值/变量映射 |
pending_writes |
list[PendingWrite] |
上一步产生的、尚未提交的写入数据 |
运行配置 & 控制器
参数名 | 类型 | 说明 |
---|---|---|
config |
RunnableConfig |
当前任务的执行配置(可传递 tracing / tags / handlers 等) |
for_execution |
bool |
是否立即执行任务(返回 ExecutableTask),否则仅调度(Task) |
manager |
ParentRunManager | AsyncParentRunManager | None |
上层运行管理器(用于 tracing / callbacks) |
缓存、存储、持久化策略
参数名 | 类型 | 说明 |
---|---|---|
store |
BaseStore | None |
可选的数据存储器,用于通道状态保存 |
checkpointer |
BaseCheckpointSaver | None |
持久化 checkpoint 的组件 |
input_cache |
dict[INPUT_CACHE_KEY_TYPE, Any] | None |
输入缓存,用于避免重复执行 |
cache_policy |
CachePolicy | None |
缓存策略定义 |
retry_policy |
Sequence[RetryPolicy] |
失败任务的重试策略 |
2.2 prepare_single_task 处理流程
prepare_single_task 有三个生成 task 的流程,分别对应代码中的三个 if。
|
|
PUSH + Call
这个分支在 prepare_next_tasks 内没有调用。
|
|
PUSH 任务
PUSH 是直接触发 Node 执行,并向其出入值。
|
|
PULL 任务
PULL 是让 Node 发起检查,是否需要执行,从 Node 监听的 channel 获取值。
|
|
PregelExecutableTask 创建
创建 PregelExecutableTask 的三个分支代码都比较长,但是代码结构完全一样。PregelExecutableTask 创建用到的参数如下:
|
|
PregelExecutableTask 初始化是用到了很多 PregelNode 的属性。可以看到
proc_node=Pregel.node
Pregel.node 是属性方法,最终会返回 RunnableSeq(self.bound, *writers)
- self.bound 是传入的节点执行器
writers=self.flat_writers
- self.flat_writers 是对初始化传入
writers=[ChannelWrite(self._writes)]
里的 ChannelWrite 进行了合并 - 所以最终
proc_node=RunnableSeq(self.bound, ChannelWrite(self._writes))
self._writes 是 List[ChannelWriteEntry]
包装了要写入的 channelRunnableSeq.invoke
会分别调用self.bound
和ChannelWrite
的invoke
方法- ChannelWrite.invoke 的调用过程前面的章节我们分析过,其写入最终调用的是从 RunnableConfig 中配置的写入函数:
write: TYPE_SEND = config[CONF][CONFIG_KEY_SEND]
config[CONF][CONFIG_KEY_SEND]
正是 PregelExecutableTask 初始化时配置的writes.extend
。- 所以 PregelExecutableTask 执行的最终结果会把节点的执行结果写入到 PregelExecutableTask.writes 的 dequeue 中。
|
|
3. local_read
这个函数 local_read 是 LangGraph 中用于从当前节点的 局部上下文状态 中读取数据的工具函数之一。它的作用是为**条件边(conditional edges)**提供一个“视图”——这个视图能看到当前节点写入的值(task.writes),但不会影响全局状态(即不会真正写入通道和托管状态)。
下面是 local_read 的代码:
|
|
参数名 | 含义 |
---|---|
scratchpad |
当前 Pregel 节点执行上下文状态缓存(传给托管值) |
channels |
当前所有普通通道(channel)的只读视图,key 是通道名 |
managed |
当前所有托管变量(ManagedValue),key 是托管名 |
task |
当前节点的写操作对象,里面包含 .writes 字段:写入哪些变量以及它们的值 |
select |
想要读取的变量名(可以是字符串或字符串列表) |
fresh |
如果为 True,则需要将当前节点写入的值“临时应用”后再读取 |
task 传入的是 PregelTaskWrites(task_path, name, writes, triggers)
,writes = deque()
。
|
|
4. apply_write
函数 apply_writes
根据一组任务的写入操作:
- 将数据写入到通道(channels);
- 更新版本信息(checkpoint);
- 判断哪些通道被修改;
- 返回被更新的通道集合,以供调度器用来触发后续节点。
4.1 函数签名
|
|
参数名 | 类型 | 含义 |
---|---|---|
checkpoint |
dict |
保存每个通道的版本号、任务见过的版本等,属于全局运行状态的一部分 |
channels |
Mapping[str, BaseChannel] |
所有当前通道对象(数据传递容器) |
tasks |
Iterable[WritesProtocol] |
当前要应用的任务集合(通常是一个 Pregel 步骤中活跃节点) |
get_next_version |
可选的版本生成函数 | 用于在写入后分配新版本号 |
trigger_to_nodes |
映射 | 记录每个通道更新后可能触发的节点(用于图调度) |
4.2 代码逻辑
apply_writes 正常流程下,有两类 channel 会被打上新的版本号:
- task.triggers 中的 channel;及触发当前task 的channel
- updated_channels,即被更新的 channel
|
|