import logging
import re

import numpy as np
import trio
import umap
from sklearn.mixture import GaussianMixture

# NOTE: the helpers below are project-internal; the module paths are assumed
# from the surrounding codebase and may need adjusting:
# - timeout: async timeout decorator
# - get_llm_cache/set_llm_cache, get_embed_cache/set_embed_cache: result caches
# - chat_limiter: concurrency limiter for LLM calls
# - truncate: token-aware text truncation
from api.utils.api_utils import timeout
from graphrag.utils import (
    chat_limiter,
    get_embed_cache,
    get_llm_cache,
    set_embed_cache,
    set_llm_cache,
)
from rag.utils import truncate


class RecursiveAbstractiveProcessing4TreeOrganizedRetrieval:
"""
递归式摘要聚类处理类,用于对树形组织的文档检索结果进行聚类与抽象生成。
主要功能:
1. 将原始文档 chunk 转化为向量表示;
2. 使用 UMAP 降维 + 高斯混合聚类进行多层聚类;
3. 对每个聚类生成摘要(abstractive summary)并加入 chunks;
4. 返回增强后的 chunk 列表。
"""
def __init__(
self, max_cluster, llm_model, embd_model, prompt, max_token=512, threshold=0.1
):
"""
初始化处理器
参数:
- max_cluster: int,单层聚类最大簇数
- llm_model: LLM 模型,用于生成摘要文本
- embd_model: 向量化模型,将文本转成向量
- prompt: str,用于 LLM 的生成模板
- max_token: int,LLM 最大输出 token
- threshold: float,GaussianMixture 聚类概率阈值
"""
self._max_cluster = max_cluster
self._llm_model = llm_model
self._embd_model = embd_model
self._threshold = threshold
self._prompt = prompt
self._max_token = max_token

    @timeout(60)
    async def _chat(self, system, history, gen_conf):
        """
        Call the LLM to generate a summary or other text.
        1. Try to read the result from the cache.
        2. On a cache miss, run the blocking LLM call in a worker thread.
        3. Lightly clean the generated text.
        4. Cache and return the result.
        """
response = get_llm_cache(self._llm_model.llm_name, system, history, gen_conf)
if response:
return response
response = await trio.to_thread.run_sync(
lambda: self._llm_model.chat(system, history, gen_conf)
)
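        # Drop any "<think>...</think>" reasoning block emitted before the answer.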
response = re.sub(r"^.*</think>", "", response, flags=re.DOTALL)
if response.find("**ERROR**") >= 0:
raise Exception(response)
set_llm_cache(self._llm_model.llm_name, system, response, history, gen_conf)
return response

    @timeout(2)
    async def _embedding_encode(self, txt):
        """
        Encode a text into an embedding vector.
        1. Try to read the vector from the cache.
        2. On a cache miss, run embd_model.encode in a worker thread.
        3. Cache and return the vector.
        """
response = get_embed_cache(self._embd_model.llm_name, txt)
if response is not None:
return response
embds, _ = await trio.to_thread.run_sync(lambda: self._embd_model.encode([txt]))
        if len(embds) < 1 or len(embds[0]) < 1:
            raise Exception("Embedding error: empty embedding returned")
embds = embds[0]
set_embed_cache(self._embd_model.llm_name, txt, embds)
return embds

    def _get_optimal_clusters(self, embeddings: np.ndarray, random_state: int):
        """
        Pick the number of clusters with the Bayesian Information Criterion (BIC).
        1. Try every cluster count from 1 to max_clusters - 1.
        2. Fit a GaussianMixture for each candidate count.
        3. Return the count with the lowest BIC.
        """
max_clusters = min(self._max_cluster, len(embeddings))
n_clusters = np.arange(1, max_clusters)
bics = []
for n in n_clusters:
gm = GaussianMixture(n_components=n, random_state=random_state)
gm.fit(embeddings)
bics.append(gm.bic(embeddings))
optimal_clusters = n_clusters[np.argmin(bics)]
return optimal_clusters

    async def __call__(self, chunks, random_state, callback=None):
        """
        Main entry point: recursively cluster chunks and generate summaries.
        Args:
            chunks: list of (text, embedding) tuples, the original chunks.
            random_state: int, random seed for clustering.
            callback: optional progress callback.
        Returns:
            The augmented chunk list, containing both the original chunks and
            the generated summary chunks.
        """
if len(chunks) <= 1:
return []
        # Filter out chunks that have no text or no embedding vector.
chunks = [(s, a) for s, a in chunks if s and len(a) > 0]
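        # Track the (start, end) span of each layer within the growing chunk list.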
layers = [(0, len(chunks))]
start, end = 0, len(chunks)

        @timeout(60)
        async def summarize(ck_idx: list[int]):
            """
            Generate a summary chunk for one cluster.
            1. Concatenate the cluster's texts, truncating each so the whole
               prompt fits the LLM context window.
            2. Call _chat to produce the summary.
            3. Embed the summary text.
            4. Append the summary to `chunks` as a new chunk.
            """
nonlocal chunks
texts = [chunks[i][0] for i in ck_idx]
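            # Give each text an equal share of the context window, reserving
            # max_token for the generated summary.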
len_per_chunk = int(
(self._llm_model.max_length - self._max_token) / len(texts)
)
cluster_content = "\n".join(
[truncate(t, max(1, len_per_chunk)) for t in texts]
)
async with chat_limiter:
cnt = await self._chat(
"You're a helpful assistant.",
[
{
"role": "user",
"content": self._prompt.format(
cluster_content=cluster_content
),
}
],
{"max_tokens": self._max_token},
)
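            # Strip the truncation notice some providers append to long outputs.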
cnt = re.sub(
"(······\n由于长度的原因,回答被截断了,要继续吗?|For the content length reason, it stopped, continue?)",
"",
cnt,
)
logging.debug(f"SUM: {cnt}")
embds = await self._embedding_encode(cnt)
chunks.append((cnt, embds))

        labels = []
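        # Each pass clusters the current layer [start, end) and appends its
        # summaries to `chunks`; iteration stops once a layer produces at most
        # one new chunk (the root of the tree).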
while end - start > 1:
embeddings = [embd for _, embd in chunks[start:end]]
            # Special case: with only two chunks, summarize them directly.
if len(embeddings) == 2:
await summarize([start, start + 1])
if callback:
callback(
msg="Cluster one layer: {} -> {}".format(
end - start, len(chunks) - end
)
)
labels.extend([0, 0])
layers.append((end, len(chunks)))
start = end
end = len(chunks)
continue
            # 1. Reduce the embeddings' dimensionality with UMAP.
n_neighbors = int((len(embeddings) - 1) ** 0.8)
reduced_embeddings = umap.UMAP(
n_neighbors=max(2, n_neighbors),
n_components=min(12, len(embeddings) - 2),
metric="cosine",
).fit_transform(embeddings)
            # 2. Pick the number of clusters automatically via BIC.
n_clusters = self._get_optimal_clusters(reduced_embeddings, random_state)
            # 3. Fit a Gaussian mixture and assign a cluster label to each chunk.
if n_clusters == 1:
lbls = [0 for _ in range(len(reduced_embeddings))]
else:
gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
gm.fit(reduced_embeddings)
probs = gm.predict_proba(reduced_embeddings)
                lbls = []
                for prob in probs:
                    above = np.where(prob > self._threshold)[0]
                    # Use the first cluster whose probability clears the
                    # threshold; fall back to the most probable cluster if
                    # none does (otherwise indexing an empty array would crash).
                    lbls.append(int(above[0]) if len(above) else int(np.argmax(prob)))
            # 4. Summarize every cluster concurrently.
async with trio.open_nursery() as nursery:
for c in range(n_clusters):
                    ck_idx = [i + start for i in range(len(lbls)) if lbls[i] == c]
                    if not ck_idx:
                        # A mixture component may end up empty; nothing to summarize.
                        continue
                    nursery.start_soon(summarize, ck_idx)
labels.extend(lbls)
layers.append((end, len(chunks)))
if callback:
callback(
msg="Cluster one layer: {} -> {}".format(
end - start, len(chunks) - end
)
)
start = end
end = len(chunks)
return chunks
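

# ---------------------------------------------------------------------------
# Illustration: one clustering layer in isolation. A minimal sketch on
# synthetic data (two Gaussian "topic" blobs standing in for chunk
# embeddings); the sizes and seeds below are arbitrary demo values, not part
# of the class above. It mirrors the UMAP -> BIC -> GaussianMixture sequence
# that __call__ runs per layer.
def _demo_cluster_one_layer(random_state: int = 0):
    rng = np.random.default_rng(random_state)
    embeddings = np.vstack([
        rng.normal(0.0, 0.1, (25, 32)),  # pseudo-embeddings for topic A
        rng.normal(1.0, 0.1, (25, 32)),  # pseudo-embeddings for topic B
    ])

    # Dimensionality reduction with the same hyper-parameter heuristics.
    n_neighbors = max(2, int((len(embeddings) - 1) ** 0.8))
    reduced = umap.UMAP(
        n_neighbors=n_neighbors,
        n_components=min(12, len(embeddings) - 2),
        metric="cosine",
    ).fit_transform(embeddings)

    # BIC-based model selection, as in _get_optimal_clusters.
    candidates = range(1, 8)
    bics = [
        GaussianMixture(n_components=n, random_state=random_state)
        .fit(reduced)
        .bic(reduced)
        for n in candidates
    ]
    best = list(candidates)[int(np.argmin(bics))]

    # Final mixture fit and hard assignment by highest probability.
    gm = GaussianMixture(n_components=best, random_state=random_state).fit(reduced)
    labels = gm.predict(reduced)
    print(f"chose {best} clusters; sizes: {np.bincount(labels)}")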
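

# ---------------------------------------------------------------------------
# Illustration: driving the processor end to end. A hedged usage sketch: the
# _StubLLM/_StubEmbed classes are hypothetical stand-ins exposing only the
# attributes the class above actually touches (llm_name, max_length, chat,
# encode), and running it still assumes the project's cache backends
# (get_llm_cache and friends) are reachable in your environment.
if __name__ == "__main__":

    class _StubLLM:
        llm_name = "stub-llm"
        max_length = 8192

        def chat(self, system, history, gen_conf):
            # Pretend to summarize by echoing part of the prompt.
            return "Summary: " + history[-1]["content"][:64]

    class _StubEmbed:
        llm_name = "stub-embed"

        def encode(self, texts):
            # Deterministic pseudo-embeddings keyed on the text content;
            # a real model returns (vectors, token_count) like this stub.
            vecs = np.array([
                np.random.default_rng(abs(hash(t)) % (2**32)).normal(size=16)
                for t in texts
            ])
            return vecs, 0

    async def _main():
        raptor = RecursiveAbstractiveProcessing4TreeOrganizedRetrieval(
            max_cluster=8,
            llm_model=_StubLLM(),
            embd_model=_StubEmbed(),
            prompt="Summarize the following passages:\n{cluster_content}",
            max_token=256,
        )
        texts = [f"chunk {i}: notes about topic {i % 3}" for i in range(20)]
        vecs, _ = _StubEmbed().encode(texts)
        chunks = await raptor(
            list(zip(texts, vecs)),
            random_state=0,
            callback=lambda msg: print(msg),
        )
        print(f"{len(chunks)} chunks after RAPTOR expansion")

    trio.run(_main)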