1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
|
# -*- coding: utf-8 -*-
# 这份文件实现了一个中英文混合分词器 RagTokenizer:
# - 通过 datrie(双数组 Trie)加载词典,支持前缀/反向前缀匹配
# - 中文:正向/反向最大匹配 + DFS 穷举 + 打分 解决歧义
# - 英文:NLTK 分词 + Lemmatize + Stem
# - 预处理:全角转半角、繁体转简体、统一小写、去掉非单词字符
# - 支持加载用户词典(可缓存为 .trie 文件以加速下次启动)
import logging # 标准库日志,用于记录信息/调试/异常
import copy # 提供 deepcopy 等深拷贝操作,DFS 时复制路径用
import datrie # 双数组 trie,支持快速前缀查询/持久化到磁盘
import math # 数学库,用于 log/exp/四舍五入等
import os # 与文件路径/存在性检查相关
import re # 正则表达式,广泛用于清洗/切分
import string # 常见字符集合(如 string.printable)
import sys # 系统接口(此处未直接使用)
from hanziconv import HanziConv # 用于繁体转简体
from nltk import word_tokenize # 英文分词
from nltk.stem import PorterStemmer, WordNetLemmatizer # 英文词干化/词形还原
from api.utils.file_utils import get_project_base_directory # 获取项目根目录(项目内工具)
class RagTokenizer:
"""
RagTokenizer:面向 RAG 的中英文混合分词器。
主要职责:
- 维护一个 datrie.Trie 作为词典容器(含正向 key 和反向 key)
- 对输入文本进行清洗(全角->半角、繁->简、小写、去非单词字符)
- 根据中/英不同语言段落选择不同分词策略
* 中文:最大正/反向匹配 -> 若冲突则 DFS 穷举所有可切分路径并打分择优
* 英文:nltk 分词 + 词形还原 + 词干化
- 支持用户词典加载/追加 + Trie 文件缓存
"""
def key_(self, line):
"""
将字符串标准化为 trie 的正向 key:
- 转为小写
- 以 UTF-8 编码得到 bytes
- 再转为形如 "b'xxx'" 的字符串并去掉前后 "b'" 和 "'",得到裸字节表示
"""
return str(line.lower().encode("utf-8"))[2:-1]
def rkey_(self, line):
"""
构造反向 key(用于后向最大匹配):
- 将字符串反转并小写
- 在前面加上前缀 "DD"(避免与正常正向 key 冲突;也可作为命名空间)
- 同样做 UTF-8 编码并去掉 "b'...'" 包装
"""
return str(("DD" + (line[::-1].lower())).encode("utf-8"))[2:-1]
def loadDict_(self, fnm):
"""
从文本词典文件加载词条到 trie,并落盘为 .trie 缓存文件。
词典每行格式约定:word<空白>freq<空白>tag
存储策略:
- 正向 key:存 (F, tag),其中 F = round(log(freq / DENOMINATOR))
- 反向 key:存一个存在性标记 1(用于 has_keys_with_prefix 的后向检查)
"""
logging.info(f"[HUQIE]:Build trie from {fnm}")
try:
of = open(fnm, "r", encoding='utf-8') # 以 UTF-8 打开词典文件
while True:
line = of.readline() # 逐行读取
if not line: # EOF 退出
break
line = re.sub(r"[\r\n]+", "", line) # 去掉换行符
line = re.split(r"[ \t]", line) # 按空格/Tab 分列 -> [word, freq, tag]
k = self.key_(line[0]) # 标准化正向 key
# 对词频做对数变换并四舍五入 -> 压缩频率范围,避免极端值影响
F = int(math.log(float(line[1]) / self.DENOMINATOR) + .5)
# 正向 key 若不存在或已有 freq 更小,则更新为更大的 freq(保留更可信的频率)
if k not in self.trie_ or self.trie_[k][0] < F:
self.trie_[self.key_(line[0])] = (F, line[2]) # 存 (对数频率, 词性/标签)
# 同时写入反向 key 标记,用于后向最大匹配时的前缀检查
self.trie_[self.rkey_(line[0])] = 1
dict_file_cache = fnm + ".trie" # 缓存文件名
logging.info(f"[HUQIE]:Build trie cache to {dict_file_cache}")
self.trie_.save(dict_file_cache) # 将 trie 持久化为二进制文件
of.close()
except Exception:
# 若有异常(如文件不存在/格式不对),打印堆栈方便排查
logging.exception(f"[HUQIE]:Build trie {fnm} failed")
def __init__(self, debug=False):
"""
初始化:
- 设置 DEBUG 开关、频率归一化分母、词典目录
- 初始化英文词干化/词形还原器
- 定义用于语言切段的分隔正则(标点/空白/英数等)
- 若存在 .trie 缓存则直接加载,否则创建空 trie 并从 .txt 词典构建
"""
self.DEBUG = debug
self.DENOMINATOR = 1000000
# 词典基本路径:<project>/rag/res/huqie
self.DIR_ = os.path.join(get_project_base_directory(), "rag/res", "huqie")
# 英文词形处理:lemmatize 归一化词形,再用 stemmer 取词干(二者组合避免漏召)
self.stemmer = PorterStemmer()
self.lemmatizer = WordNetLemmatizer()
# 语言切段/分隔正则:
# - 第一部分:各种标点/空白/中文标点
# - 第二部分:成段的英数/连字符/点号等(作为不可再细分的一段)
self.SPLIT_CHAR = r"([ ,\.<>/?;:'\[\]\\`!@#$%^&*\(\)\{\}\|_+=《》,。?、;‘’:“”【】~!¥%……()——-]+|[a-zA-Z0-9,\.-]+)"
trie_file_name = self.DIR_ + ".txt.trie" # 词典缓存文件全路径
# 优先尝试加载已有缓存,以避免每次启动都重建 Trie(性能更好)
if os.path.exists(trie_file_name):
try:
# datrie.Trie.load 会根据构建时的 alphabet 自动还原
self.trie_ = datrie.Trie.load(trie_file_name)
return # 加载成功则直接返回
except Exception:
# 若缓存损坏或版本不兼容,则记录异常并回退到空 Trie(使用 string.printable 作为 alphabet)
logging.exception(f"[HUQIE]:Fail to load trie file {trie_file_name}, build the default trie file")
self.trie_ = datrie.Trie(string.printable)
else:
# 首次运行或缓存未生成:创建空 Trie(alphabet 指定可接受的字符集;此处取 printable)
logging.info(f"[HUQIE]:Trie file {trie_file_name} not found, build the default trie file")
self.trie_ = datrie.Trie(string.printable)
# 若上面没有 return,说明需要从原始 .txt 构建 Trie,并在构建完落盘 .trie
self.loadDict_(self.DIR_ + ".txt")
def loadUserDict(self, fnm):
"""
加载用户词典:
- 优先尝试直接加载 fnm.trie(缓存)
- 若失败,则新建空 Trie 并从 fnm 文本词典构建
"""
try:
self.trie_ = datrie.Trie.load(fnm + ".trie") # 直接读缓存
return
except Exception:
# 缓存不存在或损坏,则创建空 trie 并用文本词典构建
self.trie_ = datrie.Trie(string.printable)
self.loadDict_(fnm)
def addUserDict(self, fnm):
"""
在当前 Trie 基础上追加加载新的用户词典(可多次调用叠加新词)。
"""
self.loadDict_(fnm)
def _strQ2B(self, ustring):
"""Convert full-width characters to half-width characters(全角->半角)"""
rstring = ""
for uchar in ustring:
inside_code = ord(uchar) # 获取字符的 Unicode 码位
if inside_code == 0x3000: # 全角空格特殊处理
inside_code = 0x0020
else:
inside_code -= 0xfee0 # 全角字符与半角字符间的固定偏移
# 如果转换后不在可打印 ASCII 范围,则保持原字符(避免误转)
if inside_code < 0x0020 or inside_code > 0x7e:
rstring += uchar
else:
rstring += chr(inside_code) # 否则写入转换后的半角字符
return rstring
def _tradi2simp(self, line):
"""繁体 -> 简体(借助 hanziconv)"""
return HanziConv.toSimplified(line)
def dfs_(self, chars, s, preTks, tkslist, _depth=0, _memo=None):
"""
对给定字符数组 chars 从位置 s 开始做 DFS 切分,将所有可能切分路径加入 tkslist。
参数:
- chars: list[str],字符列表
- s: int,当前搜索起点
- preTks: list[(token, (freq, tag))],到目前为止的切分序列(路径)
- tkslist: list[list[(token, (freq, tag))]],收集所有完整切分的容器
- _depth: 当前递归深度(用于设定上限防止爆栈)
- _memo: 记忆化缓存,key=(s, tuple(已有 token 序列)) -> 最远可达位置;避免重复子问题
逻辑:
- 超过 MAX_DEPTH:将剩余字符视作一个整体 token(打低分 freq=-12),收束并返回
- 若 s 已到结尾:把已有路径加入结果,返回
- 重复字符快速路径:连续相同字符(如 "——" 或 "......")合并处理,减少分支
- 主流程:尝试从 s 向后扩展,优先使用 trie 的 has_keys_with_prefix 做剪枝
- 如无法扩展(res == s),则按单字切分推进(保底)
"""
if _memo is None:
_memo = {}
MAX_DEPTH = 10
if _depth > MAX_DEPTH:
if s < len(chars):
copy_pretks = copy.deepcopy(preTks) # 复制路径以免后续污染
remaining = "".join(chars[s:]) # 将剩余部分合成一个 token
copy_pretks.append((remaining, (-12, ''))) # 频率给很低值表示退化路径
tkslist.append(copy_pretks) # 收集为一种切分
return s
# 记忆化 key:当前位置 + 路径的 token 串(只取 token 字符,不取元信息)
state_key = (s, tuple(tk[0] for tk in preTks)) if preTks else (s, None)
if state_key in _memo:
return _memo[state_key]
res = s
if s >= len(chars):
# 到达末尾,当前路径是一个完整切分,记入 tkslist
tkslist.append(preTks)
_memo[state_key] = s
return s
# 快速路径:检测是否有 >=5 个相同字符连串,若有则批量吞掉(减少递归深度)
if s < len(chars) - 4:
is_repetitive = True
char_to_check = chars[s]
for i in range(1, 5):
if s + i >= len(chars) or chars[s + i] != char_to_check:
is_repetitive = False
break
if is_repetitive:
# end 指向重复段尾,mid 取最多 10 个字符作为 token
end = s
while end < len(chars) and chars[end] == char_to_check:
end += 1
mid = s + min(10, end - s)
t = "".join(chars[s:mid]) # 重复段的前缀作为一个 token
k = self.key_(t)
copy_pretks = copy.deepcopy(preTks)
if k in self.trie_:
copy_pretks.append((t, self.trie_[k]))
else:
copy_pretks.append((t, (-12, '')))
# 传入 mid,减少递归
next_res = self.dfs_(chars, mid, copy_pretks, tkslist, _depth + 1, _memo)
res = max(res, next_res) # 记录最远推进位置
_memo[state_key] = res
return res
# S 是起始可尝试的最短终点(用于剪枝,尽量避免 1 字节步进的爆炸)
S = s + 1
if s + 2 <= len(chars):
t1 = "".join(chars[s:s + 1]) # 取 1 字符
t2 = "".join(chars[s:s + 2]) # 取 2 字符
# 如果前缀有 1 字符词前缀,但没有 2 字符前缀,则直接跳过一个字符
if self.trie_.has_keys_with_prefix(self.key_(t1)) and not self.trie_.has_keys_with_prefix(self.key_(t2)):
S = s + 2
# 降低连续 1 字符 token 的概率:若前面连续 3 个 1 字符 token 且下一个与其可前缀拼接,则 S 跳 2
if len(preTks) > 2 and len(preTks[-1][0]) == 1 and len(preTks[-2][0]) == 1 and len(preTks[-3][0]) == 1:
t1 = preTks[-1][0] + "".join(chars[s:s + 1])
if self.trie_.has_keys_with_prefix(self.key_(t1)):
S = s + 2
# 主循环:尝试从 s 向右扩展到 e,尽量走前缀存在的路径(否则 break)
for e in range(S, len(chars) + 1):
t = "".join(chars[s:e]) # 候选 token
k = self.key_(t)
if e > s + 1 and not self.trie_.has_keys_with_prefix(k):
break # 一旦前缀不存在,直接停止扩展
if k in self.trie_: # 命中词典则作为一个备选分支
pretks = copy.deepcopy(preTks)
pretks.append((t, self.trie_[k]))
# 递归继续向后搜索,并尝试更新最远推进位置
res = max(res, self.dfs_(chars, e, pretks, tkslist, _depth + 1, _memo))
# 若至少有一个分支推进了(res > s),记录记忆化并返回
if res > s:
_memo[state_key] = res
return res
# 否则退化为按单字切分(保证能前进)
t = "".join(chars[s:s + 1])
k = self.key_(t)
copy_pretks = copy.deepcopy(preTks)
if k in self.trie_:
copy_pretks.append((t, self.trie_[k]))
else:
copy_pretks.append((t, (-12, '')))
result = self.dfs_(chars, s + 1, copy_pretks, tkslist, _depth + 1, _memo)
_memo[state_key] = result
return result
def freq(self, tk):
"""
查询词频(将存储的对数频率还原为近似原频率):
- 若不在 trie,返回 0
- 在 trie:exp(F) * DENOMINATOR,四舍五入为 int
"""
k = self.key_(tk)
if k not in self.trie_:
return 0
return int(math.exp(self.trie_[k][0]) * self.DENOMINATOR + 0.5)
def tag(self, tk):
"""返回词性/标签;不在 trie 返回空串"""
k = self.key_(tk)
if k not in self.trie_:
return ""
return self.trie_[k][1]
def score_(self, tfts):
"""
对一个切分序列(含每个 token 的 (freq, tag))打分:
- F:累加所有 token 的对数频率
- L:长度>1 的 token 占比(偏好更长的词)
- 惩罚项 B=30:序列越长 B/len 越小(鼓励较少的切分)
返回:(token 列表, 分数)
"""
B = 30
F, L, tks = 0, 0, []
for tk, (freq, tag) in tfts:
F += freq
L += 0 if len(tk) < 2 else 1
tks.append(tk)
#F /= len(tks) # 如需平均频率,可打开;当前策略直接累加
L /= len(tks)
logging.debug("[SC] {} {} {} {} {}".format(tks, len(tks), L, F, B / len(tks) + L + F))
return tks, B / len(tks) + L + F
def sortTks_(self, tkslist):
"""
对多条候选切分(list of token-with-meta)打分排序,分数高的在前。
返回:[ (token_str_list, score), ... ]
"""
res = []
for tfts in tkslist:
tks, s = self.score_(tfts)
res.append((tks, s))
return sorted(res, key=lambda x: x[1], reverse=True)
def merge_(self, tks):
"""
将空格分割的 token 串做合并优化:
- 先归一连续空格
- 以最多 5 个 token 的滑窗,尝试把 [s:e] 合并为更长的 token
* 仅当合并后字符串匹配 SPLIT_CHAR(即“像一个合理片段”)
且在词典中 freq(tk) > 0 时才合并
- 返回合并后的字符串(仍以空格分割 token)
"""
res = []
tks = re.sub(r"[ ]+", " ", tks).split() # 归一空格后按空格切
s = 0
while True:
if s >= len(tks):
break
E = s + 1
# 最多尝试合并到 s+5
for e in range(s + 2, min(len(tks) + 2, s + 6)):
tk = "".join(tks[s:e]) # 尝试合并为无空格字符串
if re.search(self.SPLIT_CHAR, tk) and self.freq(tk):
E = e # 满足条件则延长可合并窗口
res.append("".join(tks[s:E])) # 将 [s:E) 合并加入结果
s = E
return " ".join(res)
def maxForward_(self, line):
"""
正向最大匹配:
- 从左到右扩展 e,直到前缀不存在再回退
- 命中词典则取 (token, meta),否则 freq=0 作为未知词
- 最终对得到的序列打分,返回 (token_list, score)
"""
res = []
s = 0
while s < len(line):
e = s + 1
t = line[s:e]
# 只要前缀存在就持续扩展
while e < len(line) and self.trie_.has_keys_with_prefix(
self.key_(t)):
e += 1
t = line[s:e]
# 回退到最后一个在词典中的位置(或单字符)
while e - 1 > s and self.key_(t) not in self.trie_:
e -= 1
t = line[s:e]
# 记录命中或未知 token
if self.key_(t) in self.trie_:
res.append((t, self.trie_[self.key_(t)]))
else:
res.append((t, (0, '')))
s = e
return self.score_(res)
def maxBackward_(self, line):
"""
反向最大匹配:
- 从右到左扩展 s,直到反向前缀不存在再回退
- 命中词典则取 (token, meta),否则 freq=0
- 返回时将 token 序列反转为从左到右,并打分
"""
res = []
s = len(line) - 1
while s >= 0:
e = s + 1
t = line[s:e]
# 使用反向 key 做前缀检查(rkey_)
while s > 0 and self.trie_.has_keys_with_prefix(self.rkey_(t)):
s -= 1
t = line[s:e]
# 回退到字典中存在的位置
while s + 1 < e and self.key_(t) not in self.trie_:
s += 1
t = line[s:e]
# 记录命中或未知 token
if self.key_(t) in self.trie_:
res.append((t, self.trie_[self.key_(t)]))
else:
res.append((t, (0, '')))
s -= 1
return self.score_(res[::-1]) # 还原为从左到右
def english_normalize_(self, tks):
"""
对英文 token 做词形归一:先 lemmatize 再 stem。
仅对匹配 [a-zA-Z_-]+ 的 token 应用,其余原样返回。
"""
return [self.stemmer.stem(self.lemmatizer.lemmatize(t)) if re.match(r"[a-zA-Z_-]+$", t) else t for t in tks]
def _split_by_lang(self, line):
"""
将输入字符串按语言块拆分为 (片段文本, 是否中文) 的列表:
- 先按 SPLIT_CHAR 切出“块”(标点/英数段会独立成块)
- 在每个块内部,再按中文/非中文的切换拆分成更细粒度的 (txt, zh_flag)
eg: line = "今天天气很好, hello world! 3.14"
arr = ['今天天气很好', ',', 'hello', 'world', '!', '3.14']
"""
txt_lang_pairs = []
arr = re.split(self.SPLIT_CHAR, line) # 使用捕获组,分隔符也会出现在结果中(保留结构)
for a in arr:
if not a:
continue
s = 0
e = s + 1
zh = is_chinese(a[s]) # 以首字符判断中文/非中文
while e < len(a):
_zh = is_chinese(a[e])
if _zh == zh:
e += 1
continue
txt_lang_pairs.append((a[s: e], zh)) # 语言状态切换处截断
s = e
e = s + 1
zh = _zh
if s >= len(a):
continue
txt_lang_pairs.append((a[s: e], zh))
return txt_lang_pairs
def tokenize(self, line):
"""
对输入字符串进行完整分词流程:
1) 清洗:去非单词字符 -> 全角转半角 -> 小写 -> 繁转简
2) 语言切段:中文/英文分别处理
- 英文:nltk.word_tokenize + lemmatize + stem
- 中文:正向/反向最大匹配;若出现不一致区间,则对该区间用 DFS 穷举并打分选最优
3) 合并策略:merge_ 对英文粘连/可合并片段进行合并
4) 返回以空格分隔的一串 token
"""
line = re.sub(r"\W+", " ", line) # 将非单词字符替换为空格(先粗清洗)
line = self._strQ2B(line).lower() # 全角->半角 + 全部小写
line = self._tradi2simp(line) # 繁体->简体
arr = self._split_by_lang(line) # 拆成若干 (文本片段, 是否中文)
res = []
for L,lang in arr:
if not lang:
# 英文/非中文路径:直接用 nltk 分词,再做词形归一
res.extend([self.stemmer.stem(self.lemmatizer.lemmatize(t)) for t in word_tokenize(L)])
continue
# 中文路径:对很短或明显是英数的,直接原样加入
if len(L) < 2 or re.match(
r"[a-z\.-]+$", L) or re.match(r"[0-9\.-]+$", L):
res.append(L)
continue
# 先做一次正向/反向最大匹配
tks, s = self.maxForward_(L)
tks1, s1 = self.maxBackward_(L)
if self.DEBUG:
logging.debug("[FW] {} {}".format(tks, s))
logging.debug("[BW] {} {}".format(tks1, s1))
# 合并两种切分:从两者共同的前缀开始,遇到不一致区间时使用 DFS 穷举并选高分方案
i, j, _i, _j = 0, 0, 0, 0
same = 0
while i + same < len(tks1) and j + same < len(tks) and tks1[i + same] == tks[j + same]:
same += 1
if same > 0:
# 两者开头的一段是相同的,先放入结果
res.append(" ".join(tks[j: j + same]))
_i = i + same
_j = j + same
j = _j + 1
i = _i + 1
# 在不一致的区间里,滑动窗口比较,直到对齐
while i < len(tks1) and j < len(tks):
tk1, tk = "".join(tks1[_i:i]), "".join(tks[_j:j])
if tk1 != tk:
# 尚未对齐:伸长较短一方以尝试对齐
if len(tk1) > len(tk):
j += 1
else:
i += 1
continue
if tks1[i] != tks[j]:
# 虽然拼接字符串一致,但单个 token 边界不同 -> 继续伸长尝试更稳定对齐
i += 1
j += 1
continue
# 到此说明 [ _i:i ) 与 [ _j:j ) 范围对应同一串字符,但 token 边界不同
# 对该不一致区间用 DFS 穷举所有切分并打分,选最优加入结果
tkslist = []
self.dfs_("".join(tks[_j:j]), 0, [], tkslist)
res.append(" ".join(self.sortTks_(tkslist)[0][0]))
# 之后再次跳过两者新的相同前缀
same = 1
while i + same < len(tks1) and j + same < len(tks) and tks1[i + same] == tks[j + same]:
same += 1
res.append(" ".join(tks[j: j + same]))
_i = i + same
_j = j + same
j = _j + 1
i = _i + 1
# _i 表示已经达成匹配的开始点
# 若还有尾部未处理(两者从 _i/_j 起应等长且字符相同),对尾部再做一次 DFS 优化
if _i < len(tks1):
assert _j < len(tks)
assert "".join(tks1[_i:]) == "".join(tks[_j:])
tkslist = []
self.dfs_("".join(tks[_j:]), 0, [], tkslist)
res.append(" ".join(self.sortTks_(tkslist)[0][0]))
res = " ".join(res) # 将所有片段拼成空格分隔的 token 串
logging.debug("[TKS] {}".format(self.merge_(res)))
return self.merge_(res) # 最后做一次合并优化并返回
def fine_grained_tokenize(self, tks):
"""
对 tokenize 的结果再做细粒度处理:
- 若中文 token 占比较低(<20%),则对英文 token 进一步以 "/" 切分子词
- 对中文 token:
* 短长度(<3)或纯数字直接保留
* 否则用 DFS 尝试更细切分(长度>10 的直接视为整体,避免爆炸)
- 最终对英文做 english_normalize_(lemma+stem)
"""
tks = tks.split()
zh_num = len([1 for c in tks if c and is_chinese(c[0])])
if zh_num < len(tks) * 0.2:
# 英文占多:把包含 '/' 的 token 进一步拆开
res = []
for tk in tks:
res.extend(tk.split("/"))
return " ".join(res)
res = []
for tk in tks:
if len(tk) < 3 or re.match(r"[0-9,\.-]+$", tk):
# 短 token 或数字:直接保留
res.append(tk)
continue
tkslist = []
if len(tk) > 10:
# 很长的 token 不 DFS(控制复杂度),直接保留原样
tkslist.append(tk)
else:
# 对较短的中文 token 用 DFS 尝试更细粒度切分
self.dfs_(tk, 0, [], tkslist)
if len(tkslist) < 2:
# 没有产生可对比的候选:原样保留
res.append(tk)
continue
# 选第 2 高分(索引 1)的方案(实现者主观策略,避免过度合并)
stk = self.sortTks_(tkslist)[1][0]
if len(stk) == len(tk):
# 若切分后长度等于原长度(本质没切开),退回原 token
stk = tk
else:
if re.match(r"[a-z\.-]+$", tk):
# 英文 token:若有短子词(<3),则退回原 token;否则以空格连接子词
for t in stk:
if len(t) < 3:
stk = tk
break
else:
stk = " ".join(stk)
else:
# 中文 token:子词直接用空格连接
stk = " ".join(stk)
res.append(stk)
# 最后对英文 token 做词形归一(lemma+stem)
return " ".join(self.english_normalize_(res))
def is_chinese(s):
"""
判断单个字符是否是常用 CJK 汉字区间(\u4e00-\u9fa5)。
注意:这不是严格意义上所有中文字符的全集,仅覆盖常见汉字。
"""
if s >= u'\u4e00' and s <= u'\u9fa5':
return True
else:
return False
def is_number(s):
"""判断单个字符是否是阿拉伯数字(0-9,对应 \u0030-\u0039)"""
if s >= u'\u0030' and s <= u'\u0039':
return True
else:
return False
def is_alphabet(s):
"""判断单个字符是否是英文字母(A-Z 或 a-z)"""
if (s >= u'\u0041' and s <= u'\u005a') or (
s >= u'\u0061' and s <= u'\u007a'):
return True
else:
return False
def naiveQie(txt):
"""
朴素切词器:
- 先按空格切为若干片段
- 若相邻两个片段都以英文字母结尾/开头(实际上是“末 token 末尾是字母 + 当前 token 末尾是字母”的判断),
在它们之间插入一个独立的空格 token,确保展示时英文之间有空格
- 返回一个包含字符串和空格 token 的列表(如 ["Deep", " ", "Learning"])
"""
tks = []
for t in txt.split():
# 如果上一个 token 和当前 token 都以英文字母结尾,则在它们之间插入空格 token
if tks and re.match(r".*[a-zA-Z]$", tks[-1]
) and re.match(r".*[a-zA-Z]$", t):
tks.append(" ")
tks.append(t)
return tks
|