1. Pregel Algo
pregel 有关任务生成的代码位于 langgraph\pregel\_algo.py
。这个应该算是 pregel 最核心的部分了。_algo.py
内有如下几个函数:
2. prepare_next_tasks
- 用于生成下一个 Pregel step 中的任务。
- 内部会调用 prepare_single_task
3. apply_writes
- 把对 channel 的写入应用到 channel 中,并返回 updated_channels
在介绍这些函数之前我们需要先学习一下与之相关的一些基础对象,包括:
- PregelTask/PregelExecutableTask:
- Call
- PregelScratchpad
1. PregelTask/PregelExecutableTask
这两个类是 LangGraph 中 Pregel 模式调度系统的一部分,
PregelTask
:任务的执行记录(结果)
- 用于描述某个 节点任务的执行状态、结果或错误。
- 通常在调度完成后用于追踪状态,支持错误处理、调试、回溯等。
PregelExecutableTask
:任务的调度单元(执行计划)
- 表示一个准备被调度执行的任务单元,携带输入、处理器、配置、写入目标等信息。
- 通常在
step_once()
中被生成并传给执行逻辑。
类别 |
PregelExecutableTask |
PregelTask |
角色 |
调度器要执行的“待办事项” |
执行完毕后的“历史记录” |
生命周期 |
前置结构,被调度使用 |
后置结构,保存运行结果 |
状态 |
尚未运行、正在执行 |
已运行,可能成功/失败 |
结合示意图帮助理解
1
|
[PregelExecutableTask] --- 调度执行 --> [PregelTask]
|
你可以理解为:
PregelExecutableTask
是一个调度器打包好的“任务指令”,
- 执行完后,就转化为一个
PregelTask
,记录执行历史(包含错误、返回值、状态等)。
1.1 PregelExecutableTask
PregelExecutableTask 的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@dataclass(**_T_DC_KWARGS)
class PregelExecutableTask:
name: str
input: Any
proc: Runnable
# writes: deque[tuple[str, Any]] = deque()
writes: deque[tuple[str, Any]]
config: RunnableConfig
# triggers = (PUSH,) 谁触发了当前节点
triggers: Sequence[str]
retry_policy: Sequence[RetryPolicy]
cache_key: CacheKey | None
id: str
# (PUSH, parent_task_path, index)
path: tuple[str | int | tuple, ...]
# writers=PregelNode.flat_writers,
writers: Sequence[Runnable] = ()
# writers=PregelNode.subgraphs,
subgraphs: Sequence[PregelProtocol] = ()
class CacheKey(NamedTuple):
"""Cache key for a task."""
ns: tuple[str, ...]
"""Namespace for the cache entry."""
key: str
"""Key for the cache entry."""
ttl: int | None
"""Time to live for the cache entry in seconds."""
|
PregelExecutableTask 包含了一个 task 执行关联的所有信息。下面是其属性的含义
属性名 |
类型 |
默认值 |
含义 |
|
|
name |
str |
无 |
节点名称 |
|
|
input |
Any |
无 |
输入数据 |
|
|
proc |
Runnable |
无 |
节点的执行体(一个 LCEL 节点) |
|
|
writes |
deque[tuple[str, Any]] |
无 |
预期要写入的通道与对应值 |
|
|
config |
RunnableConfig |
无 |
当前任务的配置上下文 |
|
|
triggers |
Sequence[str] |
无 |
激活当前节点的依赖 key(谁触发了它) |
|
|
retry_policy |
Sequence[RetryPolicy] |
无 |
重试策略定义 |
|
|
cache_key |
`CacheKey |
None` |
None |
用于缓存查找与存储的 key |
|
id |
str |
无 |
任务的唯一标识符 |
|
|
path |
`tuple[str |
int |
tuple, …]` |
无 |
节点在图中的路径标识 |
writers |
Sequence[Runnable] |
() |
输出写入副作用处理器(如写入 store) |
|
|
subgraphs |
Sequence[PregelProtocol] |
() |
嵌套的子图列表(用于子流程) |
|
|
PregelExecutableTask 的执行
PregelExecutableTask 的执行定义在 langgraph\pregel\_retry.py
下:
- run_with_retry: 同步执行函数
- arun_with_retry: 异步执行函数
run_with_retry 代码很长,大多数都是异常处理的逻辑,正常执行流程就是如下几行代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
def run_with_retry(
task: PregelExecutableTask,
retry_policy: Sequence[RetryPolicy] | None,
configurable: dict[str, Any] | None = None,
) -> None:
"""Run a task with retries."""
retry_policy = task.retry_policy or retry_policy
attempts = 0
config = task.config
if configurable is not None:
config = patch_configurable(config, configurable)
while True:
try:
# clear any writes from previous attempts
task.writes.clear()
# run the task
return task.proc.invoke(task.input, config)
|
task.proc.invoke
的调用链很长,需要结合 PregelExecutableTask 创建过程讲解,这个我们放到 prepare_single_task 时在详述。
1.2 PregelTask
1
2
3
4
5
6
7
8
9
10
|
class PregelTask(NamedTuple):
"""A Pregel task."""
id: str
name: str
path: tuple[str | int | tuple, ...]
error: Exception | None = None
interrupts: tuple[Interrupt, ...] = ()
state: None | RunnableConfig | StateSnapshot = None
result: Any | None = None
|
PregelTask 的定义也比较简单,下面是其属性的含义:
属性名 |
类型 |
默认值 |
含义 |
|
|
id |
str |
无 |
任务的唯一标识符 |
|
|
name |
str |
无 |
节点名称(或任务名称) |
|
|
path |
`tuple[str |
int |
tuple, …]` |
无 |
节点在嵌套子图中的定位路径 |
error |
`Exception |
None` |
None |
执行中出现的异常(如果有) |
|
interrupts |
tuple[Interrupt, ...] |
() |
中断信息(如取消信号) |
|
|
state |
`None |
RunnableConfig |
StateSnapshot` |
None |
执行时的状态或快照 |
result |
`Any |
None` |
None |
节点运行后的返回结果(成功时) |
|
PregelTask 里面有两个对象,我们之前没有见过:
- Interrupt
- StateSnapshot
我们先来看一下这两个对象:
1.3 StateSnapshot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class StateSnapshot(NamedTuple):
"""Snapshot of the state of the graph at the beginning of a step."""
values: dict[str, Any] | Any
"""Current values of channels."""
next: tuple[str, ...]
"""The name of the node to execute in each task for this step."""
config: RunnableConfig
"""Config used to fetch this snapshot."""
metadata: CheckpointMetadata | None
"""Metadata associated with this snapshot."""
created_at: str | None
"""Timestamp of snapshot creation."""
parent_config: RunnableConfig | None
"""Config used to fetch the parent snapshot, if any."""
tasks: tuple[PregelTask, ...]
"""Tasks to execute in this step. If already attempted, may contain an error."""
interrupts: tuple[Interrupt, ...]
"""Interrupts that occurred in this step that are pending resolution."""
|
StateSnapshot 是 LangGraph 中的一个核心数据结构,用于表示图执行过程中的某个 step 开始前的完整状态快照。可以理解为:PregelLoop 即将执行某一 step 之前,把此时的图状态、输入值、调度计划等打包成一个快照,命名为 StateSnapshot。
属性名 |
类型 |
含义 |
values |
dict[str, Any] | Any |
图中所有 channel 当前的值(可为字典或原始结构) |
next |
tuple[str, ...] |
当前 step 中需要调度执行的节点名称 |
config |
RunnableConfig |
当前快照对应的运行配置(含 tags、metadata) |
metadata |
CheckpointMetadata | None |
与该快照绑定的 checkpoint 元数据 |
created_at |
str | None |
快照创建时间(通常是 ISO 时间戳) |
parent_config |
RunnableConfig | None |
上一个快照所用的配置(用于回溯 parent) |
tasks |
tuple[PregelTask, ...] |
本 step 要执行的任务(含执行状态、错误等) |
interrupts |
tuple[Interrupt, ...] |
此 step 中触发的中断事件(如用户取消) |
StateSnapshot 的使用场景
场景 |
用途 |
检查点存储 |
可持久化保存每一步状态用于恢复 |
调试与追踪 |
可以记录每一步执行了哪些任务,出了什么错 |
调度分支判断 |
next 属性用于决定本 step 执行哪些节点 |
中断与恢复 |
interrupts 表明是否有异常/终止操作 |
1.4 Interrupt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
@final
@dataclass(init=False, **_DC_SLOTS)
class Interrupt:
"""Information about an interrupt that occurred in a node.
!!! version-added "Added in version 0.2.24."
!!! version-changed "Changed in version v0.4.0"
* `interrupt_id` was introduced as a property
!!! version-changed "Changed in version v0.6.0"
The following attributes have been removed:
* `ns`
* `when`
* `resumable`
* `interrupt_id`, deprecated in favor of `id`
"""
value: Any
"""The value associated with the interrupt."""
id: str
"""The ID of the interrupt. Can be used to resume the interrupt directly."""
def __init__(
self,
value: Any,
id: str = _DEFAULT_INTERRUPT_ID,
**deprecated_kwargs: Unpack[DeprecatedKwargs],
) -> None:
self.value = value
if (
(ns := deprecated_kwargs.get("ns", MISSING)) is not MISSING
and (id == _DEFAULT_INTERRUPT_ID)
and (isinstance(ns, Sequence))
):
self.id = xxh3_128_hexdigest("|".join(ns).encode())
else:
self.id = id
@classmethod
def from_ns(cls, value: Any, ns: str) -> Interrupt:
return cls(value=value, id=xxh3_128_hexdigest(ns.encode()))
@property
@deprecated(
"`interrupt_id` is deprecated. Use `id` instead.",
stacklevel=2,
)
def interrupt_id(self) -> str:
warn(
"`interrupt_id` is deprecated. Use `id` instead.",
LangGraphDeprecatedSinceV10,
stacklevel=2,
)
return self.id
|
Interrupt
是 LangGraph 中用于表示 节点中断事件 的类。它用于在图执行过程中标识:
- 某个节点被人为或逻辑上中断(例如:用户取消、外部事件打断)
- 中断携带的值(
value
),以及唯一标识符(id
)
- 可以挂起执行状态,稍后通过
id
恢复
类定义语义解释
1
2
3
|
@final
@dataclass(init=False, **_DC_SLOTS)
class Interrupt:
|
- 使用
@final
修饰,表示此类不可被继承
- 使用
dataclass(init=False)
,但自己手动定义了 __init__
- 使用了 slots 优化 内存布局
- 设计上是一个不可变的、结构明确的中断信号容器
类的主要作用
Interrupt
表示图运行中某个节点发生了中断的事件,并且提供恢复用的唯一 ID 以支持 resume。
这种机制通常用于支持:
- ✅ 异步流程中途暂停(如等待用户输入)
- ✅ 人工干预打断节点执行
- ✅ 状态持久化并可恢复(checkpoint + resume)
Interrupt
的属性说明
属性名 |
类型 |
默认值 |
含义 |
value |
Any |
无(构造时提供) |
中断事件携带的值,例如中断原因、状态等 |
id |
str |
_DEFAULT_INTERRUPT_ID |
中断的唯一标识,用于 resume(可通过 namespace hash 自动生成) |
方法和属性说明
名称 |
类型 |
说明 |
__init__ |
构造方法 |
支持 ns 参数,用于根据命名空间生成 ID(兼容老版本) |
from_ns |
classmethod |
从字符串命名空间构造中断对象(自动生成 ID) |
interrupt_id |
@property (已弃用) |
兼容旧版本的 interrupt_id 字段,现推荐使用 id |
2. Call
这个 Call 类表示一次函数调用的封装,其语义是在一个可控环境中(带有重试策略、缓存策略和回调函数)调用一个函数 func,并记录传入的输入参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
Callbacks = Optional[Union[list[BaseCallbackHandler], BaseCallbackManager]]
class Call:
__slots__ = ("func", "input", "retry_policy", "cache_policy", "callbacks")
func: Callable
input: tuple[tuple[Any, ...], dict[str, Any]]
retry_policy: Sequence[RetryPolicy] | None
cache_policy: CachePolicy | None
callbacks: Callbacks
def __init__(
self,
func: Callable,
input: tuple[tuple[Any, ...], dict[str, Any]],
*,
retry_policy: Sequence[RetryPolicy] | None,
cache_policy: CachePolicy | None,
callbacks: Callbacks,
) -> None:
self.func = func
self.input = input
self.retry_policy = retry_policy
self.cache_policy = cache_policy
self.callbacks = callbacks
|
属性名 |
类型 |
说明 |
|
func |
Callable |
要执行的目标函数(可调用对象) |
|
input |
tuple[tuple[Any, ...], dict[str, Any]] |
传递给函数的位置参数和关键字参数,形式如 (args, kwargs) |
|
retry_policy |
`Sequence[RetryPolicy] |
None` |
用于控制函数失败后的重试逻辑策略列表,可选 |
cache_policy |
`CachePolicy |
None` |
控制函数调用结果是否缓存,以及缓存策略 |
callbacks |
Callbacks |
函数调用过程中的回调钩子,如执行前、执行后、失败时等 |
|
3. PregelScratchpad
3.1 PregelScratchpad 的定义
在 Pregel 模型中,图的每个节点以同步轮次进行计算(step),节点之间通过消息传递进行通信,每个 step 都是一个迭代单位,直到满足终止条件(如没有更多消息传递或达到最大轮次)。PregelScratchpad
就是在某一轮 Pregel 计算 step 中记录该轮的状态和辅助逻辑的结构体。
PregelScratchpad 的定义比较简单,源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
@dataclasses.dataclass(**_DC_KWARGS)
class PregelScratchpad:
step: int
stop: int
# call
call_counter: Callable[[], int]
# interrupt
interrupt_counter: Callable[[], int]
get_null_resume: Callable[[bool], Any]
resume: list[Any]
# subgraph
subgraph_counter: Callable[[], int]
|
本节我们核心要关注的是 PregelScratchpad 的生成逻辑。PregelScratchpad 的生成位于 langgraph\pregel\_algo.py
下的 _scratchpad
。_scratchpad
的调用入口主要是同目录下的 prepare_single_task
函数。
3.1 PregelScratchpad 的生成
_scratchpad 函数用于生成 PregelScratchpad。代码并不复杂,复杂的是里面 null_resume_write、task_resume_write、resume_map 这几个变量的语义和生成逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
def _scratchpad(
parent_scratchpad: PregelScratchpad | None,
pending_writes: list[PendingWrite],
task_id: str,
namespace_hash: str,
resume_map: dict[str, Any] | None,
step: int,
stop: int,
) -> PregelScratchpad:
if len(pending_writes) > 0:
# find global resume value
for w in pending_writes:
if w[0] == NULL_TASK_ID and w[1] == RESUME:
null_resume_write = w
break
else:
# None cannot be used as a resume value, because it would be difficult to
# distinguish from missing when used over http
null_resume_write = None
# find task-specific resume value
for w in pending_writes:
if w[0] == task_id and w[1] == RESUME:
task_resume_write = w[2]
if not isinstance(task_resume_write, list):
task_resume_write = [task_resume_write]
break
else:
task_resume_write = []
del w
# find namespace and task-specific resume value
if resume_map and namespace_hash in resume_map:
mapped_resume_write = resume_map[namespace_hash]
task_resume_write.append(mapped_resume_write)
else:
null_resume_write = None
task_resume_write = []
def get_null_resume(consume: bool = False) -> Any:
if null_resume_write is None:
if parent_scratchpad is not None:
return parent_scratchpad.get_null_resume(consume)
return None
if consume:
try:
pending_writes.remove(null_resume_write)
return null_resume_write[2]
except ValueError:
return None
return null_resume_write[2]
# using itertools.count as an atomic counter (+= 1 is not thread-safe)
return PregelScratchpad(
step=step,
stop=stop,
# call
call_counter=LazyAtomicCounter(),
# interrupt
interrupt_counter=LazyAtomicCounter(),
resume=task_resume_write,
get_null_resume=get_null_resume,
# subgraph
subgraph_counter=LazyAtomicCounter(),
)
|
_scratchpad 用到了几个对象:
- PendingWrite
- LazyAtomicCounter
我们简单看一下他们的实现:
3.2 PendingWrite
PendingWrite = tuple[str, str, Any]
是三元组,分别表示
- task_id: 任务 ID
- channel: 通道名
- value: 写入 channel 的值
3.3 LazyAtomicCounter
LazyAtomicCounter 是一个计数器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class LazyAtomicCounter:
__slots__ = ("_counter",)
_counter: Callable[[], int] | None
def __init__(self) -> None:
self._counter = None
def __call__(self) -> int:
if self._counter is None:
with LAZY_ATOMIC_COUNTER_LOCK:
if self._counter is None:
self._counter = itertools.count(0).__next__
return self._counter()
|