pregel - 1
经过前面这么多节的铺垫,我们了解了 Pregel 组合的各种对象,现在我们来看看 Pregel 类的实现。
1. Pregel
1.1 Pregel 属性
|
|
属性名 | 类型 | 说明 | 默认值 |
---|---|---|---|
nodes |
dict[str, PregelNode] |
图中的节点集合,键为节点名,值为对应节点对象 | 无默认值(必须提供) |
channels |
dict[str, BaseChannel | ManagedValueSpec] |
所有定义的通道,包括状态传递通道、值通道等 | 无默认值 |
stream_mode |
StreamMode (如 "values" 、"messages" 、"custom" ) |
控制流的输出流模式 | "values" |
stream_eager |
bool |
是否立即输出事件流(如 trace、消息)。部分模式下自动为 True | False |
output_channels |
str | Sequence[str] |
输出数据写入的目标通道 | 无默认值 |
stream_channels |
str | Sequence[str] | None |
哪些通道的值将被流式输出(如果未指定,则为所有非保留通道) | None |
interrupt_after_nodes |
All | Sequence[str] |
指定在哪些节点之后中断执行(可用于调试) | 无默认值 |
interrupt_before_nodes |
All | Sequence[str] |
指定在哪些节点之前中断执行(可用于调试) | 无默认值 |
input_channels |
str | Sequence[str] |
接收输入的通道名称 | 无默认值 |
step_timeout |
float | None |
每一步最大等待时间(秒),防止阻塞 | None |
debug |
bool |
是否打印调试信息 | 默认 False (源码中未指定默认但文档描述为 False) |
checkpointer |
Checkpointer | None |
用于保存和恢复图状态的检查点管理器 | None |
store |
BaseStore | None |
用于持久化共享值(SharedValues)的存储后端 | None |
cache |
BaseCache | None |
用于节点结果缓存的缓存后端 | None |
retry_policy |
Sequence[RetryPolicy] |
节点执行失败时的重试策略,空序列表示不重试 | () (空元组) |
cache_policy |
CachePolicy | None |
所有节点通用的缓存策略,可被单节点覆盖 | None |
context_schema |
type[ContextT] | None |
context 对象的类型,用于强类型验证 |
None |
config |
RunnableConfig | None |
运行配置(用于 tracing、callback 等) | None |
name |
str |
图的名称 | "LangGraph" |
trigger_to_nodes |
Mapping[str, Sequence[str]] |
定义触发器到节点的映射(如外部事件触发哪些节点) | 无默认值 |
1.2 使用示例
|
|
我们以这个示例为切入口,先来学习 Pregel 基础部分。
1.3 NodeBuilder
NodeBuilder 是 Pregel 中用于构建节点的类。它提供了一种灵活的方式来定义节点的行为,包括订阅通道、执行操作和写入通道。
✅ NodeBuilder 属性
|
|
属性名 | 类型 | 初始值说明 | 说明 |
---|---|---|---|
_channels |
str 或 list[str] |
[] |
订阅的通道(单个或多个) |
_triggers |
list[str] |
[] |
节点被触发时响应的通道 |
_tags |
list[str] |
[] |
节点标签(用于标记或筛选) |
_metadata |
dict[str, Any] |
{} |
节点元信息 |
_writes |
list[ChannelWriteEntry] |
[] |
节点执行后的写入通道 |
_bound |
Runnable |
DEFAULT_BOUND |
节点执行的实际函数 |
_retry_policy |
list[RetryPolicy] |
[] |
节点重试策略列表 |
_cache_policy |
CachePolicy 或 None |
None |
节点缓存策略 |
从 build 函数可以看到这些属性都是传递给 PregelNode 的参数,其中比较重要的是
- channels: 节点的读取数据通道
- triggers: 节点被触发的数据通道
- writes: 节点的结果写入数据通道
NodeBuilder 方法
方法名 | 作用说明 | 输出类型 |
---|---|---|
subscribe_only |
订阅单个通道,节点仅监听此通道 | Self |
subscribe_to |
订阅多个通道,可以配置是否读取其数据 | Self |
read_from |
仅从指定通道读取数据(不会触发节点执行) | Self |
do |
配置节点执行的函数(可以是组合函数链) | Self |
write_to |
设置节点写入的通道,可使用位置参数或命名参数(支持静态值/函数) | Self |
meta |
添加标签和元信息 | Self |
add_retry_policies |
添加重试策略 | Self |
add_cache_policy |
添加缓存策略 | Self |
build |
构建为 PregelNode 实例(最终生成的节点对象) |
PregelNode |
下面是每个函数的代码,
|
|
1.4 示例代码解读
|
|
了解了 node 的构建,现在我们来看 Pregel 的 invoke 方法。invoke 方法比较复杂,我们将分为 invoke 输入参数、invoke 执行逻辑两个部分讲解
2. Pregel invoke 入参
|
|
invoke 方法接受以下参数:
参数名 | 类型 | 默认值 | 作用说明 |
---|---|---|---|
input |
InputT | Command | None |
无 | 图运行的输入数据,通常为 dict 或任意支持的结构。也可为 Command 实例表示特殊指令。 |
config |
RunnableConfig | None |
None |
可选,用于配置本次运行的上下文,如 tags、metadata、run name、callbacks 等。 |
context |
ContextT | None |
None |
静态上下文对象(0.6.0 后新增),为整个运行提供共享数据。不同于 input,它不被流经节点,但可用于表达式引用。 |
stream_mode |
StreamMode (通常为 "values" 、"none" 、"all" 等) |
"values" |
控制运行返回什么类型的数据;默认 "values" 表示仅返回最终输出值。其他选项如 "all" 表示返回中间节点的所有输出。 |
print_mode |
StreamMode | Sequence[StreamMode] |
() |
用于调试,控制哪些输出在控制台打印。不影响实际输出结果。可选 "values" 、"all" 等与 stream_mode 相同的值。 |
output_keys |
str | Sequence[str] | None |
None |
指定要返回的输出字段,若不设置,返回图的全部输出。用于只关心部分输出结果时提高性能。 |
interrupt_before |
All | Sequence[str] | None |
None |
中断图运行的时机点:在指定节点执行前中断运行(用于调试或手动接管节点执行)。 |
interrupt_after |
All | Sequence[str] | None |
None |
中断图运行的时机点:在指定节点执行后中断运行。 |
**kwargs |
Any |
无 | 传入额外参数,这些参数会传递给图的根节点或 config 中定义的 callbacks,用于扩展自定义逻辑。 |
其中:
input
与context
是两个不同维度的数据:input
: 节点可以感知和读取的数据。context
: 静态共享上下文,一般不会自动传入节点,但支持get("key")
的方式访问。
stream_mode
和print_mode
组合使用时:stream_mode
控制实际返回;print_mode
控制是否在终端打印过程值(调试用途)。
interrupt_before
/interrupt_after
非常适合用于调试或 LangGraph IDE 场景中注入逻辑。
输入参数中有几个类型我们之前没有见过,我们先来看看。
2.1 InputT
|
|
部分 | 含义说明 |
---|---|
TypeVar("InputT", ...) |
创建一个类型变量 InputT ,可用于泛型函数、类、方法中 |
bound=StateLike |
表示 InputT 必须是 StateLike 子类(上界约束) |
default=StateT |
如果使用者不提供具体类型参数时,默认 InputT = StateT |
2.2 ContextT
|
|
部分 | 含义说明 |
---|---|
TypeVar("ContextT", ...) |
创建一个类型变量 ContextT ,可用于泛型函数、类、方法中 |
bound=Union[StateLike, None] |
表示 ContextT 必须是 StateLike 或 None 的子类(上界约束) |
default=None |
如果使用者不提供具体类型参数时,默认 ContextT = None |
2.3 StreamMode
StreamMode 控制 PregelLoop 或调度器在执行过程中,以什么粒度、方式、频率将中间结果(包括 task 输出、中断、写入等)传输给使用者。
|
|
模式值 | 含义说明 | 典型用途 |
---|---|---|
"values" |
每一步后输出整个状态中的所有值,包括中断点。 如果使用函数式 API,仅在最终输出时返回一次。 |
默认模式,适合最终结果导向的运行 |
"updates" |
每一步后仅输出本次运行中各节点(或任务)返回的更新内容(如写入的通道名和值),多节点会分开多次输出。 | 更轻量的流式模式,便于追踪增量变化 |
"custom" |
允许节点或任务内部使用 StreamWriter 显式发送自定义输出。 |
适合自定义进度信息或日志的输出 |
"messages" |
对于节点中调用 LLM(大模型)的部分,逐 token 发送消息及相关元数据。 | 流式输出大模型 token,用于聊天可视化 |
"checkpoints" |
每次创建 checkpoint 时发出事件,格式与 get_state() 返回值一致。 |
检查状态保存逻辑,适用于持久化观测 |
"tasks" |
每个任务(如节点)开始与结束时发出事件,包含执行结果或错误。 | 性能监控、错误定位、任务时间测量 |
"debug" |
是 "checkpoints" 和 "tasks" 的组合,便于调试。 |
调试模式,获取完整运行过程 |
2.4 Command
Command 的泛型数据类,是 LangGraph 中用于 控制图状态更新和节点跳转的核心指令对象。你可以把它理解为图执行中的“一次行动指令”,在一个节点执行后返回,用于告诉引擎下一步干什么。后面我们讲解 interrupt 时,就能理解 Command 的作用了。我们先看它的实现。
|
|
Command 属性
属性名 | 类型 | 说明 |
---|---|---|
graph |
str | None |
指定命令作用的图。 - None 表示当前图(默认)- Command.PARENT 表示最近的父图 |
update |
Any | None |
用于更新图的状态。可以是: - dict[str, Any] :字段更新键值对- 支持结构化对象(会转成键值对) - 单个值(会包装为 ("__root__", value) ) |
resume |
dict[str, Any] | Any | None |
恢复中断执行的值,用于与 interrupt() 配合。可为:- 中断 ID 到恢复值的映射 - 单一恢复值,应用于最近的中断 |
goto |
Send | Sequence[Send | N] | N |
指定接下来要跳转执行的节点(或节点序列)或发送命令。可以是: - 节点名( str 或 Literal )- Send 对象(含输入数据)- 上述任意组合的列表 |
_update_as_tuples
_update_as_tuples 用于统一 Command update 值,其用到了 get_cached_annotated_keys 和 get_update_as_tuples 两个函数。
|
|
3. Pregel.Stream
Pregel.invoke 内部调用的是 stream 方法。所以我们重点关注 stream 方法。stream 方法内部调用了很多其他的对象,这里我们重点关注两点:
- stream 方法的执行流程
- 内部使用的对象,为我们下一步学习提供指导。
下面是 stream 的源码及其注释:
|
|
在 Stream 的实现中,主要用到了如下对象:
- RunTime:
- SyncPregelLoop: 会包含 Pregel 提供的方法,stream 中核心调用了如下方法:
- tick
- match_cached_writes
- after_tick
- PregelRunner: 会包含 SyncPregelLoop 提供的方法,stream 中调用了其 tick 方法
后面我们会先学习这几个对象,然后再回到 Pregel 上来。