from openai import OpenAI from text_vector_database import knowledge_comp_rule_regulation_db, knowledge_comp_propaganda_db,knowledge_comp_water_treatment_db,knowledge_comp_project_plan_db,knowledge_comp_operation_report_db import json import time from plc_query_tool import plc_query_helper from web_tool import web_search_tool class KnowAgent: """致道Agent""" def __init__(self): # API self.client = OpenAI( api_key="sk-912ca44a0c04483f9de967de37e8d0ba", base_url="https://api.deepseek.com" ) # 参数 self.max_iterations = 8 self.current_iteration = 0 self.temperature=1.0 # 工具定义 self.tool_maps = { 'knowledge_comp_rule_regulation_db': knowledge_comp_rule_regulation_db.search, 'knowledge_comp_propaganda_db': knowledge_comp_propaganda_db.search, 'knowledge_comp_water_treatment_db': knowledge_comp_water_treatment_db.search, 'knowledge_comp_project_plan_db': knowledge_comp_project_plan_db.search, 'knowledge_comp_operation_report_db': knowledge_comp_operation_report_db.search, 'plc_query_helper':plc_query_helper, 'web_search_tool':web_search_tool.search } self.tool_names_map = { 'knowledge_comp_rule_regulation_db': '公司规章制度知识库', 'knowledge_comp_propaganda_db': '公司宣传资料产品介绍知识库', 'knowledge_comp_water_treatment_db': '污水处理工艺知识库', 'knowledge_comp_project_plan_db': '污水处理厂项目方案知识库', 'knowledge_comp_operation_report_db': '污水处理厂运行报告知识库', 'plc_query_helper':'PLC点位查询工具', 'web_search_tool': '联网搜索工具', ' ': '未知工具', } self.tools = [ # 公司规章制度知识库 { "type": "function", "function": { "name": "knowledge_comp_rule_regulation_db", "description": "这是一个关于公司规章制度的RAG知识库,存放着公司行政、人力、考勤、培训等内容,相当于一本规章制度的百科全书,当你需要查询公司规章制度时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 公司宣传资料产品介绍知识库 { "type": "function", "function": { "name": "knowledge_comp_propaganda_db", "description": "这是一个关于公司产品介绍和宣传资料的RAG知识库,存放着金科环境公司的简介、发展历程,水萝卜智能体产品和新水岛产品的介绍和宣传资料,也包括了金科环境公司的大事记和新闻报道等,当你需要查询公司的发展情况、水萝卜和新水岛产品相关资料时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理工艺知识库 { "type": "function", "function": { "name": "knowledge_comp_water_treatment_db", "description": "这是一个关于污水处理厂污水处理工艺的RAG知识库,它就像一个工艺专家的百科全书,存放着污水处理领域的物理、化学、生物等知识,水处理工艺知识以及行业标准规范等,当你有问题需要咨询水处理工艺专家时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理项目方案 { "type": "function", "function": { "name": "knowledge_comp_project_plan_db", "description": "这是一个关于污水处理厂竞标项目方案的RAG知识库,存放了原始的污水处理厂项目方案报告,这通常包括项目背景、水厂设计依据、产水能力、工艺和技术实施方案、运营成本等内容,当你需要浏览污水处理厂的投标方案,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理厂运行报告 { "type": "function", "function": { "name": "knowledge_comp_operation_report_db", "description": "这是一个关于污水处理运行报告的RAG知识库,当你需要参考水厂运行报告的内容时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理厂plc点位查询工具 { "type": "function", "function": { "name": "plc_query_helper", "description": "这是一个污水处理厂PLC点位数据查询工具,当用户需要查询水厂的传感器的监测数据时,如超滤或反渗透过程中的电导、渗透率、流量、设备功率电耗、药耗等,可以尝试使用本工具。如果查询成功,工具会返回一些候选的点位数据,包括名称和数值。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的点位名称。如反渗透电导、超滤流量、总进水流量等", } }, "required": ["query"] }, } }, # 联网搜索工具 { "type": "function", "function": { "name": "web_search_tool", "description": "这是一个网络搜索引擎工具,如果知识库没有合适的参考内容,可以使用此工具联网搜索。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "请输入搜索关键词或问题", } }, "required": ["query"] }, } }, ] # 系统提示词 - 指导AI如何智能使用工具 self.system_prompt = f"""你是一个专业的水处理行业AI助手,请遵循以下规则回答问题: 1. **知识查询判断**: - 当需要查询污水处理厂PLC传感器数据时,请先使用【plc_query_helper】工具。(PLC点位查询工具) - 当用户询问公司规章制度、行政管理、人力资源等问题时,使用【knowledge_comp_rule_regulation_db】工具。(公司规章制度知识库) - 当用户询问公司介绍、产品信息(水萝卜智能体、新水岛)、发展历程时,使用【knowledge_comp_propaganda_db】工具。(公司宣传资料知识库) - 当用户询问污水处理工艺、技术标准、行业规范时,使用【knowledge_comp_water_treatment_db】工具。(污水处理工艺知识库) - 当用户询问项目方案、投标资料时,使用【knowledge_comp_project_plan_db】工具。(污水处理项目方案知识库) - 当用户询问运行报告、运营数据时,使用【knowledge_comp_operation_report_db】工具(污水处理厂运行报告知识库) 2. **智能工具调用**: - 只有当问题确实需要专业知识库支持时才调用工具 - 对于简单的操作指令不需要调用任何工具,可以简单回复“好的” - 如果问题明显不需要专业知识,直接友好回答 3. **诚实性原则**: - 如果知识库中没有相关信息,如实告知用户"在现有知识库中未查阅到相关信息" - 不要编造不存在的信息 4. **回答要求**: - 专业问题:先给出结论,再分点解释 - 操作问题:直接给出步骤说明 - 知识库查询结果:注明信息来源 - 当PLC查询工【plc_query_helper】返回很多候选数据时,请结合用户的需求挑选相关数据进行回复。 - 如果需要分析一些深度污水处理领域问题,可以结合PLC查询工具来获取相关数据,然后可以查询相关知识库来检索知识,两者结合进行分析回答。 请根据问题内容智能判断是否需要调用工具,以及调用哪个工具最合适。""" # 存放历史对话,key为会话ID,value为会话内容,会话内容为列表 self.history_chat = { '400': [{"role": "system", "content": self.system_prompt}]} def process_stream_response(self, stream_response, on_chunk=None): """ 处理流式响应,逐块输出内容 """ full_content = "" for chunk in stream_response: if chunk.choices[0].delta.content is not None: content_chunk = chunk.choices[0].delta.content full_content += content_chunk # 如果有回调函数,调用它处理每个chunk if on_chunk and callable(on_chunk): on_chunk(content_chunk) else: # 默认输出方式:逐块打印 print(content_chunk, end="", flush=True) return full_content def send_messages(self, messages, stream:bool=False): """ 发送消息给模型 """ try: if not messages or messages[0]["role"] != "system": # 深拷贝消息列表避免修改原数据 enhanced_messages = [{"role": "system", "content": self.system_prompt}] enhanced_messages.extend(messages) else: enhanced_messages = messages # 构建请求参数 request_data = { 'model':"deepseek-chat", "messages": enhanced_messages, "stream": stream, "tools": self.tools, "temperature": self.temperature, } if stream: # 流式响应 response = self.client.chat.completions.create(**request_data) return response # 返回流式响应对象 else: # 非流式响应 response = self.client.chat.completions.create(**request_data) return response.choices[0].message except Exception as e: print(f"Error: {e}") def function_call(self, function:str, arguments:str, project_id:int): if function not in self.tool_maps: return f"函数名称不在工具列表中,工具列表:{list(self.tool_maps.keys())}" if not arguments: return f"query参数不能为空" return str(self.tool_maps[function](arguments, project_id=project_id)) def simulate_stream_response(self, content, delay=0.01): """ 模拟流式响应输出 :param content: 内容 :param delay: 输出延迟 """ for char in content: print(char, end='', flush=True) time.sleep(delay) print() # 换行 def simulate_stream_response_V2(self, content, base_delay=0.02): """ 模拟更自然的流式响应输出 :param content: 内容 :param base_delay: 基础延迟 """ # 定义不同类型字符的延迟系数 punctuation_delays = { '.': 0.3, # 句号 - 较长停顿 '!': 0.25, # 感叹号 - 中等停顿 '?': 0.25, # 问号 - 中等停顿 ',': 0.15, # 逗号 - 短暂停顿 ';': 0.15, # 中文分号 ',': 0.15, # 中文逗号 '。': 0.3, # 中文句号 ';': 0.15, # 分号 ':': 0.12, # 冒号 '-': 0.08, # 破折号 '"': 0.1, # 引号 "'": 0.1, # 单引号 '\n': 0.4, # 换行符 '\t': 0.2, # 制表符 } for i, char in enumerate(content): # 计算基础延迟 delay = base_delay # 根据字符类型调整延迟 if char in punctuation_delays: delay = punctuation_delays[char] elif char.isupper(): # 大写字母稍慢一点 delay = base_delay * 1.2 elif char.isdigit(): # 数字稍快一点 delay = base_delay * 0.8 elif char in 'abcdefghijklmnopqrstuvwxyz': # 小写字母正常速度 delay = base_delay else: # 其他字符(如中文)略慢 delay = base_delay * 1.1 # 添加随机波动(±20%) import random delay *= random.uniform(0.8, 1.2) print(char, end='', flush=True) time.sleep(delay) print() # 换行 def chat(self, user_input:str,project_id:int, stream:bool=True, conversion_id='400'): # 初始化对话历史 print('='*50) print("开始对话...") print(f"User>\t {user_input}") # 检查历史会话 if conversion_id in self.history_chat: history_messages = self.history_chat[conversion_id] history_messages.extend([{"role": "user", "content": user_input}]) else: history_messages = [{"role": "user", "content": user_input}] self.history_chat[conversion_id] = history_messages tools_history = [''] while self.current_iteration < self.max_iterations: water_robot_message = self.send_messages(history_messages, stream=False) # 检查是否由工具调用 if hasattr(water_robot_message, 'tool_calls') and water_robot_message.tool_calls: # 添加助手消息到历史 history_messages.append({ "role": "assistant", "content": water_robot_message.content, "tool_calls": [ { "id": tool_call.id, "type": tool_call.type, "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments } } for tool_call in water_robot_message.tool_calls ] }) # 处理每个工具调用 for i, tool_call in enumerate(water_robot_message.tool_calls): print(f"🔍 调用工具{i}:【{self.tool_names_map.get(tool_call.function.name, ' ')}】 ") tools_history.append(f"🔍 调用工具{i}:【{self.tool_names_map.get(tool_call.function.name, ' ')}】 ") arguments = json.loads(tool_call.function.arguments) print(f"📋 工具{i}参数:{arguments} ") tools_history.append(f"📋 工具{i}参数:{arguments} ") tool_result = know_agent.function_call(tool_call.function.name, arguments.get("query", " "), project_id=project_id) # 添加工具响应 history_messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": tool_result }) self.current_iteration += 1 else: # 没有工具调用,显示最终答案 print(f"Model>") self.simulate_stream_response(water_robot_message.content) history_messages.append({ "role": "assistant", "content": water_robot_message.content }) break if self.current_iteration >= self.max_iterations: # 处理迭代次数超限的情况 # 告诉模型迭代次数超限 history_messages.append({ "role": "tool", "tool_call_id": 0, "content": "当前工具调用次数达到上限,请根据现有结果进行分析回复。" }) water_robot_message = self.send_messages(history_messages, stream=False) # 添加最后一次模型给出的结果 history_messages.append({ "role": "assistant", "content": water_robot_message.content }) print("达到最大迭代次数,对话结束。") # 重置迭代计数器 self.current_iteration = 0 # 保存对话历史 self.history_chat[conversion_id].extend(history_messages) tools_history.append('') return ('\n'.join(tools_history) + history_messages[-1].get('content', 'null')) if history_messages[-1].get('role', ' ')=='assistant' else 'null' know_agent = KnowAgent() if __name__ == "__main__": while True: user_input = input("\n用户输入: ") if user_input.lower() == "exit": break res = know_agent.chat(user_input, 92) pass