| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- from openai import OpenAI
- from text_vector_database import knowledge_comp_rule_regulation_db, knowledge_comp_propaganda_db,knowledge_comp_water_treatment_db,knowledge_comp_project_plan_db,knowledge_comp_operation_report_db
- import json
- import time
- from plc_query_tool import plc_query_helper
- from web_tool import web_search_tool
- class KnowAgent:
- """致道Agent"""
- def __init__(self):
- # API
- self.client = OpenAI(
- api_key="sk-912ca44a0c04483f9de967de37e8d0ba",
- base_url="https://api.deepseek.com"
- )
- # 参数
- self.max_iterations = 8
- self.current_iteration = 0
- self.temperature=1.0
- # 工具定义
- self.tool_maps = {
- 'knowledge_comp_rule_regulation_db': knowledge_comp_rule_regulation_db.search,
- 'knowledge_comp_propaganda_db': knowledge_comp_propaganda_db.search,
- 'knowledge_comp_water_treatment_db': knowledge_comp_water_treatment_db.search,
- 'knowledge_comp_project_plan_db': knowledge_comp_project_plan_db.search,
- 'knowledge_comp_operation_report_db': knowledge_comp_operation_report_db.search,
- 'plc_query_helper':plc_query_helper,
- 'web_search_tool':web_search_tool.search
- }
- self.tool_names_map = {
- 'knowledge_comp_rule_regulation_db': '公司规章制度知识库',
- 'knowledge_comp_propaganda_db': '公司宣传资料产品介绍知识库',
- 'knowledge_comp_water_treatment_db': '污水处理工艺知识库',
- 'knowledge_comp_project_plan_db': '污水处理厂项目方案知识库',
- 'knowledge_comp_operation_report_db': '污水处理厂运行报告知识库',
- 'plc_query_helper':'PLC点位查询工具',
- 'web_search_tool': '联网搜索工具',
- ' ': '未知工具',
- }
- self.tools = [
- # 公司规章制度知识库
- {
- "type": "function",
- "function": {
- "name": "knowledge_comp_rule_regulation_db",
- "description": "这是一个关于公司规章制度的RAG知识库,存放着公司行政、人力、考勤、培训等内容,相当于一本规章制度的百科全书,当你需要查询公司规章制度时,请调用本工具。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 公司宣传资料产品介绍知识库
- {
- "type": "function",
- "function": {
- "name": "knowledge_comp_propaganda_db",
- "description": "这是一个关于公司产品介绍和宣传资料的RAG知识库,存放着金科环境公司的简介、发展历程,水萝卜智能体产品和新水岛产品的介绍和宣传资料,也包括了金科环境公司的大事记和新闻报道等,当你需要查询公司的发展情况、水萝卜和新水岛产品相关资料时,请调用本工具。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 污水处理工艺知识库
- {
- "type": "function",
- "function": {
- "name": "knowledge_comp_water_treatment_db",
- "description": "这是一个关于污水处理厂污水处理工艺的RAG知识库,它就像一个工艺专家的百科全书,存放着污水处理领域的物理、化学、生物等知识,水处理工艺知识以及行业标准规范等,当你有问题需要咨询水处理工艺专家时,请调用本工具。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 污水处理项目方案
- {
- "type": "function",
- "function": {
- "name": "knowledge_comp_project_plan_db",
- "description": "这是一个关于污水处理厂竞标项目方案的RAG知识库,存放了原始的污水处理厂项目方案报告,这通常包括项目背景、水厂设计依据、产水能力、工艺和技术实施方案、运营成本等内容,当你需要浏览污水处理厂的投标方案,请调用本工具。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 污水处理厂运行报告
- {
- "type": "function",
- "function": {
- "name": "knowledge_comp_operation_report_db",
- "description": "这是一个关于污水处理运行报告的RAG知识库,当你需要参考水厂运行报告的内容时,请调用本工具。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的内容,请输入语义连贯的句子进行检索。",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 污水处理厂plc点位查询工具
- {
- "type": "function",
- "function": {
- "name": "plc_query_helper",
- "description": "这是一个污水处理厂PLC点位数据查询工具,当用户需要查询水厂的传感器的监测数据时,如超滤或反渗透过程中的电导、渗透率、流量、设备功率电耗、药耗等,可以尝试使用本工具。如果查询成功,工具会返回一些候选的点位数据,包括名称和数值。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "这个参数是需要查询的点位名称。如反渗透电导、超滤流量、总进水流量等",
- }
- },
- "required": ["query"]
- },
- }
- },
- # 联网搜索工具
- {
- "type": "function",
- "function": {
- "name": "web_search_tool",
- "description": "这是一个网络搜索引擎工具,如果知识库没有合适的参考内容,可以使用此工具联网搜索。",
- "parameters": {
- "type": "object",
- "properties": {
- "query": {
- "type": "string",
- "description": "请输入搜索关键词或问题",
- }
- },
- "required": ["query"]
- },
- }
- },
- ]
- # 系统提示词 - 指导AI如何智能使用工具
- self.system_prompt = f"""你是一个专业的水处理行业AI助手,请遵循以下规则回答问题:
- 1. **知识查询判断**:
- - 当需要查询污水处理厂PLC传感器数据时,请先使用【plc_query_helper】工具。(PLC点位查询工具)
- - 当用户询问公司规章制度、行政管理、人力资源等问题时,使用【knowledge_comp_rule_regulation_db】工具。(公司规章制度知识库)
- - 当用户询问公司介绍、产品信息(水萝卜智能体、新水岛)、发展历程时,使用【knowledge_comp_propaganda_db】工具。(公司宣传资料知识库)
- - 当用户询问污水处理工艺、技术标准、行业规范时,使用【knowledge_comp_water_treatment_db】工具。(污水处理工艺知识库)
- - 当用户询问项目方案、投标资料时,使用【knowledge_comp_project_plan_db】工具。(污水处理项目方案知识库)
- - 当用户询问运行报告、运营数据时,使用【knowledge_comp_operation_report_db】工具(污水处理厂运行报告知识库)
-
- 2. **智能工具调用**:
- - 只有当问题确实需要专业知识库支持时才调用工具
- - 对于简单的操作指令不需要调用任何工具,可以简单回复“好的”
- - 如果问题明显不需要专业知识,直接友好回答
- 3. **诚实性原则**:
- - 如果知识库中没有相关信息,如实告知用户"在现有知识库中未查阅到相关信息"
- - 不要编造不存在的信息
- 4. **回答要求**:
- - 专业问题:先给出结论,再分点解释
- - 操作问题:直接给出步骤说明
- - 知识库查询结果:注明信息来源
- - 当PLC查询工【plc_query_helper】返回很多候选数据时,请结合用户的需求挑选相关数据进行回复。
- - 如果需要分析一些深度污水处理领域问题,可以结合PLC查询工具来获取相关数据,然后可以查询相关知识库来检索知识,两者结合进行分析回答。
- 请根据问题内容智能判断是否需要调用工具,以及调用哪个工具最合适。"""
- # 存放历史对话,key为会话ID,value为会话内容,会话内容为列表
- self.history_chat = {
- '400': [{"role": "system", "content": self.system_prompt}]}
- def process_stream_response(self, stream_response, on_chunk=None):
- """
- 处理流式响应,逐块输出内容
- """
- full_content = ""
- for chunk in stream_response:
- if chunk.choices[0].delta.content is not None:
- content_chunk = chunk.choices[0].delta.content
- full_content += content_chunk
- # 如果有回调函数,调用它处理每个chunk
- if on_chunk and callable(on_chunk):
- on_chunk(content_chunk)
- else:
- # 默认输出方式:逐块打印
- print(content_chunk, end="", flush=True)
- return full_content
- def send_messages(self, messages, stream:bool=False):
- """
- 发送消息给模型
- """
- try:
- if not messages or messages[0]["role"] != "system":
- # 深拷贝消息列表避免修改原数据
- enhanced_messages = [{"role": "system", "content": self.system_prompt}]
- enhanced_messages.extend(messages)
- else:
- enhanced_messages = messages
- # 构建请求参数
- request_data = {
- 'model':"deepseek-chat",
- "messages": enhanced_messages,
- "stream": stream,
- "tools": self.tools,
- "temperature": self.temperature,
- }
- if stream:
- # 流式响应
- response = self.client.chat.completions.create(**request_data)
- return response # 返回流式响应对象
- else:
- # 非流式响应
- response = self.client.chat.completions.create(**request_data)
- return response.choices[0].message
- except Exception as e:
- print(f"Error: {e}")
- def function_call(self, function:str, arguments:str, project_id:int):
- if function not in self.tool_maps:
- return f"函数名称不在工具列表中,工具列表:{list(self.tool_maps.keys())}"
- if not arguments:
- return f"query参数不能为空"
- return str(self.tool_maps[function](arguments, project_id=project_id))
- def simulate_stream_response(self, content, delay=0.01):
- """
- 模拟流式响应输出
- :param content: 内容
- :param delay: 输出延迟
- """
- for char in content:
- print(char, end='', flush=True)
- time.sleep(delay)
- print() # 换行
- def simulate_stream_response_V2(self, content, base_delay=0.02):
- """
- 模拟更自然的流式响应输出
- :param content: 内容
- :param base_delay: 基础延迟
- """
- # 定义不同类型字符的延迟系数
- punctuation_delays = {
- '.': 0.3, # 句号 - 较长停顿
- '!': 0.25, # 感叹号 - 中等停顿
- '?': 0.25, # 问号 - 中等停顿
- ',': 0.15, # 逗号 - 短暂停顿
- ';': 0.15, # 中文分号
- ',': 0.15, # 中文逗号
- '。': 0.3, # 中文句号
- ';': 0.15, # 分号
- ':': 0.12, # 冒号
- '-': 0.08, # 破折号
- '"': 0.1, # 引号
- "'": 0.1, # 单引号
- '\n': 0.4, # 换行符
- '\t': 0.2, # 制表符
- }
- for i, char in enumerate(content):
- # 计算基础延迟
- delay = base_delay
- # 根据字符类型调整延迟
- if char in punctuation_delays:
- delay = punctuation_delays[char]
- elif char.isupper(): # 大写字母稍慢一点
- delay = base_delay * 1.2
- elif char.isdigit(): # 数字稍快一点
- delay = base_delay * 0.8
- elif char in 'abcdefghijklmnopqrstuvwxyz': # 小写字母正常速度
- delay = base_delay
- else: # 其他字符(如中文)略慢
- delay = base_delay * 1.1
- # 添加随机波动(±20%)
- import random
- delay *= random.uniform(0.8, 1.2)
- print(char, end='', flush=True)
- time.sleep(delay)
- print() # 换行
- def chat(self, user_input:str,project_id:int, stream:bool=True, conversion_id='400'):
- # 初始化对话历史
- print('='*50)
- print("开始对话...")
- print(f"User>\t {user_input}")
- # 检查历史会话
- if conversion_id in self.history_chat:
- history_messages = self.history_chat[conversion_id]
- history_messages.extend([{"role": "user", "content": user_input}])
- else:
- history_messages = [{"role": "user", "content": user_input}]
- self.history_chat[conversion_id] = history_messages
- tools_history = ['<think>']
- while self.current_iteration < self.max_iterations:
- water_robot_message = self.send_messages(history_messages, stream=False)
- # 检查是否由工具调用
- if hasattr(water_robot_message, 'tool_calls') and water_robot_message.tool_calls:
- # 添加助手消息到历史
- history_messages.append({
- "role": "assistant",
- "content": water_robot_message.content,
- "tool_calls": [
- {
- "id": tool_call.id,
- "type": tool_call.type,
- "function": {
- "name": tool_call.function.name,
- "arguments": tool_call.function.arguments
- }
- } for tool_call in water_robot_message.tool_calls
- ]
- })
- # 处理每个工具调用
- for i, tool_call in enumerate(water_robot_message.tool_calls):
- print(f"🔍 调用工具{i}:【{self.tool_names_map.get(tool_call.function.name, ' ')}】 ")
- tools_history.append(f"🔍 调用工具{i}:【{self.tool_names_map.get(tool_call.function.name, ' ')}】 ")
- arguments = json.loads(tool_call.function.arguments)
- print(f"📋 工具{i}参数:{arguments} ")
- tools_history.append(f"📋 工具{i}参数:{arguments} ")
- tool_result = know_agent.function_call(tool_call.function.name, arguments.get("query", " "), project_id=project_id)
- # 添加工具响应
- history_messages.append({
- "role": "tool",
- "tool_call_id": tool_call.id,
- "content": tool_result
- })
- self.current_iteration += 1
- else:
- # 没有工具调用,显示最终答案
- print(f"Model>")
- self.simulate_stream_response(water_robot_message.content)
- history_messages.append({
- "role": "assistant",
- "content": water_robot_message.content
- })
- break
- if self.current_iteration >= self.max_iterations: # 处理迭代次数超限的情况
- # 告诉模型迭代次数超限
- history_messages.append({
- "role": "tool",
- "tool_call_id": 0,
- "content": "当前工具调用次数达到上限,请根据现有结果进行分析回复。"
- })
- water_robot_message = self.send_messages(history_messages, stream=False)
- # 添加最后一次模型给出的结果
- history_messages.append({
- "role": "assistant",
- "content": water_robot_message.content
- })
- print("达到最大迭代次数,对话结束。")
- # 重置迭代计数器
- self.current_iteration = 0
- # 保存对话历史
- self.history_chat[conversion_id].extend(history_messages)
- tools_history.append('</think>')
- return ('\n'.join(tools_history) + history_messages[-1].get('content', 'null')) if history_messages[-1].get('role', ' ')=='assistant' else 'null'
- know_agent = KnowAgent()
- if __name__ == "__main__":
- while True:
- user_input = input("\n用户输入: ")
- if user_input.lower() == "exit":
- break
- res = know_agent.chat(user_input, 92)
- pass
|