1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
async def do_handle_task(task):
    """Persist pre-built ``chunks`` into the document store in batches and
    finalize task bookkeeping (progress reporting, rollback on orphaned task,
    statistics update).

    NOTE(review): this function is not self-contained — it reads many names
    defined outside the visible scope (``chunks``, ``task_id``,
    ``task_tenant_id``, ``task_dataset_id``, ``token_count``, ``task_doc_id``,
    ``task_document_name``, ``task_from_page``, ``task_to_page``,
    ``task_start_ts``, ``progress_callback``, ``has_canceled``,
    ``minio_limiter``, ``STORAGE_IMPL``, ``DOC_BULK_SIZE``). Presumably they
    are set up by an enclosing scope or earlier in this module — confirm.

    ``task`` is dict-like; ``task["id"]``, ``task["location"]`` and
    ``task["name"]`` are read here.
    """
    # Number of unique chunk ids produced for this document
    chunk_count = len(set([chunk["id"] for chunk in chunks]))
    # Start timestamp for elapsed-time reporting of the indexing phase
    start_ts = timer()
    # Last result string from the doc store; empty string means success
    doc_store_result = ""
    # Async helper: delete one chunk's stored image from object storage (MinIO)
    async def delete_image(kb_id, chunk_id):
        try:
            async with minio_limiter:  # throttle concurrency so MinIO is not overloaded
                # NOTE(review): STORAGE_IMPL.delete looks synchronous — if so it
                # blocks the event loop despite running under a limiter; confirm
                # whether it should be offloaded to a worker thread.
                STORAGE_IMPL.delete(kb_id, chunk_id)
        except Exception:
            logging.exception(
                "Deleting image of chunk {}/{}/{} got exception".format(task["location"], task["name"], chunk_id))
            raise
    # Insert chunks in batches of DOC_BULK_SIZE to keep each bulk request bounded
    for b in range(0, len(chunks), DOC_BULK_SIZE):
        # Run the blocking insert in a worker thread (Elasticsearch/Infinity
        # behind docStoreConn). The lambda captures `b` late, but run_sync is
        # awaited before the next iteration, so the slice is the intended one.
        doc_store_result = await trio.to_thread.run_sync(
            lambda: settings.docStoreConn.insert(
                chunks[b:b + DOC_BULK_SIZE],
                search.index_name(task_tenant_id),
                task_dataset_id
            )
        )
        # Bail out early if the task was canceled elsewhere
        task_canceled = has_canceled(task_id)
        if task_canceled:
            progress_callback(-1, msg="Task has been canceled.")  # report cancellation
            return
        # Progress moves through the 0.8–0.9 band during indexing.
        # NOTE(review): `b` advances by DOC_BULK_SIZE, so this fires only when
        # b is a multiple of 128, not literally "every 128 batches" as the
        # original comment claimed — confirm the intended cadence.
        if b % 128 == 0:
            progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
        # A non-empty result string signals an insert failure: report and abort
        if doc_store_result:
            error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
            progress_callback(-1, msg=error_message)
            raise Exception(error_message)
        # Ids of every chunk inserted so far, space-joined for persistence
        chunk_ids = [chunk["id"] for chunk in chunks[:b + DOC_BULK_SIZE]]
        chunk_ids_str = " ".join(chunk_ids)
        try:
            # Persist the inserted chunk ids onto the task record
            TaskService.update_chunk_ids(task["id"], chunk_ids_str)
        except DoesNotExist:
            # Task record is gone (deleted or aborted) — roll back this task's work
            logging.warning(f"do_handle_task update_chunk_ids failed since task {task['id']} is unknown.")
            # Roll back: remove the chunks just inserted from the doc store.
            # NOTE(review): the delete's result string is assigned but never
            # checked, so a failed rollback is silent — confirm intent.
            doc_store_result = await trio.to_thread.run_sync(
                lambda: settings.docStoreConn.delete(
                    {"id": chunk_ids},
                    search.index_name(task_tenant_id),
                    task_dataset_id
                )
            )
            # Also delete the chunks' images from object storage, concurrently
            async with trio.open_nursery() as nursery:
                for chunk_id in chunk_ids:
                    nursery.start_soon(delete_image, task_dataset_id, chunk_id)
            # Report the failure and stop
            progress_callback(-1, msg=f"Chunk updates failed since task {task['id']} is unknown.")
            return
    # Log indexing completion: document name, page range, chunk count, elapsed time
    logging.info("Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(
        task_document_name, task_from_page, task_to_page, len(chunks), timer() - start_ts
    ))
    # Update document-level statistics (chunk count, token count)
    DocumentService.increment_chunk_num(
        task_doc_id, task_dataset_id, token_count, chunk_count, 0
    )
    # Timings: this indexing phase vs. the whole task
    time_cost = timer() - start_ts  # elapsed for the indexing phase
    task_time_cost = timer() - task_start_ts  # elapsed for the entire task
    # Report overall completion
    progress_callback(
        prog=1.0,
        msg="Indexing done ({:.2f}s). Task done ({:.2f}s)".format(time_cost, task_time_cost)
    )
    # Final summary log
    logging.info(
        "Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(
            task_document_name, task_from_page, task_to_page,
            len(chunks), token_count, task_time_cost
        )
    )
|