pregel - 2
1. Pregel 的方法
前面我们总结过 Pregel 有如下方法。
方法名 | 输入类型 | 输出类型 | 作用 |
---|---|---|---|
__init__ |
nodes , channels , input_channels , output_channels 等 |
None |
初始化 Pregel 实例,设置节点、通道、输入输出等配置 |
get_graph |
config: RunnableConfig | None , xray: int | bool = False |
Graph |
返回计算图的可绘制表示 |
aget_graph |
config: RunnableConfig | None , xray: int | bool = False |
Graph |
异步返回计算图的可绘制表示 |
_repr_mimebundle_ |
**kwargs: Any |
dict[str, Any] |
Jupyter 显示图形用的 MIME 包 |
copy |
update: dict[str, Any] | None = None |
Self |
创建 Pregel 对象的副本 |
with_config |
config: RunnableConfig | None = None , **kwargs: Any |
Self |
使用更新配置创建 Pregel 副本 |
validate |
无参数 | Self |
验证图形配置的有效性 |
config_schema |
include: Sequence[str] | None = None |
type[BaseModel] |
获取配置模式(已弃用) |
get_config_jsonschema |
include: Sequence[str] | None = None |
dict[str, Any] |
获取配置 JSON 模式(已弃用) |
get_context_jsonschema |
无参数 | dict[str, Any] | None |
获取上下文 JSON 模式 |
get_input_schema |
config: RunnableConfig | None = None |
type[BaseModel] |
获取输入模式 |
get_input_jsonschema |
config: RunnableConfig | None = None |
dict[str, Any] |
获取输入 JSON 模式 |
get_output_schema |
config: RunnableConfig | None = None |
type[BaseModel] |
获取输出模式 |
get_output_jsonschema |
config: RunnableConfig | None = None |
dict[str, Any] |
获取输出 JSON 模式 |
get_subgraphs |
namespace: str | None = None , recurse: bool = False |
Iterator[tuple[str, PregelProtocol]] |
获取图的子图 |
aget_subgraphs |
namespace: str | None = None , recurse: bool = False |
AsyncIterator[tuple[str, PregelProtocol]] |
异步获取图的子图 |
get_state |
config: RunnableConfig , subgraphs: bool = False |
StateSnapshot |
获取图的当前状态 |
aget_state |
config: RunnableConfig , subgraphs: bool = False |
StateSnapshot |
异步获取图的当前状态 |
get_state_history |
config: RunnableConfig , filter , before , limit |
Iterator[StateSnapshot] |
获取图状态历史 |
aget_state_history |
config: RunnableConfig , filter , before , limit |
AsyncIterator[StateSnapshot] |
异步获取图状态历史 |
bulk_update_state |
config: RunnableConfig , supersteps: Sequence[Sequence[StateUpdate]] |
RunnableConfig |
批量更新图状态 |
abulk_update_state |
config: RunnableConfig , supersteps: Sequence[Sequence[StateUpdate]] |
RunnableConfig |
异步批量更新图状态 |
update_state |
config: RunnableConfig , values , as_node , task_id |
RunnableConfig |
更新图状态 |
aupdate_state |
config: RunnableConfig , values , as_node , task_id |
RunnableConfig |
异步更新图状态 |
stream |
input , config , context , stream_mode 等 |
Iterator[dict[str, Any] | Any] |
流式执行图并返回步骤输出 |
astream |
input , config , context , stream_mode 等 |
AsyncIterator[dict[str, Any] | Any] |
异步流式执行图并返回步骤输出 |
invoke |
input , config , context , stream_mode 等 |
dict[str, Any] | Any |
同步执行图并返回最终结果 |
ainvoke |
input , config , context , stream_mode 等 |
dict[str, Any] | Any |
异步执行图并返回最终结果 |
clear_cache |
nodes: Sequence[str] | None = None |
None |
清除指定节点的缓存 |
aclear_cache |
nodes: Sequence[str] | None = None |
None |
异步清除指定节点的缓存 |
主要功能分类:
- 图形管理:
get_graph
,aget_graph
,validate
,copy
,with_config
- 模式获取:
get_input_schema
,get_output_schema
,get_context_jsonschema
等 - 状态管理:
get_state
,aget_state
,get_state_history
,update_state
等 - 执行控制:
invoke
,ainvoke
,stream
,astream
- 缓存管理:
clear_cache
,aclear_cache
- 子图操作:
get_subgraphs
,aget_subgraphs
上一节我们介绍了 invoke 方法的入参,这一节我们就来介绍 invoke 方法的实现。invoke 内部主要是调用了 stream 方法。所以我们要先来看 stream 方法。
2. stream 方法
stream 方法非常的长,我直接把代码拷贝到 ChatGpt了,让 ChatGpt 给我解释了这段代码,下面的内容是结合 ChatGpt 的回答整理的。如代码里面注释标注的,stream 代码分成如下几块:
- 参数预处理与默认值解析
- 设置 stream 管道(事件队列)
- 配置归一化和 callback 设置
- 参数标准化
- Subgraph 处理
- 消息流模式处理
- 配置 Runtime
- 启动主循环:SyncPregelLoop
- 初始化 PregelRunner
- 执行 loop.tick 生成 tasks
- 执行 runner.tick 执行 tasks
- _output 输出中间结果
- 执行 loop.after_tick 更新 channel
- _output 输出最终结果
核心的代码我们在之前介绍 Loop,Runner 时都已经介绍过了。
|
|
2.1 stream 管道
stream=SyncQueue()
SyncQueue 是一个先进先出的队列。实现比较简单,内部是一个 queue 和 信号量。
|
|
引入信号量,是为了给 get 操作提供超时控制。
2.2 参数标准化
参数标准化通过 _default
方法实现,下面是 _defaults
的源码:
|
|
_default 用于标准化 pregel 参数。_default
会从 config 中获取如下配置:
|
|
durability
durability
是用于控制 LangGraph 执行过程中的 状态持久化(checkpoint)策略 的参数。主要影响 在图执行过程中的哪个阶段保存中间状态(State)
模式 | 持久化时机 | 安全性 | 性能 | 适合场景 |
---|---|---|---|---|
sync |
每步后同步保存 | 高 | 中 | 高可用要求的生产流程 |
async |
每步后异步保存 | 中 | 高 | 性能优先,风险可接受的流程 |
exit |
图执行完后才保存一次 | 低 | 最高 | 可重跑流程、实验性流程 |
2.3 Subgraph 处理
LangGraph 支持在图中调用嵌套子图(subgraph),比如:
|
|
为了让主图和子图的 stream 输出可以区分并追踪,LangGraph 引入了 namespace
机制:每个子图的事件都带上其命名路径,如:
|
|
stream 方法中关于子图的处理有两个部分:
|
|
namespace:
self.checkpointer is True
:代表这是一个启用了持久化的子图;recast_checkpoint_ns
:去掉命名空间中的<task_id>
部分,只保留路径部分;
流式输出支持:
-
如果调用
stream(subgraphs=True)
,则将当前loop.stream
对象写入子图 config; -
这样子图执行期间产生的事件也会通过主图的 stream 发出;
-
子图发出的事件格式如下:
- 单一
stream_mode
:(namespace: tuple, data)
- 多重
stream_mode
:(namespace: tuple, mode, data)
- 单一
示例输出:
|
|
你可以通过解析这个 namespace
路径,知道数据是在哪个子图中哪个节点产出的。
2.4 namespace
这里补充一下 Langgraph 中有关 namespace 的知识。
假设你有一个 LangGraph 工作流,用于问答系统(QA system),主图如下:
|
|
你想要为每个用户的每次请求做持久化记录,并为主图和子图都创建独立的 namespace,以便更好地控制 checkpoint 和缓存数据的范围。
主图的 namespace 示例
设定参数如下:
参数 | 值 |
---|---|
graph_name | qa_graph |
user_id | user_123 |
run_id | run_abc456 |
主图的 namespace 可以设为:
|
|
或等效的字符串形式:
|
|
这就是该用户本次请求的主图持久化空间。子图应派生在主图的 namespace 下,追加子图名称,以实现命名空间继承 + 局部隔离。
子图 namespace
子图名:summarize_graph
子图 namespace 派生自主图:
|
|
图与子图的 namespace 层级关系
|
|
3. invoke 方法
我们对着前面的示例来看 invoke 的代码,invoke 调用 stream,并从 stream 的输出中提取 stream_mode=values 的值,作为 invoke 的返回值。
|
|