pregel runner
目录
1. PregelRunner
这个 PregelRunner 类是 LangGraph 中 Pregel 模式的一部分,用于并发执行节点任务、提交写入数据、处理中断与输出流。
它不是图的核心结构(如 PregelLoop、PregelNode),但作为执行调度器,用于运行单个“超级步(superstep)”中的节点任务并控制其生命周期。
1.1 PregelRunner 属性
属性名 | 类型 | 说明 |
---|---|---|
submit |
weakref.ref[Submit] |
指向任务提交函数的弱引用,用于提交某个节点的可运行任务(如调用 .invoke() ) |
put_writes |
weakref.ref[Callable[[str, Sequence[tuple[str, Any]]], None]] |
指向写入结果处理函数的弱引用,负责提交节点的写操作(如向 channel 写) |
use_astream |
bool |
是否启用异步流输出模式(astream ),用于实时响应中间结果(比如流式输出到用户) |
node_finished |
Callable[[str], None] 或 None |
可选回调,当某个节点执行完成后调用,用于记录、清理或调试 |
|
|
在 Pregel stream 方法中,PregelRunner 初始化如下:
|
|
1.2 PregelRunner 方法
PregelRunner 只有三个方法,分别是 tick
、atick
、commit
。
方法名 | 类型 | 作用说明 |
---|---|---|
tick |
同步函数 | 同步执行一批节点任务(一个超级步),并收集每个节点的写入结果(channel/store),返回写入内容。 |
atick |
异步函数 | 异步执行一批节点任务,功能与 tick 相同,但使用 await 实现并发执行,适合异步上下文。 |
commit |
同步函数 | 将 tick / atick 收集到的写入结果,提交到 channel、store 或 managed value 中,推进 Pregel 状态。 |
2. tick
在 Pregel stream 方法中,Runner.tick 被调用的代码如下,通过这段代码,我们可以了解 runner.tick方法每个参数的含义。
2.1 tick 调用
|
|
tick()
异步调度并执行一组任务(PregelExecutableTask
),允许并发执行、失败重试、延迟调度、任务间独立、并支持 yield 回调、异常传播、限时等待等高级控制。
2.2 tick 代码
|
|
这是一个 协程生成器(generator)函数,用于逐步调度任务,期间通过 yield
暴露控制权给调用方(loop.run(...)
等调度循环)。在 tick 内会按照如下顺序执行:
- run_with_retry(task): 执行
task.writes.clear()
return task.proc.invoke(task.input, config)
task.proc
保存的是 pregelnode.node 属性的返回值,RunnableSeq(self.bound, *writers)
, writers 对应为 ChannelWrite- task.proc.invoke 执行时,会先调用 bound 的 invoke 方法,然后调用 ChannelWrite.invoke 方法
- ChannelWrite.invoke 调用时会从
config[CONF][CONFIG_KEY_SEND]
获取 write 函数,并调用write(Sequence[tuple[str, Any]]])
config[CONF][CONFIG_KEY_SEND]
是在PregelExecutableTask
初始化时配置的,并在run_with_retry
中对 config 做了合并,最终write=task.writes.extend
- 所以最终的结果是
task.proc.invoke(task.input, config)
将[(channel, value)]
写入到 task.writes 的队列中
self.commit(task, None)
- 会调用
self.put_writes()(task.id, task.writes)
,self.put_writes()=loop.put_writes
loop.put_writes
会将task.writes
写入到loop.checkpoint_pending_writes
中,并把 task.writes 保存到 checkpoint db 中。
- 会调用
|
|
2.3 commit
3. _output
在 Pregel.stream 的代码中,调用 runner.tick 的过程中会调用一个 _output 向外输出结果。因为这部分内容是和 runner 密切相关的,所以我们放在这里讲解。下面是调用的代码, _output 各个入参的类型如下:
- stream_mode:
tuple[str]
- print_mode:
tuple[str]
- subgraphs: bool
- stream: SyncQueue
|
|
3.1 _output 源码
|
|
3.2 SyncQueue 的数据传递流程
我们现在来看数据是如何在 SyncQueue 中传递的:
- 前面我们提到过 runner.tick 在执行时会调用 commit 方法
- commit 方法会调用 loop.put_writes,将
writes=[(channel, value)]
传递过去 - 在 loop.put_writes 方法内存在如下调用链:
loop.output_writes
loop._emit
self.stream
self.stream
是在 Loop 初始化时传入的StreamProtocol(stream.put, stream_modes)
- 最终 runner.tick 会把数据放到 SyncQueue 中,_output 会从 SyncQueue 获取数据返回