| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- import os
- script_dir = os.path.dirname(os.path.abspath(__file__))
- import sys
- sys.path.append(script_dir)
- import jieba
- import jieba.posseg as pseg
- import re
- import os
- import json
- import textdistance
- import warnings
- import numpy as np
- import faiss
- from remote_model import RemoteBGEModel
- class PLCLib:
- def __init__(self):
- """缓存待实现"""
- self.project_id = None
- self.plc_dict_root_dir = None
- self.name_2_code_dict = None
- self.plc_database_name_template_list = None
- self.dict_equivalent_wordmap = None
- self.dict_level_2 = None
- self.dict_level_1 = None
- self.user_dict_list = None
- self.knowledge = None
- # 加载bge-m3和bge-reranker远程模型
- self.model = RemoteBGEModel('dev')
- # 加载用户自定义词典,添加到jieba词库, 不依赖水厂id
- self.script_dir = os.path.dirname(os.path.abspath(__file__)) # 脚本绝对路径
- user_dictionary_dir = os.path.join(self.script_dir, 'user_maintain_dictionary', 'jieba_words')
- if not os.path.exists(user_dictionary_dir):
- warnings.warn(f'用户分词词典不存在,严重影响匹配成功率,请检查路径{user_dictionary_dir}是否存在!', UserWarning)
- else:
- self.user_dict_list = [os.path.join(user_dictionary_dir, _) for _ in os.listdir(user_dictionary_dir) if _.split('.')[-1] == 'txt'] # 用户词典
- self.__load_user_dict()
- def load(self, project_id):
- """加载词典"""
- self.project_id = project_id
- self.plc_dict_root_dir = os.path.join(self.script_dir, 'plc_dictionary',f'{self.project_id}_plc_dictionary')
- # 加载name2code
- self.name_2_code_dict = self.__read_pcl()
- self.plc_database_name_template_list = list(self.name_2_code_dict.keys())
- # 加载等价词表
- self.dict_equivalent_wordmap = self.__construct_equivalent_wordmap()
- # 加载二级词典
- self.dict_level_2 =self.__make_level_two_dictionary()
- # 加载一级词典
- self.dict_level_1 = self.__make_level_one_dictionary()
- # 加载本地知识库
- self.knowledge = self.__load_faiss_database()
- def __load_faiss_database(self):
- """从本地加载向量数据库"""
- # 水厂的数据库字段知识库
- faiss_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_knowledge.faiss')
- # 尝试从本地加载
- if os.path.exists(faiss_path):
- print('PLC点位查询功能从本地加载点位字段向量知识库...')
- local_faiss = faiss.read_index(faiss_path)
- else:
- raise FileNotFoundError('file not found!', faiss_path)
- return local_faiss
- @staticmethod
- def field_align(input_str:str)->str:
- """按照锡山中荷命名规范对齐字段,1#UF替换为UF1,1#RO替换为RO1,保持统一"""
- sources_uf = re.findall(r'\d+#UF', input_str, re.IGNORECASE) # 匹配1#UF
- sources_ro = re.findall(r'\d+#RO', input_str, re.IGNORECASE) # 匹配1#RO
- sources = sources_uf + sources_ro
- for sou in sources:
- number_, flag_ = sou.split('#')
- input_str = input_str.replace(sou, flag_.upper() + number_) # 统一转为大写
- return input_str
- def __construct_equivalent_wordmap(self):
- """构建等价词汇映射表"""
- # 检查文件是否存在
- equivalent_wordmap_path = os.path.join(self.script_dir, 'user_maintain_dictionary','equivalent_words', 'dict_equivalent_wordmap.json')
- if os.path.exists(equivalent_wordmap_path):
- with open(equivalent_wordmap_path, 'r', encoding='utf-8') as f:
- equivalent_wordmap = json.load(f)
- else:
- raise FileNotFoundError('file not found!', equivalent_wordmap_path)
- return equivalent_wordmap
- def __make_level_one_dictionary(self):
- """创建一级字典"""
- # 尝试从本地加载一级字典
- dict_level_1_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_1.json')
- if os.path.exists(dict_level_1_path):
- with open(dict_level_1_path, 'r', encoding='utf-8') as f:
- group_dict = json.load(f)
- else:
- raise FileNotFoundError('file not found!', dict_level_1_path)
- return group_dict
- def __make_level_two_dictionary(self):
- """创建二级字典,对点位所有字段进行正则匹配中文,将中文一样的字段聚合为同一个字典键值对,键为正则提取的中文字符"""
- # 尝试从本地加载二级字典
- dict_level2_dict_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_level_2.json')
- if os.path.exists(dict_level2_dict_path):
- with open(dict_level2_dict_path, 'r', encoding='utf-8') as f:
- group_dict = json.load(f)
- else:
- raise FileNotFoundError('file not found!', dict_level2_dict_path)
- return group_dict
- def __read_pcl(self):
- """
- 读取pcl文件,生成name2code词典
- :return:
- """
- # 尝试从本地加载name-code映射字典
- dict_name2code_path = os.path.join(self.plc_dict_root_dir, f'{self.project_id}_dict_name_2_code.json')
- if os.path.exists(dict_name2code_path):
- with open(dict_name2code_path, 'r', encoding='utf-8') as f:
- dict_name2code = json.load(f)
- else:
- raise FileNotFoundError('file not found!', dict_name2code_path)
- return dict_name2code
- def __load_user_dict(self):
- """加载用户词典,添加到jieba词库"""
- # 删除
- jieba.del_word('反渗透')
- jieba.del_word('超滤')
- for user_dict_txt in self.user_dict_list:
- # # 检查文件是否存在
- # if not os.path.exists(user_dict_txt):
- # raise FileNotFoundError(f'{user_dict_txt} does not exist')
- # # 检查文件后缀名是否合法
- # if os.path.splitext(user_dict_txt)[1] != '.txt':
- # continue
- # 分词库加载用户字典
- jieba.load_userdict(user_dict_txt)
- @ staticmethod
- def quicksort_up_part(arr:list, start:int, end:int)-> int:
- """升序排序"""
- # 双指针
- low = start
- high = end
- pivot = arr[start][1] # 基准值
- # 大数放在基准值右边,小数放在基准值左边
- while low < high:
- # 先从右向左找比基准值小的
- while low< high and arr[high][1] >= pivot:
- high -= 1
- # 此时high指向值小于基准值,交换
- if low < high:
- arr[low], arr[high] = arr[high], arr[low]
- low +=1
- # 现在开始从左向右找,比基准值大的数
- while low < high and arr[low][1] <= pivot:
- low += 1
- # 此时low指向值大于基准值,交换
- if low < high:
- arr[high], arr[low] = arr[low], arr[high]
- high -= 1
- return low
- def quicksort_up(self, arr:list, start:int, end:int):
- """按照元组第二个元素值大小进行升序排序"""
- if start >= end:
- return
- # 先排一次获得基准值位置
- mid = self.quicksort_up_part(arr, start, end)
- # 排左面
- self.quicksort_up(arr, start, mid - 1)
- # 排右面
- self.quicksort_up(arr, mid + 1, end)
- def words_similarity_score_sorted(self, query:str, candidates:list)->list:
- """计算输入语句与候选词的相似度并按照相似度分值进行排序"""
- # 选择算法(示例使用Levenshtein,归一化到0-1)
- candidates = candidates.copy()
- jarowinkler = textdistance.JaroWinkler()
- key_score_list = [(candidate, jarowinkler.normalized_similarity(query, candidate)) for candidate in candidates]
- self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
- key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
- key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
- return key_sorted_list
- def words_similarity_score_sorted_v2(self, query:str, candidates:list)->list:
- """通过rerank的方式为候选词进行相似度排序"""
- # 调用远程reranker模型
- n = len(candidates) # 候选词数量
- group_query = [(query, i) for i in candidates]
- score = self.model.compute_score(group_query)
- key_score_list = [(candidates[i], score[i]) for i in range(n)]
- self.quicksort_up(key_score_list, 0, len(key_score_list) - 1) # 升序排序
- key_sorted_list = [tuple_element[0] for tuple_element in key_score_list] # 取出key
- key_sorted_list = key_sorted_list[::-1] # 反转,变为降序
- return key_sorted_list
- def query(self, promt, is_agent:bool=False):
- """直接拷贝PLCMatch_match_v2_on函数"""
- """
- 模糊匹配v2
- :param is_agent:
- :param promt:
- :return:
- """
- print("=" * 50)
- # 命名风格转换
- print("原始查询:", promt)
- promt = promt.replace('超滤', 'UF').replace('反渗透', 'RO').replace('号', '#').replace('组', '#')
- promt = self.field_align(promt)
- print("转换查询:", promt)
- # 输入分词
- nz_words = []
- for w, f in pseg.lcut(promt):
- print(f'{w}({f})', end="")
- if f == 'nz':
- nz_words.append(w)
- print('\n备查nz词:', nz_words)
- # 处理专有名词的等价词,为了保证高召回率,我们将备查词的所有等价说法都放入备查序列
- equivalent_words = []
- for nz_idx, nz in enumerate(nz_words):
- # 首先判断nz词是否在等价词汇表中,如果不在根本无法替换
- if nz in self.dict_equivalent_wordmap.keys():
- # 然后把等价的说法都添加进去就好了
- equivalent_words = self.dict_equivalent_wordmap.get(nz, [])
- if equivalent_words:
- nz_words += equivalent_words
- nz_words = list(set(nz_words))
- print('等价备查nz词:', nz_words)
- del equivalent_words
- # 进行一级查询,根据nz词是否包含于词典
- query_level_one = []
- for i in range(len(nz_words)): # 为第i个nz词进行初次匹配
- result = []
- # 如果nz词包含在一级词典中就算匹配成功
- for dict_level_1_key in self.dict_level_1.keys():
- if nz_words[i] in dict_level_1_key: # 如果nz词包含在一级词典内
- result+= self.dict_level_1.get(dict_level_1_key)
- query_level_one.append(result) # 放入一级查询结果中
- # 进行二级查询
- query_level_two = []
- for idx_nz, i_nz_query_result in enumerate(query_level_one): # 遍历每个nz词的查询结果
- result = [] # 为第i个nz词进行二次匹配
- # 如果第i个nz词一级查询不为空
- if i_nz_query_result: # 第i个nz词的查询结果list
- for res_word_level_one in i_nz_query_result:
- if res_word_level_one in self.dict_level_2.keys():
- result += self.dict_level_2.get(res_word_level_one) # self.dict_level_2的value本身就是字典,所以用+=拼接
- # 虽然一级查询失败,但是并不意味着映射词典里没有,因为一级词典忽略英文。
- else: # 如果一级查询失败,就直接在name2code字典中查询
- if nz_words[idx_nz] in self.name_2_code_dict.keys():# 如果第i个nz词在2级词典,就直接添加到结果中
- result.append(nz_words[idx_nz])
- # 如果第i个nz词的一级查询结果为空,则添加空列表占位
- query_level_two.append(result)
- # 常规精确匹配结束,如果匹配成功,结构为二维列表,否则为空列表
- matched_keys = query_level_two # 获取已匹配的字段
- # 备查词合并,我们约定所有备查词进行统一的查询,后面怎么用这些结果取决于外部的应用,对于agent模式,将会输出许多结果,对月非agent只会输出概率最高的结果
- tem_matched_keys = []
- for item in matched_keys:
- tem_matched_keys += item
- matched_keys = [list(set(tem_matched_keys))]
- del tem_matched_keys
- # 如果精确匹配失败,没有匹配到任何结果则按照语义进行模糊匹配,返回满足条件的置信度最高的结果
- # if not nz_words or ([] in matched_keys):
- # 比起手动维护词典,我们更相信语义相似度
- top_k = 5
- confi = 0.2 # 置信度阈值
- print(f'进入模糊匹配,召回Top:{top_k} 置信度阈值:{confi}...')
- # 调用远程bge-m3模型进行embedding
- query_embedding = np.array(self.model.encode([promt], normalize=True), dtype=np.float32) # 要求query_embedding是一个二维矩阵,形状为(1, 1024)
- distances, indices = self.knowledge.search(query_embedding, top_k)
- group_query = [(promt, self.plc_database_name_template_list[indices[0][i]]) for i in range(top_k)]
- # 我们更愿意相信bge,因此把词典关键词匹配的结果一并放进去重排序
- group_query_manuel = [(promt, k) for keys in matched_keys for k in keys]
- group_query += group_query_manuel
- del group_query_manuel
- group_query = list(set(group_query)) # 去重
- # 调用远程bge-reranker模型
- score = self.model.compute_score(group_query)
- rerank_result = sorted([(group_query[i][1], score[i]) for i in range(len(group_query))], key=lambda x: x[1], reverse=True)
- print(F'打印前top{top_k}候选词结果:', rerank_result[:top_k])
- print(f'首元素模糊匹配到{rerank_result[0][0]}, 置信度为{rerank_result[0][1]}')
- # matched_keys 为最终结果,保持形状为二维列表
- matched_keys = [[i[0] for i in rerank_result]]
- # 每个匹配结果的置信度
- matched_keys_score = [[i[1] for i in rerank_result]]
- # 为结果创建映射字典
- result_list = []
- for i_nz_keys in matched_keys:
- result_list.append([{key: self.name_2_code_dict.get(key)} for key in i_nz_keys])
- print(f"查询到{len([_ for _ in result_list if _])}个结果:")
- if not is_agent:
- # 非agent模式每个匹配结果只取第一个元素的英文
- tem_list = []
- for res in result_list:
- if res:
- for k, v in res[0].items(): # 每个nz词的查询结果都是一个list,每个list可能包含多个字典
- tem_list.append(f'{k}:{v}')
- result_list = tem_list
- print('以非agent模式返回:', result_list)
- return result_list
- print('以agent模式返回:', result_list)
- print('='*50)
- return result_list, matched_keys_score
- # 步骤1:实例化,单例模式
- helper = PLCLib()
- if __name__ == '__main__':
- # demo
- # 步骤2:按照水厂加载数据库
- helper.load(92)
- # 步骤3:根据查询匹配水厂
- # helper.query("查询RO1回收率、RO2回收率、...")
- helper.query("查询中荷水厂产水电导率", is_agent=False)
- # agent 模式
- # 输出格式:list, [RO1回收率查询结果, RO2回收率查询结果, ...]
- # RO1回收率查询结果:list, [{'RO1回收率': 'RO1HSL'}]
- # RO2回收率查询结果:list, [{'RO2回收率': 'RO2HSL'}]
- # ...
- # 完整查询格式: [[{'RO1回收率': 'RO1HSL'}], [{'RO2回收率': 'RO2HSL'}]]
- # 非agent 模式,每个结果取首个元素,直接返回英文code
- #
|