langgraph api 流程总结
目录
本节我们总结回顾一下 Langgraph API 相关重要的流程,包括
- StateGraph API 如何映射为 Pregel
- Function API 如何映射为 Pregel
1. StateGraph API 如何映射为 Pregel
在之前的总结里,我们已经学习了 Pregel 执行的过程:
|
|
在 Pregel 的抽象里只包含 channel、node(PregelNode) 两个核心组件。
而在 StateGraph 里有如下抽象:
- nodes:
dict[str, StateNodeSpec[Any, ContextT]]
- edges:
set[tuple[str, str]]
- branches:
defaultdict[str, dict[str, BranchSpec]]
除了这些抽象,StateGraph 提供的另一个重要功能通过类型注解,声明:
- node 包含的 channel、ManagedValue
我们需要关注的重点就是 nodes、edges、branches 如何映射为 channel、node
1.1 StateNodeSpec -> PregelNode
StateNodeSpec -> PregelNode 的映射位于 CompiledStateGraph.attach_node
- 每个 node 都会单独定义一个 branch_channel,命名为
branch:to:{node_key}
- node 会被自己的 branch_channel 触发,这个 branch_channel 只起到触发作用,不传递值。
在 PregelNode 的初始化中:
- channels: 是从 node 的 input_schema 中解析的 channel
- mapper: 后续执行时,会将 input 绑定到 input_schema
|
|
PregelNode writers 参数的初始化最为复杂:
- StateGraph.add_node 添加节点时,会从输入函数的返回值解析出 node.ends,ChannelWriteEntry 会根据 node.ends 生成提示信息
- PregelNode 会填两个 ChannelWriteEntry,ChannelWriteEntry 接收一个 mapper 函数,这个 mapper 函数接收的是
bound_return=PregelNode.bound(input)
执行结果- output_keys 是 StateGraph 中收集的所有 channel,
output_keys == ["__root__"]
表示无法解析 input_schema - _get_updates 会从 bound_return 中过滤出所有在 output_keys 的返回值,输出为对 channel 的更改,对于嵌套结构会递归处理
- _get_root 类似 _get_updates,但是因为
output_keys == ["__root__"]
,只有一个 channel 默认会把 bound_return 整体作为__root__
channel 的值,不回去判断 bound_return 内部是有__root__
- _control_branch 用于从 bond_return 中提取出 Send 和 Command.goto,生成节点跳转的任务
- output_keys 是 StateGraph 中收集的所有 channel,
- 总结一下 PregelNode.writers
writers=[ChannelWrite(List[ChannelWriteEntry|ChannelWriteTupleEntry])]
- Pregel 的原始 API
- PregelNode.writers 包含的是 ChannelWriteEntry
- ChannelWriteEntry 包含 channel,PregelNode.bound 只返回值,ChannelWrite 返回 (channel, mapper(value))
- StateGraph 中
- PregelNode.writers 包含的是 ChannelWriteTupleEntry
- ChannelWriteTupleEntry 包含的是 mapper 函数,返回 mapper(value)
- mapper 有 _get_updates,_control_branch,正是因为这些 mapper 函数,StateGraph 的 node 函数,可以返回
dict|Command
- Pregel 的原始 API
这里解释了 node 函数的返回值是如何处理。作为对比,我们额外说明一下 tool 函数的返回值是如何处理的。
- 首先 tool 函数被包装在 Tool 类中,Tool 会调用 _format_output 对函数返回结果进行包装:
- 如果 content 就是一个类 ToolOutputMixin 的实例,直接返回
- Command、ToolMessage 都是 ToolOutputMixin 的子类,会直接返回
- 其他类型,会包装为 ToolMessage
- 正因为如此,如果要在 Tool 中,返回对 channel 的修改,必须使用 Command
- tools 会被包装在 ToolNode 中,ToolNode 会调用 _combine_tool_outputs 对多个 tool 的返回结果进行合并
- tool 的返回值经过 _format_output 包装后,只会返回 ToolMessage 和 Command
- command 被收集后返回,ToolMessage 会包装成
{message_key: [ToolMessage]}
返回
- 回到上面 StateGraph 中对 node 函数返回值的处理流程
|
|
1.2 edges
edge 的处理比较简单,edge 会被翻译成节点的 writer:
- 在 start.writers 添加对 end.branch_channel 的写入任务
- 前面我们说过,每个节点都会添加一个 branch_channel,并被这个 branch_channel 触发
|
|
attach_edge 还需要处理一种特殊情况,多个 start 节点触发一个 end 任务。end 任务需要等所有 start 节点都更新后,才能被触发。处理方法是使用一个特殊的 channel。
- 创建一个 NamedBarrierValue channel,end 节点被这个 channel 触发
- 每个 start 节点添加对 NamedBarrierValue 更新的 writers。
- 当 start 节点更新时,会触发 NamedBarrierValue 记录对 start 节点的可见
- 当所有节点都可见时,NamedBarrierValue 就是 available
- apply_writes 会在 updated_channel 应用之后,在检查一下所有 channel 是否 available,这样就可以检测到可用的 NamedBarrierValue,并将其追加到 updated_channel,这样就可以触发 end 节点
|
|
1.3 Branch
Branch 处理的是 condition_edge,接收一个处理函数 path,执行完之后输出要跳转的节点。condition_edge 只确定 start 节点,并且不能是多个 start 节点。
Branch 的处理比较复杂,我们先看 attach_branch 提供的两个函数
- get_writes:
- 前面我们提到,每个 node 都有一个触发它的 branch_channel。所以对 node 的触发,必须格式化为
_CHANNEL_BRANCH_TO
- get_writes 用于标准化 channel,并将对 channel 的写入转换为 ChannelWriteEntry
- ChannelWriteEntry 默认的 value 是 PASSTHROUGH,get_writes 中设置成了 None
- 前面我们提到,每个 node 都有一个触发它的 branch_channel。所以对 node 的触发,必须格式化为
- reader:
- path 的入参可能需要从多个 channel 读取值
- 从哪些 channel 读取值,由 path 的 input_schema 定义
- reader 用于实现从 channel 读取值,并生成 input_schema 的值
- reader 调用 ChannelRead.do_read 内部,会从
config[CONF][CONFIG_KEY_READ]
获取一个 read 函数,这个 read 函数正是prepare_single_task
在执行时配置的
|
|
现在我们再来理解 branch.run(get_writes, reader)
- branch 有三个参数
- path: branch_func 路由函数
- ends: 路由函数的输出,映射到哪个节点
- input_schema: 路由函数的输入 schema
- run 方法返回的是一个 RunnableCallable
- 执行入口是 branch._route
- writer=get_writes
- reader=reader
- writer、reader 都会作为 kwargs 最终传递给 branch._route
- 可以忽略
ChannelWrite.register_writer
只是将返回的 Runnable 标识为一个 ChannelWrite,并添加 static
- branch._route
- brach.run 的返回值 是添加到
node.writers
,他会像 ChannelWrite 一样被调用,即invoke(input=bound_return)
value = reader(config)
获取 path 函数需要读取的 channel 的值result = self.path.invoke(value, config)
返回的路由到哪些 node,或者是 Send
- brach.run 的返回值 是添加到
- branch._finish
- writer=get_writes,
entries = writer(destinations, False)
标准化 branch_channel,并将 str 的 branch_channel 转换为 ChannelWriteEntry if need_passthrough: return ChannelWrite(entries)
: 暂时不知道这个分支何时被调用。ChannelWrite.do_write(config, entries)
: get_writes 中将 value 设置成了 None,所以会直接调用这个分支,生成对 branch_channel 的写入,进而触发对应 node 的执行
- writer=get_writes,
|
|
1.4 不同节点的返回
至此我们总结一下,不同节点的返回值:
- node: 可以返回
dict|command
,dict 表示对应 channel 的更改 - tool: 可以返回
any|command
,tool 内如果想实现对 channel 的更新和节点跳转,必须使用 command - branch: 可以返回
str|Send
,str 会转换为 ChannelWriteEntry(channel, None) 表示对 channel 的更新,Send 表示节点跳转