from openai import OpenAI from text_vector_database import knowledge_comp_rule_regulation_db, knowledge_comp_propaganda_db,knowledge_comp_water_treatment_db,knowledge_comp_project_plan_db,knowledge_comp_operation_report_db import json import time class WaterRobot: def __init__(self): # API self.client = OpenAI( api_key="sk-912ca44a0c04483f9de967de37e8d0ba", base_url="https://api.deepseek.com" ) # 参数 self.max_iterations = 5 self.current_iteration = 0 self.temperature=1.0 # 工具定义 self.tool_maps = { 'knowledge_comp_rule_regulation_db': knowledge_comp_rule_regulation_db.search, 'knowledge_comp_propaganda_db': knowledge_comp_propaganda_db.search, 'knowledge_comp_water_treatment_db': knowledge_comp_water_treatment_db.search, 'knowledge_comp_project_plan_db': knowledge_comp_project_plan_db.search, 'knowledge_comp_operation_report_db': knowledge_comp_operation_report_db.search, } self.tool_names_map = { 'knowledge_comp_rule_regulation_db': '公司规章制度知识库', 'knowledge_comp_propaganda_db': '公司宣传资料产品介绍知识库', 'knowledge_comp_water_treatment_db': '污水处理工艺知识库', 'knowledge_comp_project_plan_db': '污水处理厂项目方案知识库', 'knowledge_comp_operation_report_db': '污水处理厂运行报告知识库', ' ': '未知工具', } self.tools = [ # 公司规章制度知识库 { "type": "function", "function": { "name": "knowledge_comp_rule_regulation_db", "description": "这是一个关于公司规章制度的RAG知识库,存放着公司行政、人力、考勤、培训等内容,相当于一本规章制度的百科全书,当你需要查询公司规章制度时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 公司宣传资料产品介绍知识库 { "type": "function", "function": { "name": "knowledge_comp_propaganda_db", "description": "这是一个关于公司产品介绍和宣传资料的RAG知识库,存放着金科环境公司的简介、发展历程,水萝卜智能体产品和新水岛产品的介绍和宣传资料,也包括了金科环境公司的大事记和新闻报道等,当你需要查询公司的发展情况、水萝卜和新水岛产品相关资料时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理工艺知识库 { "type": "function", "function": { "name": "knowledge_comp_water_treatment_db", "description": "这是一个关于污水处理厂污水处理工艺的RAG知识库,它就像一个工艺专家的百科全书,存放着污水处理领域的物理、化学、生物等知识,水处理工艺知识以及行业标准规范等,当你有问题需要咨询水处理工艺专家时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理项目方案 { "type": "function", "function": { "name": "knowledge_comp_project_plan_db", "description": "这是一个关于污水处理厂竞标项目方案的RAG知识库,存放了原始的污水处理厂项目方案报告,这通常包括项目背景、水厂设计依据、产水能力、工艺和技术实施方案、运营成本等内容,当你需要浏览污水处理厂的投标方案,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, # 污水处理厂运行报告 { "type": "function", "function": { "name": "knowledge_comp_operation_report_db", "description": "这是一个关于污水处理运行报告的RAG知识库,当你需要参考水厂运行报告的内容时,请调用本工具。", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。", } }, "required": ["query"] }, } }, ] # 系统提示词 - 指导AI如何智能使用工具 self.system_prompt = """你是一个专业的水处理行业AI助手,请遵循以下规则回答问题: 1. **知识查询判断**: - 当用户询问公司规章制度、行政管理、人力资源等问题时,使用【公司规章制度知识库】 - 当用户询问公司介绍、产品信息(水萝卜智能体、新水岛)、发展历程时,使用【公司宣传资料知识库】 - 当用户询问污水处理工艺、技术标准、行业规范时,使用【污水处理工艺知识库】 - 当用户询问项目方案、投标资料时,使用【污水处理项目方案知识库】 - 当用户询问运行报告、运营数据时,使用【污水处理厂运行报告知识库】 2. **智能工具调用**: - 只有当问题确实需要专业知识库支持时才调用工具 - 对于简单的操作指令不需要调用任何工具,可以简单回复“好的” - 如果问题明显不需要专业知识,直接友好回答 3. **诚实性原则**: - 如果知识库中没有相关信息,如实告知用户"在现有知识库中未查阅到相关信息" - 不要编造不存在的信息 4. **回答要求**: - 专业问题:先给出结论,再分点解释 - 操作问题:直接给出步骤说明 - 知识库查询结果:注明信息来源 请根据问题内容智能判断是否需要调用工具,以及调用哪个工具最合适。""" def process_stream_response(self, stream_response, on_chunk=None): """ 处理流式响应,逐块输出内容 """ full_content = "" for chunk in stream_response: if chunk.choices[0].delta.content is not None: content_chunk = chunk.choices[0].delta.content full_content += content_chunk # 如果有回调函数,调用它处理每个chunk if on_chunk and callable(on_chunk): on_chunk(content_chunk) else: # 默认输出方式:逐块打印 print(content_chunk, end="", flush=True) return full_content def send_messages(self, messages, stream:bool=False): """ 发送消息给模型 """ if not messages or messages[0]["role"] != "system": # 深拷贝消息列表避免修改原数据 enhanced_messages = [{"role": "system", "content": self.system_prompt}] enhanced_messages.extend(messages) else: enhanced_messages = messages # 构建请求参数 request_data = { 'model':"deepseek-chat", "messages": enhanced_messages, "stream": stream, "tools": self.tools, "temperature": self.temperature, } if stream: # 流式响应 response = self.client.chat.completions.create(**request_data) return response # 返回流式响应对象 else: # 非流式响应 response = self.client.chat.completions.create(**request_data) return response.choices[0].message def function_call(self, function:str, arguments:str): if function not in self.tool_maps: return f"函数名称不在工具列表中,工具列表:{list(self.tool_maps.keys())}" if not arguments: return f"query参数不能为空" return str(self.tool_maps[function](arguments)) def simulate_stream_response(self, content, delay=0.01): """ 模拟流式响应输出 :param content: 内容 :param delay: 输出延迟 """ for char in content: print(char, end='', flush=True) time.sleep(delay) print() # 换行 def simulate_stream_response_V2(self, content, base_delay=0.02): """ 模拟更自然的流式响应输出 :param content: 内容 :param base_delay: 基础延迟 """ # 定义不同类型字符的延迟系数 punctuation_delays = { '.': 0.3, # 句号 - 较长停顿 '!': 0.25, # 感叹号 - 中等停顿 '?': 0.25, # 问号 - 中等停顿 ',': 0.15, # 逗号 - 短暂停顿 ';': 0.15, # 中文分号 ',': 0.15, # 中文逗号 '。': 0.3, # 中文句号 ';': 0.15, # 分号 ':': 0.12, # 冒号 '-': 0.08, # 破折号 '"': 0.1, # 引号 "'": 0.1, # 单引号 '\n': 0.4, # 换行符 '\t': 0.2, # 制表符 } for i, char in enumerate(content): # 计算基础延迟 delay = base_delay # 根据字符类型调整延迟 if char in punctuation_delays: delay = punctuation_delays[char] elif char.isupper(): # 大写字母稍慢一点 delay = base_delay * 1.2 elif char.isdigit(): # 数字稍快一点 delay = base_delay * 0.8 elif char in 'abcdefghijklmnopqrstuvwxyz': # 小写字母正常速度 delay = base_delay else: # 其他字符(如中文)略慢 delay = base_delay * 1.1 # 添加随机波动(±20%) import random delay *= random.uniform(0.8, 1.2) print(char, end='', flush=True) time.sleep(delay) print() # 换行 def chat(self, user_input:str, stream:bool=True): # 初始化对话历史 print('='*50) print("开始对话...") print(f"User>\t {user_input}") history_messages = [{"role": "user", "content": user_input}] while self.current_iteration < self.max_iterations: water_robot_message = self.send_messages(history_messages, stream=False) # 检查是否由工具调用 if hasattr(water_robot_message, 'tool_calls') and water_robot_message.tool_calls: # 添加助手消息到历史 history_messages.append({ "role": "assistant", "content": water_robot_message.content, "tool_calls": [ { "id": tool_call.id, "type": tool_call.type, "function": { "name": tool_call.function.name, "arguments": tool_call.function.arguments } } for tool_call in water_robot_message.tool_calls ] }) # 处理每个工具调用 for tool_call in water_robot_message.tool_calls: print(f"***正在调用工具:【{self.tool_names_map.get(tool_call.function.name, ' ')}】") arguments = json.loads(tool_call.function.arguments) print(f"***工具参数:{arguments}") tool_result = water_robot.function_call(tool_call.function.name, arguments.get("query", " ")) # 添加工具响应 history_messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": tool_result }) self.current_iteration += 1 else: # 没有工具调用,显示最终答案 print(f"Model>") self.simulate_stream_response(water_robot_message.content) history_messages.append({ "role": "assistant", "content": water_robot_message.content }) break if self.current_iteration >= self.max_iterations: print("达到最大迭代次数,对话结束。") # 重置迭代计数器 self.current_iteration = 0 water_robot = WaterRobot() if __name__ == "__main__": while True: user_input = input("\n用户输入: ") if user_input.lower() == "exit": break water_robot.chat(user_input)