6. 消息流模式
6.1 stream_mode="messages"
当图中某个节点调用了 LLM(如 OpenAI Chat API),你可能希望 逐 token 地将 LLM 回复 stream 给前端。LangGraph 提供 messages
模式实现这一点:
stream 中相关代码详解
1
2
3
4
|
if "messages" in stream_modes:
run_manager.inheritable_handlers.append(
StreamMessagesHandler(stream.put, subgraphs)
)
|
- 给 callback handler 列表添加一个
StreamMessagesHandler
;
- 它会自动挂载到所有 LLM 调用上(只要用的是 langchain LLM);
- 它将每个 token 连同其 metadata(包括哪一个节点)写入
stream.put()
;
- 如果是子图节点,还会加上完整 namespace 路径。
消息流的输出格式
1
|
("messages", ("Hello", {"name": "llm_node", "type": "llm"}))
|
或者如果启用了子图:
1
|
(("parent_node:xyz", "llm_node:abc"), "messages", ("Hello", {"name": "llm_node"}))
|
这对于 实时展示 token 输出的前端 UI 非常有用。
6.2 stream_mode=“custom”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
if "custom" in stream_modes:
def stream_writer(c: Any) -> None:
stream.put(
(
# get_config 用于 Runnable Context 中获取 child_runnable_config
# [:-1] 表示去掉,去掉当前 node id,仅保留子图路径,防止误分类?
tuple(
get_config()[CONF][CONFIG_KEY_CHECKPOINT_NS].split(
NS_SEP
)[:-1]
),
"custom",
c,
)
)
elif CONFIG_KEY_STREAM in config[CONF]:
# 从 config 中获取 stream_writer 函数
# CONFIG_KEY_RUNTIME = sys.intern("__pregel_runtime")
stream_writer = config[CONF][CONFIG_KEY_RUNTIME].stream_writer
else:
def stream_writer(c: Any) -> None:
pass
def get_config() -> RunnableConfig:
if sys.version_info < (3, 11):
try:
if asyncio.current_task():
raise RuntimeError(
"Python 3.11 or later required to use this in an async context"
)
except RuntimeError:
pass
if var_config := var_child_runnable_config.get():
return var_config
else:
raise RuntimeError("Called get_config outside of a runnable context")
var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
"child_runnable_config", default=None
)
|
stream_writer 是一个在 节点执行期间可调用的函数,用于将自定义数据(c)写入 stream 队列,从而实现 自定义流式事件输出(stream_mode=“custom”)。