create_pcl_match_dictionary.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. import os
  2. from sympy.solvers.diophantine.diophantine import equivalent
  3. script_dir = os.path.dirname(os.path.abspath(__file__))
  4. import sys
  5. sys.path.append(script_dir)
  6. import pandas as pd
  7. import jieba
  8. import jieba.posseg as pseg
  9. import re
  10. import numpy as np
  11. import json
  12. import textdistance
  13. import faiss
  14. from remote_model import RemoteBGEModel
class PLCMatch:
    """Match PLC data points from user input via keywords plus semantic similarity."""
    def __init__(self, project_id: int):
        # Water plant id, stored as a string because it is embedded in file names.
        self.project_id = str(project_id)
        # Absolute directory of this script; all data files are resolved relative to it.
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        # Root directory of this plant's dictionary files.
        self.plc_dict_root_dir = os.path.join(self.script_dir, f'plc_dictionary/{self.project_id}_plc_dictionary')
        # Read the PLC point file and build the name -> code mapping.
        self.name_2_code_dict = self.__read_pcl()
        # Load user-maintained dictionaries and add them to jieba's vocabulary.
        user_dictionary_dir = os.path.join(self.script_dir, 'user_maintain_dictionary', 'jieba_words')
        user_dict_list = [os.path.join(user_dictionary_dir, _) for _ in os.listdir(user_dictionary_dir) if _.split('.')[-1] == 'txt']  # user dictionaries (*.txt only)
        self.user_dict_list = user_dict_list
        self.__load_user_dict()
        # Build the level-2 dictionary (must exist before level 1, which is derived from it).
        self.dict_level_2 = self.__make_level_two_dictionary()
        # Build the level-1 dictionary.
        self.dict_level_1 = self.__make_level_one_dictionary()
        # Equivalent-word mapping table (word -> all equivalent phrasings).
        self.equivalent_wordmap_txt = os.path.join(self.script_dir,'user_maintain_dictionary','equivalent_words', 'equivalent_wordmap.txt')
        self.dict_equivalent_wordmap = self.__construct_equivalent_wordmap()
        # Knowledge base: the Chinese field names of the PLC point database.
        # Remote bge-m3 (embedding) and bge-reranker models; 'dev' presumably selects the environment — TODO confirm.
        self.plc_database_name_template_list = list(self.name_2_code_dict.keys())
        self.model = RemoteBGEModel('dev')
        self.knowledge = self.__load_faiss_database()
  43. def __load_faiss_database(self):
  44. """从本地加载向量数据库"""
  45. # 水厂的数据库字段知识库
  46. faiss_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_knowledge.faiss')
  47. # 尝试从本地加载
  48. if os.path.exists(faiss_path):
  49. print('PLC点位查询功能从本地加载点位字段向量知识库...')
  50. return faiss.read_index(faiss_path)
  51. # 如果不存在就尝试重新创建
  52. # 首先,我们需要拿到数据库的点位名称,可以直接从name-code映射字典当中获取
  53. plc_database_name_template_list = self.plc_database_name_template_list
  54. # 调用远程embedding模型,one by one 地处理,远程模型通过配置参数进行归一化
  55. embeddings = [self.model.encode([temp], normalize=True)[0] for temp in plc_database_name_template_list]
  56. for _ in embeddings:
  57. if _ is None:
  58. raise RuntimeError('为plc数据库中文字段构建向量数据库时发生异常,embeddings不能存在None')
  59. # 要求embeddings是一个二维矩阵,类型为float32
  60. embeddings = np.array(embeddings, dtype=np.float32)
  61. # 创建 FAISS 索引
  62. dimension = embeddings[0].shape[0]
  63. local_faiss = faiss.IndexFlatIP(dimension) # 建立内积索引
  64. local_faiss.add(embeddings) # 添加索引
  65. # 保存未来使用
  66. faiss.write_index(local_faiss, faiss_path)
  67. return local_faiss
  68. def __read_pcl(self):
  69. """
  70. 读取pcl文件,生成name2code词典
  71. :return:
  72. """
  73. # name-code映射词典路径
  74. dict_name2code_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_name_2_code.json')
  75. # 尝试从本地加载name-code映射字典
  76. if os.path.exists(dict_name2code_path):
  77. with open(dict_name2code_path, 'r', encoding='utf-8') as f:
  78. dict_name2code = json.load(f)
  79. return dict_name2code
  80. # 如果本地没有就重新生成
  81. # 检查点位文件是否存在
  82. pcl_file_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_点位.xlsx') # 点位文件路径
  83. if not os.path.exists(pcl_file_path):
  84. raise FileNotFoundError(f'{pcl_file_path} does not exist')
  85. # 读点位
  86. points = pd.read_excel(pcl_file_path)
  87. # 列名称,name | code
  88. column_label_alias, column_label_code = points.columns.tolist()
  89. # 中英文匹配
  90. names = points.loc[:, column_label_alias].to_numpy()
  91. codes = points.loc[:, column_label_code].to_numpy()
  92. # 对齐命名规范, 按照中荷水厂命名风格,将1#UF或1#RO统一替换为UF1,RO1,将所有反渗透文字替换为RO,所有超滤文字替换为UF
  93. names = [s.replace('超滤','UF').replace('反渗透','RO') for s in names]
  94. names = [self.field_align(s) for s in names]
  95. # 名到英文的字典
  96. dict_name2code = dict(zip(names, codes))
  97. # name-code映射字典保存到本地文件
  98. with open(dict_name2code_path, 'w', encoding='utf-8') as f:
  99. json.dump(dict_name2code, f, ensure_ascii=False)
  100. return dict_name2code
  101. def __load_user_dict(self):
  102. """加载用户词典,添加到jieba词库"""
  103. # 删除
  104. jieba.del_word('反渗透')
  105. jieba.del_word('超滤')
  106. for user_dict_txt in self.user_dict_list:
  107. # 检查文件是否存在
  108. if not os.path.exists(user_dict_txt):
  109. raise FileNotFoundError(f'{user_dict_txt} does not exist')
  110. # 检查文件后缀名是否合法
  111. if os.path.splitext(user_dict_txt)[1] != '.txt':
  112. continue
  113. # 分词库加载用户字典
  114. jieba.load_userdict(user_dict_txt)
  115. def __construct_equivalent_wordmap(self):
  116. """构建等价词汇映射表,等价词汇的使用方式是将备查词的所有等效说法都纳入备查序列,从而保证了搜索的高召回率"""
  117. # 检查文件是否存在
  118. equivalent_wordmap_path = os.path.join(self.script_dir, 'user_maintain_dictionary','equivalent_words', 'dict_equivalent_wordmap.json')
  119. if os.path.exists(equivalent_wordmap_path):
  120. with open(equivalent_wordmap_path, 'r', encoding='utf-8') as f:
  121. equivalent_wordmap = json.load(f)
  122. return equivalent_wordmap
  123. # 如果本地不存在等价词典json文件,那么就尝试创建
  124. if not os.path.exists(self.equivalent_wordmap_txt):
  125. raise FileNotFoundError(f'{self.equivalent_wordmap_txt} does not exist')
  126. with open(self.equivalent_wordmap_txt, 'r', encoding='utf-8') as f:
  127. all_lines = [_.strip() for _ in f.readlines()]
  128. # 创建等价词汇映射表
  129. dict_equi_wordmap = {}
  130. for line in all_lines:
  131. split_list = line.split('=')
  132. for i in range(len(split_list)):
  133. dict_equi_wordmap[split_list[i]] = split_list
  134. with open(equivalent_wordmap_path, 'w', encoding='utf-8') as f:
  135. json.dump(dict_equi_wordmap,f,ensure_ascii=False)
  136. return dict_equi_wordmap
  137. def __make_level_two_dictionary(self):
  138. """创建二级字典,对点位所有字段进行正则匹配中文,将中文一样的字段聚合为同一个字典键值对,键为正则提取的中文字符"""
  139. group_dict = {}
  140. # 尝试从本地加载二级字典
  141. dict_level2_dict_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_2.json')
  142. if os.path.exists(dict_level2_dict_path):
  143. with open(dict_level2_dict_path, 'r', encoding='utf-8') as f:
  144. group_dict = json.load(f)
  145. return group_dict
  146. if self.name_2_code_dict is None:
  147. raise ValueError(f'name_2_code_dict is None', self.name_2_code_dict)
  148. data = self.name_2_code_dict.keys()
  149. # 创建二级字典
  150. for item in data:
  151. k = re.sub(r'[^\u4e00-\u9fa5]', '', item)
  152. # 处理没有汉字的字段
  153. if k == '':
  154. k = "无"
  155. if k not in group_dict.keys():
  156. group_dict[k] = [item]
  157. else:
  158. group_dict[k].append(item)
  159. # 保存二级字典到本地
  160. with open(dict_level2_dict_path, 'w', encoding='utf-8') as f:
  161. json.dump(group_dict, f, ensure_ascii=False)
  162. return group_dict
  163. @staticmethod
  164. def cut_compair(arr_a: str, arr_b: str, condition='nz') -> str:
  165. """
  166. :param condition: 词性
  167. :param arr_a:
  168. :param arr_b:
  169. :return: 第一个相同nz词
  170. """
  171. # a: w1,f1 w2,f2 w3, f3
  172. # b: w1,f1 w2,f2 w3, f3
  173. cut_arr_a = [list(_) for _ in pseg.lcut(arr_a)]
  174. cut_arr_b = [list(_) for _ in pseg.lcut(arr_b)]
  175. for i in range(len(cut_arr_a)):
  176. for j in range(i, len(cut_arr_b)):
  177. # 只比较nz词性
  178. if cut_arr_a[i][1] != condition or cut_arr_b[j][1] != condition:
  179. continue
  180. if cut_arr_a[i][0] == cut_arr_b[j][0] and cut_arr_a[i][1] == cut_arr_b[j][1]:
  181. return cut_arr_a[i][0]
  182. return ''
  183. def __make_level_one_dictionary(self):
  184. """创建一级字典"""
  185. group_dict = {} # 存放二次分组的结果
  186. # 尝试从本地加载一级字典
  187. dict_level_1_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_1.json')
  188. if os.path.exists(dict_level_1_path):
  189. with open(dict_level_1_path, 'r', encoding='utf-8') as f:
  190. group_dict = json.load(f)
  191. return group_dict
  192. if self.dict_level_2.keys() is None:
  193. raise ValueError(f'dict_lev2 is None', self.dict_level_2)
  194. # 提取二级字典的所有key
  195. data = self.dict_level_2.keys()
  196. # 如果不存在就重新生成一级字典
  197. # 根据用户词典进行分词,筛选出所有带nz词的字段
  198. no_nz_list = [] # 没有nz词的字段
  199. nz_list = [] # 有nz词的字段
  200. for item in data:
  201. # 判断是否存在nz名词
  202. is_exist_n = False
  203. for w, f in pseg.lcut(item):
  204. if f == 'nz': # 查看词性
  205. is_exist_n = True
  206. break
  207. if is_exist_n: # 存在词
  208. nz_list.append(item)
  209. else: # 不存在nz词
  210. no_nz_list.append(item)
  211. # 聚合具有相同nz名词的字段
  212. while len(nz_list) > 0:
  213. pos = [1 for _ in range(len(nz_list))] # 0表示不被取,1表示需要被取,默认都要被取,用来更新nz_list给下次判断使用
  214. pos[0] = 0 # 标记第一个单词为不需要处理
  215. for i in range(len(nz_list)):
  216. # 查看是否存在相同的nz词
  217. same_nz_word = self.cut_compair(nz_list[0], nz_list[i])
  218. if same_nz_word:
  219. # 执行聚合
  220. if same_nz_word not in group_dict.keys():
  221. # 首次聚合,与自身比较,创建自身类别
  222. group_dict[same_nz_word] = [nz_list[i]]
  223. else:
  224. group_dict[same_nz_word].append(nz_list[i])
  225. pos[i] = 0
  226. # 处理完一趟就要变更nz_list
  227. nz_list = np.array(nz_list)[np.array(pos, dtype=np.bool)].tolist()
  228. # 聚合不包含nz的名词, 单独占一个类别
  229. for item in no_nz_list:
  230. group_dict[item] = [item]
  231. with open(dict_level_1_path, 'w', encoding='utf-8') as f:
  232. json.dump(group_dict, f, ensure_ascii=False)
  233. return group_dict
  234. @staticmethod
  235. def field_align(input_str:str)->str:
  236. """按照锡山中荷命名规范对齐字段,1#UF替换为UF1,1#RO替换为RO1,保持统一"""
  237. sources_uf = re.findall(r'\d+#UF', input_str, re.IGNORECASE) # 匹配1#UF
  238. sources_ro = re.findall(r'\d+#RO', input_str, re.IGNORECASE) # 匹配1#RO
  239. sources = sources_uf + sources_ro
  240. for sou in sources:
  241. number_, flag_ = sou.split('#')
  242. input_str = input_str.replace(sou, flag_.upper() + number_) # 统一转为大写
  243. return input_str
    @ staticmethod
    def quicksort_up_part(arr:list, start:int, end:int)-> int:
        """Partition step of an ascending quicksort over (key, score) tuples.

        Partitions arr[start:end+1] in place around the score (element index 1)
        of arr[start]: smaller scores end up left of the returned index, larger
        ones right of it.

        :param arr: list of tuples, compared by their second element
        :param start: inclusive lower bound of the slice to partition
        :param end: inclusive upper bound of the slice to partition
        :return: split index for the recursive calls in quicksort_up
        """
        # Two pointers closing in from both ends.
        low = start
        high = end
        pivot = arr[start][1]  # pivot score
        # Move larger scores to the pivot's right, smaller ones to its left.
        while low < high:
            # Scan from the right for an element smaller than the pivot.
            while low< high and arr[high][1] >= pivot:
                high -= 1
            # arr[high] is now below the pivot: swap it to the left side.
            if low < high:
                arr[low], arr[high] = arr[high], arr[low]
                low +=1
            # Scan from the left for an element larger than the pivot.
            while low < high and arr[low][1] <= pivot:
                low += 1
            # arr[low] is now above the pivot: swap it to the right side.
            if low < high:
                arr[high], arr[low] = arr[low], arr[high]
                high -= 1
        return low
  268. def quicksort_up(self, arr:list, start:int, end:int):
  269. """按照元组第二个元素值大小进行升序排序"""
  270. if start >= end:
  271. return
  272. # 先排一次获得基准值位置
  273. mid = self.quicksort_up_part(arr, start, end)
  274. # 排左面
  275. self.quicksort_up(arr, start, mid - 1)
  276. # 排右面
  277. self.quicksort_up(arr, mid + 1, end)
  278. def words_similarity_score_sorted(self, query:str, candidates:list)->list:
  279. """计算输入语句与候选词的相似度并按照相似度分值进行排序"""
  280. # 选择算法(示例使用Levenshtein,归一化到0-1)
  281. candidates = candidates.copy()
  282. jarowinkler = textdistance.JaroWinkler()
  283. key_score_list = [(candidate, jarowinkler.normalized_similarity(query, candidate)) for candidate in candidates]
  284. self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
  285. key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
  286. key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
  287. return key_sorted_list
  288. def words_similarity_score_sorted_v2(self, query:str, candidates:list)->list:
  289. """通过rerank的方式为候选词进行相似度排序"""
  290. # 调用远程reranker模型
  291. n = len(candidates) # 候选词数量
  292. group_query = [(query, i) for i in candidates]
  293. score = self.model.compute_score(group_query)
  294. key_score_list = [(candidates[i], score[i]) for i in range(n)]
  295. self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
  296. key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
  297. key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
  298. return key_sorted_list
  299. def match_v2_on(self, promt: str,is_agent:bool=False):
  300. """
  301. 模糊匹配v2
  302. :param is_agent:
  303. :param promt:
  304. :return:
  305. """
  306. print("=" * 50)
  307. # 命名风格转换
  308. print("原始查询:", promt)
  309. promt = promt.replace('超滤', 'UF').replace('反渗透', 'RO').replace('号', '#').replace('组', '#')
  310. promt = self.field_align(promt)
  311. print("转换查询:", promt)
  312. # 输入分词
  313. nz_words = []
  314. for w, f in pseg.lcut(promt):
  315. print(f'{w}({f})', end="")
  316. if f == 'nz':
  317. nz_words.append(w)
  318. print('\n备查nz词:', nz_words)
  319. # 处理专有名词的等价词,为了保证高召回率,我们将备查词的所有等价说法都放入备查序列
  320. equivalent_words = []
  321. for nz_idx, nz in enumerate(nz_words):
  322. # 首先判断nz词是否在等价词汇表中,如果不在根本无法替换
  323. if nz in self.dict_equivalent_wordmap.keys():
  324. # 然后把等价的说法都添加进去就好了
  325. equivalent_words = self.dict_equivalent_wordmap.get(nz, [])
  326. if equivalent_words:
  327. nz_words += equivalent_words
  328. nz_words = list(set(nz_words))
  329. print('等价备查nz词:', nz_words)
  330. del equivalent_words
  331. # 进行一级查询,根据nz词是否包含于词典
  332. query_level_one = []
  333. for i in range(len(nz_words)): # 为第i个nz词进行初次匹配
  334. result = []
  335. # 如果nz词包含在一级词典中就算匹配成功
  336. for dict_level_1_key in self.dict_level_1.keys():
  337. if nz_words[i] in dict_level_1_key: # 如果nz词包含在一级词典内
  338. result+= self.dict_level_1.get(dict_level_1_key)
  339. query_level_one.append(result) # 放入一级查询结果中
  340. # 进行二级查询
  341. query_level_two = []
  342. for idx_nz, i_nz_query_result in enumerate(query_level_one): # 遍历每个nz词的查询结果
  343. result = [] # 为第i个nz词进行二次匹配
  344. # 如果第i个nz词一级查询不为空
  345. if i_nz_query_result: # 第i个nz词的查询结果list
  346. for res_word_level_one in i_nz_query_result:
  347. if res_word_level_one in self.dict_level_2.keys():
  348. result += self.dict_level_2.get(res_word_level_one) # self.dict_level_2的value本身就是字典,所以用+=拼接
  349. # 虽然一级查询失败,但是并不意味着映射词典里没有,因为一级词典忽略英文。
  350. else: # 如果一级查询失败,就直接在name2code字典中查询
  351. if nz_words[idx_nz] in self.name_2_code_dict.keys():# 如果第i个nz词在2级词典,就直接添加到结果中
  352. result.append(nz_words[idx_nz])
  353. # 如果第i个nz词的一级查询结果为空,则添加空列表占位
  354. query_level_two.append(result)
  355. # 常规精确匹配结束,如果匹配成功,结构为二维列表,否则为空列表
  356. matched_keys = query_level_two # 获取已匹配的字段
  357. # 备查词合并,我们约定所有备查词进行统一的查询,后面怎么用这些结果取决于外部的应用,对于agent模式,将会输出许多结果,对月非agent只会输出概率最高的结果
  358. tem_matched_keys = []
  359. for item in matched_keys:
  360. tem_matched_keys += item
  361. matched_keys = [list(set(tem_matched_keys))]
  362. del tem_matched_keys
  363. # 如果精确匹配失败,没有匹配到任何结果则按照语义进行模糊匹配,返回满足条件的置信度最高的结果
  364. # if not nz_words or ([] in matched_keys):
  365. # 比起手动维护词典,我们更相信语义相似度
  366. top_k = 5
  367. confi = 0.2 # 置信度阈值
  368. print(f'进入模糊匹配,召回Top:{top_k} 置信度阈值:{confi}...')
  369. # 调用远程bge-m3模型进行embedding
  370. query_embedding = np.array(self.model.encode([promt], normalize=True), dtype=np.float32) # 要求query_embedding是一个二维矩阵,形状为(1, 1024)
  371. distances, indices = self.knowledge.search(query_embedding, top_k)
  372. group_query = [(promt, self.plc_database_name_template_list[indices[0][i]]) for i in range(top_k)]
  373. # 我们更愿意相信bge,因此把词典关键词匹配的结果一并放进去重排序
  374. group_query_manuel = [(promt, k) for keys in matched_keys for k in keys]
  375. group_query += group_query_manuel
  376. del group_query_manuel
  377. group_query = list(set(group_query)) # 去重
  378. # 调用远程bge-reranker模型
  379. score = self.model.compute_score(group_query)
  380. rerank_result = sorted([(group_query[i][1], score[i]) for i in range(len(group_query))], key=lambda x: x[1], reverse=True)
  381. print(F'打印前top{top_k}候选词结果:', rerank_result[:top_k])
  382. print(f'首元素模糊匹配到{rerank_result[0][0]}, 置信度为{rerank_result[0][1]}')
  383. # matched_keys 为最终结果,保持形状为二维列表
  384. matched_keys = [[i[0] for i in rerank_result]]
  385. # 每个匹配结果的置信度
  386. matched_keys_score = [[i[1] for i in rerank_result]]
  387. # 为结果创建映射字典
  388. result_list = []
  389. for i_nz_keys in matched_keys:
  390. result_list.append([{key: self.name_2_code_dict.get(key)} for key in i_nz_keys])
  391. print(f"查询到{len([_ for _ in result_list if _])}个结果:")
  392. if not is_agent:
  393. # 非agent模式每个匹配结果只取第一个元素的英文
  394. tem_list = []
  395. for res in result_list:
  396. if res:
  397. for k, v in res[0].items(): # 每个nz词的查询结果都是一个list,每个list可能包含多个字典
  398. tem_list.append(f'{k}:{v}')
  399. result_list = tem_list
  400. print('以非agent模式返回:', result_list)
  401. return result_list
  402. print('以agent模式返回:', result_list)
  403. print('='*50)
  404. return result_list, matched_keys_score
  405. if __name__ == '__main__':
  406. pj = 92 # pcl点位
  407. pcl_helper = PLCMatch(project_id=pj)
  408. # 用户输入
  409. my_promt = "我想要查询锡山中荷进水电导率"
  410. # query_res = pcl_helper.match_v2_on(my_promt, is_agent=True)
  411. query_res = pcl_helper.match_v2_on(my_promt, is_agent=False)
  412. pass