# plclib.py — PLC point-name lookup helper library.
  1. import os
  2. script_dir = os.path.dirname(os.path.abspath(__file__))
  3. import sys
  4. sys.path.append(script_dir)
  5. import jieba
  6. import jieba.posseg as pseg
  7. import re
  8. import os
  9. import json
  10. import textdistance
  11. import warnings
  12. import numpy as np
  13. import faiss
  14. from bge.remote_model import RemoteBGEModel
  15. class PLCLib:
  16. def __init__(self):
  17. """缓存待实现"""
  18. self.project_id = None
  19. self.plc_dict_root_dir = None
  20. self.name_2_code_dict = None
  21. self.plc_database_name_template_list = None
  22. self.dict_equivalent_wordmap = None
  23. self.dict_level_2 = None
  24. self.dict_level_1 = None
  25. self.user_dict_list = None
  26. self.knowledge = None
  27. # 加载bge-m3和bge-reranker远程模型
  28. self.model = RemoteBGEModel('dev')
  29. # 加载用户自定义词典,添加到jieba词库, 不依赖水厂id
  30. self.script_dir = os.path.dirname(os.path.abspath(__file__)) # 脚本绝对路径
  31. user_dictionary_dir = os.path.join(self.script_dir, 'user_maintain_dictionary', 'jieba_words')
  32. if not os.path.exists(user_dictionary_dir):
  33. warnings.warn(f'用户分词词典不存在,严重影响匹配成功率,请检查路径{user_dictionary_dir}是否存在!', UserWarning)
  34. else:
  35. self.user_dict_list = [os.path.join(user_dictionary_dir, _) for _ in os.listdir(user_dictionary_dir) if _.split('.')[-1] == 'txt'] # 用户词典
  36. self.__load_user_dict()
  37. def load(self, project_id):
  38. """加载词典"""
  39. self.project_id = project_id
  40. self.plc_dict_root_dir = os.path.join(self.script_dir, 'plc_dictionary',f'{self.project_id}_plc_dictionary')
  41. # 加载name2code
  42. self.name_2_code_dict = self.__read_pcl()
  43. self.plc_database_name_template_list = list(self.name_2_code_dict.keys())
  44. # 加载等价词表
  45. self.dict_equivalent_wordmap = self.__construct_equivalent_wordmap()
  46. # 加载二级词典
  47. self.dict_level_2 =self.__make_level_two_dictionary()
  48. # 加载一级词典
  49. self.dict_level_1 = self.__make_level_one_dictionary()
  50. # 加载本地知识库
  51. self.knowledge = self.__load_faiss_database()
  52. def __load_faiss_database(self):
  53. """从本地加载向量数据库"""
  54. # 水厂的数据库字段知识库
  55. faiss_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_knowledge.faiss')
  56. # 尝试从本地加载
  57. if os.path.exists(faiss_path):
  58. # print('PLC点位查询功能从本地加载点位字段向量知识库...')
  59. local_faiss = faiss.read_index(faiss_path)
  60. else:
  61. raise FileNotFoundError('file not found!', faiss_path)
  62. return local_faiss
  63. @staticmethod
  64. def field_align(input_str:str)->str:
  65. """按照锡山中荷命名规范对齐字段,1#UF替换为UF1,1#RO替换为RO1,保持统一"""
  66. sources_uf = re.findall(r'\d+#UF', input_str, re.IGNORECASE) # 匹配1#UF
  67. sources_ro = re.findall(r'\d+#RO', input_str, re.IGNORECASE) # 匹配1#RO
  68. sources = sources_uf + sources_ro
  69. for sou in sources:
  70. number_, flag_ = sou.split('#')
  71. input_str = input_str.replace(sou, flag_.upper() + number_) # 统一转为大写
  72. return input_str
  73. def __construct_equivalent_wordmap(self):
  74. """构建等价词汇映射表"""
  75. # 检查文件是否存在
  76. equivalent_wordmap_path = os.path.join(self.script_dir, 'user_maintain_dictionary','equivalent_words', 'dict_equivalent_wordmap.json')
  77. if os.path.exists(equivalent_wordmap_path):
  78. with open(equivalent_wordmap_path, 'r', encoding='utf-8') as f:
  79. equivalent_wordmap = json.load(f)
  80. else:
  81. raise FileNotFoundError('file not found!', equivalent_wordmap_path)
  82. return equivalent_wordmap
  83. def __make_level_one_dictionary(self):
  84. """创建一级字典"""
  85. # 尝试从本地加载一级字典
  86. dict_level_1_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_1.json')
  87. if os.path.exists(dict_level_1_path):
  88. with open(dict_level_1_path, 'r', encoding='utf-8') as f:
  89. group_dict = json.load(f)
  90. else:
  91. raise FileNotFoundError('file not found!', dict_level_1_path)
  92. return group_dict
  93. def __make_level_two_dictionary(self):
  94. """创建二级字典,对点位所有字段进行正则匹配中文,将中文一样的字段聚合为同一个字典键值对,键为正则提取的中文字符"""
  95. # 尝试从本地加载二级字典
  96. dict_level2_dict_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_2.json')
  97. if os.path.exists(dict_level2_dict_path):
  98. with open(dict_level2_dict_path, 'r', encoding='utf-8') as f:
  99. group_dict = json.load(f)
  100. else:
  101. raise FileNotFoundError('file not found!', dict_level2_dict_path)
  102. return group_dict
  103. def __read_pcl(self):
  104. """
  105. 读取pcl文件,生成name2code词典
  106. :return:
  107. """
  108. # 尝试从本地加载name-code映射字典
  109. dict_name2code_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_name_2_code.json')
  110. if os.path.exists(dict_name2code_path):
  111. with open(dict_name2code_path, 'r', encoding='utf-8') as f:
  112. dict_name2code = json.load(f)
  113. else:
  114. raise FileNotFoundError('file not found!', dict_name2code_path)
  115. return dict_name2code
  116. def __load_user_dict(self):
  117. """加载用户词典,添加到jieba词库"""
  118. # 删除
  119. jieba.del_word('反渗透')
  120. jieba.del_word('超滤')
  121. for user_dict_txt in self.user_dict_list:
  122. # # 检查文件是否存在
  123. # if not os.path.exists(user_dict_txt):
  124. # raise FileNotFoundError(f'{user_dict_txt} does not exist')
  125. # # 检查文件后缀名是否合法
  126. # if os.path.splitext(user_dict_txt)[1] != '.txt':
  127. # continue
  128. # 分词库加载用户字典
  129. jieba.load_userdict(user_dict_txt)
  130. @ staticmethod
  131. def quicksort_up_part(arr:list, start:int, end:int)-> int:
  132. """升序排序"""
  133. # 双指针
  134. low = start
  135. high = end
  136. pivot = arr[start][1] # 基准值
  137. # 大数放在基准值右边,小数放在基准值左边
  138. while low < high:
  139. # 先从右向左找比基准值小的
  140. while low< high and arr[high][1] >= pivot:
  141. high -= 1
  142. # 此时high指向值小于基准值,交换
  143. if low < high:
  144. arr[low], arr[high] = arr[high], arr[low]
  145. low +=1
  146. # 现在开始从左向右找,比基准值大的数
  147. while low < high and arr[low][1] <= pivot:
  148. low += 1
  149. # 此时low指向值大于基准值,交换
  150. if low < high:
  151. arr[high], arr[low] = arr[low], arr[high]
  152. high -= 1
  153. return low
  154. def quicksort_up(self, arr:list, start:int, end:int):
  155. """按照元组第二个元素值大小进行升序排序"""
  156. if start >= end:
  157. return
  158. # 先排一次获得基准值位置
  159. mid = self.quicksort_up_part(arr, start, end)
  160. # 排左面
  161. self.quicksort_up(arr, start, mid - 1)
  162. # 排右面
  163. self.quicksort_up(arr, mid + 1, end)
  164. def words_similarity_score_sorted(self, query:str, candidates:list)->list:
  165. """计算输入语句与候选词的相似度并按照相似度分值进行排序"""
  166. # 选择算法(示例使用Levenshtein,归一化到0-1)
  167. candidates = candidates.copy()
  168. jarowinkler = textdistance.JaroWinkler()
  169. key_score_list = [(candidate, jarowinkler.normalized_similarity(query, candidate)) for candidate in candidates]
  170. self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
  171. key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
  172. key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
  173. return key_sorted_list
  174. def words_similarity_score_sorted_v2(self, query:str, candidates:list)->list:
  175. """通过rerank的方式为候选词进行相似度排序"""
  176. # 调用远程reranker模型
  177. n = len(candidates) # 候选词数量
  178. group_query = [(query, i) for i in candidates]
  179. score = self.model.compute_score(group_query)
  180. key_score_list = [(candidates[i], score[i]) for i in range(n)]
  181. self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
  182. key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
  183. key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
  184. return key_sorted_list
  185. def query(self, promt, is_agent:bool=False):
  186. """直接拷贝PLCMatch_match_v2_on函数"""
  187. """
  188. 模糊匹配v2
  189. :param is_agent:
  190. :param promt:
  191. :return:
  192. """
  193. # print("=" * 50)
  194. # 命名风格转换
  195. # print("原始查询:", promt)
  196. promt = promt.replace('超滤', 'UF').replace('反渗透', 'RO').replace('号', '#').replace('组', '#')
  197. promt = self.field_align(promt)
  198. # print("转换查询:", promt)
  199. # 输入分词
  200. nz_words = []
  201. for w, f in pseg.lcut(promt):
  202. # print(f'{w}({f})', end="")
  203. if f == 'nz':
  204. nz_words.append(w)
  205. # print('\n备查nz词:', nz_words)
  206. # 处理专有名词的等价词,为了保证高召回率,我们将备查词的所有等价说法都放入备查序列
  207. equivalent_words = []
  208. for nz_idx, nz in enumerate(nz_words):
  209. # 首先判断nz词是否在等价词汇表中,如果不在根本无法替换
  210. if nz in self.dict_equivalent_wordmap.keys():
  211. # 然后把等价的说法都添加进去就好了
  212. equivalent_words = self.dict_equivalent_wordmap.get(nz, [])
  213. if equivalent_words:
  214. nz_words += equivalent_words
  215. nz_words = list(set(nz_words))
  216. # print('等价备查nz词:', nz_words)
  217. del equivalent_words
  218. # 进行一级查询,根据nz词是否包含于词典
  219. query_level_one = []
  220. for i in range(len(nz_words)): # 为第i个nz词进行初次匹配
  221. result = []
  222. # 如果nz词包含在一级词典中就算匹配成功
  223. for dict_level_1_key in self.dict_level_1.keys():
  224. if nz_words[i] in dict_level_1_key: # 如果nz词包含在一级词典内
  225. result+= self.dict_level_1.get(dict_level_1_key)
  226. query_level_one.append(result) # 放入一级查询结果中
  227. # 进行二级查询
  228. query_level_two = []
  229. for idx_nz, i_nz_query_result in enumerate(query_level_one): # 遍历每个nz词的查询结果
  230. result = [] # 为第i个nz词进行二次匹配
  231. # 如果第i个nz词一级查询不为空
  232. if i_nz_query_result: # 第i个nz词的查询结果list
  233. for res_word_level_one in i_nz_query_result:
  234. if res_word_level_one in self.dict_level_2.keys():
  235. result += self.dict_level_2.get(res_word_level_one) # self.dict_level_2的value本身就是字典,所以用+=拼接
  236. # 虽然一级查询失败,但是并不意味着映射词典里没有,因为一级词典忽略英文。
  237. else: # 如果一级查询失败,就直接在name2code字典中查询
  238. if nz_words[idx_nz] in self.name_2_code_dict.keys():# 如果第i个nz词在2级词典,就直接添加到结果中
  239. result.append(nz_words[idx_nz])
  240. # 如果第i个nz词的一级查询结果为空,则添加空列表占位
  241. query_level_two.append(result)
  242. # 常规精确匹配结束,如果匹配成功,结构为二维列表,否则为空列表
  243. matched_keys = query_level_two # 获取已匹配的字段
  244. # 备查词合并,我们约定所有备查词进行统一的查询,后面怎么用这些结果取决于外部的应用,对于agent模式,将会输出许多结果,对月非agent只会输出概率最高的结果
  245. tem_matched_keys = []
  246. for item in matched_keys:
  247. tem_matched_keys += item
  248. matched_keys = [list(set(tem_matched_keys))]
  249. del tem_matched_keys
  250. # 当同一个nz词存在多个检索结果时,按照相似度对检索结果排序
  251. # 该步骤可以省略,因为后面由reranker统一排序
  252. # for idx, i_nz_keys in enumerate(matched_keys):
  253. # if len(i_nz_keys) > 1:
  254. # # matched_keys[idx] = self.words_similarity_score_sorted(query=promt, candidates=i_nz_keys)
  255. # # 通过rerank的方式对候选词进行重新排序
  256. # matched_keys[idx] = self.words_similarity_score_sorted_v2(query=promt, candidates=i_nz_keys)
  257. # 如果精确匹配失败,没有匹配到任何结果则按照语义进行模糊匹配,返回满足条件的置信度最高的结果
  258. # if not nz_words or ([] in matched_keys):
  259. # 比起手动维护词典,我们更相信语义相似度
  260. top_k = 5
  261. confi = 0.2 # 置信度阈值
  262. # print(f'进入模糊匹配,召回Top:{top_k} 置信度阈值:{confi}...')
  263. # 调用远程bge-m3模型进行embedding
  264. query_embedding = np.array(self.model.encode([promt], normalize=True), dtype=np.float32) # 要求query_embedding是一个二维矩阵,形状为(1, 1024)
  265. distances, indices = self.knowledge.search(query_embedding, top_k)
  266. group_query = [(promt, self.plc_database_name_template_list[indices[0][i]]) for i in range(top_k)]
  267. # 我们更愿意相信bge,因此把词典关键词匹配的结果一并放进去重排序
  268. group_query_manuel = [(promt, k) for keys in matched_keys for k in keys]
  269. group_query += group_query_manuel
  270. del group_query_manuel
  271. group_query = list(set(group_query)) # 去重
  272. # 调用远程bge-reranker模型
  273. score = self.model.compute_score(group_query)
  274. rerank_result = sorted([(group_query[i][1], score[i]) for i in range(len(group_query))], key=lambda x: x[1], reverse=True)
  275. # print(F'打印前top{top_k}候选词结果:', rerank_result[:top_k])
  276. # print(f'首元素模糊匹配到{rerank_result[0][0]}, 置信度为{rerank_result[0][1]}')
  277. # matched_keys 为最终结果,保持形状为二维列表
  278. matched_keys = [[i[0] for i in rerank_result]]
  279. # 每个匹配结果的置信度
  280. matched_keys_score = [[i[1] for i in rerank_result]]
  281. # 为结果创建映射字典
  282. result_list = []
  283. for i_nz_keys in matched_keys:
  284. result_list.append([{key: self.name_2_code_dict.get(key)} for key in i_nz_keys])
  285. # print(f"查询到{len([_ for _ in result_list if _])}个结果:")
  286. if not is_agent:
  287. # 非agent模式每个匹配结果只取第一个元素的英文
  288. tem_list = []
  289. for res in result_list:
  290. if res:
  291. for k, v in res[0].items(): # 每个nz词的查询结果都是一个list,每个list可能包含多个字典
  292. tem_list.append(f'{k}:{v}')
  293. result_list = tem_list
  294. # print('以非agent模式返回:', result_list)
  295. return result_list
  296. # print('以agent模式返回:', result_list)
  297. # print('='*50)
  298. return result_list, matched_keys_score
  299. # 步骤1:实例化,单例模式
  300. helper = PLCLib()
  301. if __name__ == '__main__':
  302. # demo
  303. # 步骤2:按照水厂加载数据库
  304. helper.load(92)
  305. # 步骤3:根据查询匹配水厂
  306. # helper.query("查询RO1回收率、RO2回收率、...")
  307. helper.query("查询中荷水厂产水电导率", is_agent=False)
  308. # agent 模式
  309. # 输出格式:list, [RO1回收率查询结果, RO2回收率查询结果, ...]
  310. # RO1回收率查询结果:list, [{'RO1回收率': 'RO1HSL'}]
  311. # RO2回收率查询结果:list, [{'RO2回收率': 'RO2HSL'}]
  312. # ...
  313. # 完整查询格式: [[{'RO1回收率': 'RO1HSL'}], [{'RO2回收率': 'RO2HSL'}]]
  314. # 非agent 模式,每个结果取首个元素,直接返回英文code
  315. #