langgraph graph api
这一节我们来学习 langgraph 的 graph api。
1. graph api
1.1 graph api 使用示例
我们来看 langgraph 官方文档提供的智能机器人的示例:
|
|
1.2 graph api 类图
从 UML 类图可以看到 Stategraph.compile 之后得到的是 CompiledStateGraph,这个类是 Pregel 的子类。
2. StateGraph
|
|
_add_schema 用于从输入的 schema 中提取 channels, managed 信息,不过要看懂 _add_schema 代码,需要先了解 Python Annotated 的用法。
2.1 Annotated
Annotated
是 Python 3.9+(typing
模块)提供的一种 类型标注扩展机制,
用来给一个类型加上额外的元数据,而不影响这个类型本身。
它的官方定义大概是:
|
|
- 原始类型:真正的字段类型(比如
str
,int
,list[str]
)。 - 额外信息:可以是任意 Python 对象(通常是配置、说明、约束等),这些信息会被保存在
__metadata__
属性里。
基本语法
|
|
这里:
- 类型:
str
- 额外信息:
"must be non-empty"
获取元数据
|
|
多个元数据
|
|
2.2 schema 的定义
下面是一个更复杂的 schema 的定义:
|
|
_add_schema 函数功能就是从 State 的定义中提取 channels, managed 信息。
2.2 schema 解析
schema 解析的功能定义在 _get_channels 函数中:
|
|
_get_channels 的核心是 _get_channel,可以看到
- schema 申明的所有字段都会被包装为 channel
- Langgraph 中所谓的 reducer,会被包装为 BinaryOperatorAggregate
|
|
2.3 _add_schema
_add_schema 会将解析出来的 channel、ManagedValue 添加到 StateGraph 相关的容器字段中。
|
|
这里要特别注意,_add_schema 中下面的代码:
- 每个 channel 都重载了
__eq__
方法,用于比较 channel 是否相等。所以这里的 != 不是比较他们是不是相同的实例,而是比较的他们是不是统一类型,是不是觉有相同的参数 - 这意味着不同的 input_schema 可以通过申明相同的 schema 而共用相同的 channel,从而做到在不同的 Node 之间传递数据。
|
|
StateGraph 初始化之后就是:
- 添加节点: add_node
- 添加边: add_conditional_edges/add_edge
- 编译: compile
2.4 add_node
StateNodeSpec
stategraph 将 node 定义为 StateNodeSpec,其中 runnable 定义为 StateNode。StateNode Union 中的每一种类型,表示 node 入参的一种定义形式。
|
|
属性名 | 类型 | 说明 |
---|---|---|
runnable |
StateNode[NodeInputT, ContextT] |
节点的可执行对象,通常是一个符合 StateNode 接口的函数或类实例,定义了节点的执行逻辑。 |
metadata |
dict[str, Any] | None |
节点的元数据,可用于存储额外的描述信息、标签等辅助信息。 |
input_schema |
type[NodeInputT] |
节点输入数据的模式(Schema),通常用于类型检查和自动文档生成。 |
retry_policy |
RetryPolicy | Sequence[RetryPolicy] | None |
节点执行失败时的重试策略,可以是单个策略或策略列表。 |
cache_policy |
CachePolicy | None |
节点的缓存策略,用于控制结果是否缓存以及缓存失效规则。 |
ends |
tuple[str, ...] | dict[str, str] | None |
节点的结束标志或跳转规则:可以是目标节点名称的元组,或从条件到节点名称的映射。 |
defer |
bool |
是否延迟执行该节点,如果为 True ,节点将在后续某个时间点再执行。 |
defer “当前节点的执行结果不要马上触发后续节点调度,而是延迟到下一轮 tick 才处理。”
defer 的典型用途是:
- 控制执行节奏:防止在一个 tick 中出现过长的同步调用链(避免阻塞或递归过深)。
- 批量处理:有些节点需要等到本轮所有节点都跑完后再统一处理下游任务(比如聚合节点、合并更新)。
- 分阶段执行:让某些逻辑分多个 tick 进行,保持每个 tick 内状态变更的可控性和可观测性。
add_node 执行逻辑
下面是 add_node 带注释的源码:
- 参数检查和标准化入参
- 根据类型注解,推断 input_schema 和 ends(跳转的目标节点)
- 将 node 构造成 StateNodeSpec 并添加到 self.nodes 属性中
|
|
2.5 add_edge
add_edge:
- 如果 start_key 是单节点,添加到 self.edges
- 如果 start_key 是多节点,添加到 self.waiting_edges
|
|
2.6 add_conditional_edges
conditional_edges 以 BranchSpec 保存在 self.branches 中。这里我们需要重点理解 BranchSpec
|
|
关于边的处理,目前我们可以做一个简单的总结:
- 如果起点是但节点,边会添加到 self.edges
- 如果起点是多节点,边会添加到 self.waiting_edges
- 如果是 condition_edge,边会添加到 self.branches
2.7 validate
StateGragph 在 compile 之前会调用 validate 进行参数校验。主要校验的逻辑是,边的起点、结束点是否在 self.nodes 中,以及中断是否在 self.nodes 内。
|
|
2.8 compile
compile 将 Stategraph 编译成 CompiledStateGraph,CompiledStateGraph 继承自 pregel 是 langgraph的运行时。从compile 代码可以看到除了正常的参数传递,CompiledStateGraph 还调用了三个函数:
- attach_node: 将 StateNodeSpec 转换为 PregelNode
- attach_edge: 处理 edge
- attach_branch: 处理 condition_edge
CompiledStateGraph 是如何与 Pregel 关联起来的,我们放到了 langgraph api 流程总结 中。
|
|
示例代码中,我们还用到了 ToolNode, tools_condition 这里我们介绍一下他们的实现。
3. ToolNode
3.1 初始化
在调用 graph_builder.add_node("tools", tool_node)
时:
- tool_node 不是可调用对象,所以不会尝试从参数中解析出 input_schema
- tool_node 的 input_schema 默认为 state_schema
- ToolNode 初始化有个特殊的 messages_key 参数。就是指定从 input_schema 读取 message 需要访问的字段名。
|
|
_get_state_args
_get_state_args 函数是在 LangGraph 里用来分析 Tool 的输入参数,看哪些参数需要自动从 graph state 注入的。
|
|
执行过程:
- 目标:找到 Tool 参数中用
InjectedState
注解的字段。 - 如果
InjectedState(field="xxx")
→ 说明只注入state["xxx"]
。 - 如果
InjectedState()
→ 注入整个 state。 - 如果没加注解 → 不自动注入。
- 如果加了多个
InjectedState
→ 抛错。
下面是一个代码示例:
|
|
然后你有一个 Tool 输入 schema:
|
|
调用 _get_state_args(MyTool())
:
|
|
输出:
|
|
_get_store_arg
_get_store_arg 和 _get_state_args 很像,只不过它是专门检测 Tool 里是否有参数需要注入 graph store。
_get_store_arg 只找 InjectedStore 注解(区别于 _get_state_args 里找的是 InjectedState)。
|
|
_inject_state
这段 _inject_state
是 LangGraph 在 Tool 执行前,把 Graph State 里需要的字段自动注入到 Tool 参数的关键步骤:
- 读取
tool_to_state_args
(来自_get_state_args
生成的映射)。 - 检查输入格式是否和注入要求匹配。
- 从 Graph State 里取出对应字段,自动塞进 Tool 的参数里。
- 返回带有完整参数的
tool_call
。
|
|
_inject_store
|
|
总结一下:
- inject_store, inject_state 是把 State 的字段映射进 tool_call 的 arg 参数内,发生在 ToolNode 中
- InjectedToolCallId 是往 Tool.func 传入 tool_id,发生在 tool.run 方法内。
3.2 _func
ToolNode 继承自 RunnableCallable,ToolNode.invoke 将调用 ToolNodel._func。
|
|
3.3 _run_one
_run_one 会执行一下逻辑:
- 校验 toll_call 中的 tool 是否在输入的 tool 中
- 执行 tool.invoke 并处理异常
- 针对 tool 返回 Command 类型的返回值做校验
|
|
3.4 _validate_tool_command
_validate_tool_command 对 tool 返回的 Command 类型做校验:
- Command.update 类型校验:必须和输入格式匹配(dict ↔ dict,list ↔ list)
- 工具调用回复验证:必须有一个 ToolMessage,且 tool_call_id 对应当前的 call.id
|
|
3.5 _combine_tool_outputs
_combine_tool_outputs 用于合并多个 tool_call 的输出结果。
|
|
4. tools_condition
tools_condition 的逻辑很简答,就是判断 messages 的最后一条有没有 tool_calls 属性,有就跳转到 tools 节点,否则跳转到 END 节点。
|
|