import os
from typing import Optional, Tuple

import dotenv
import requests

from logger_config import logger

# Populate the process environment before the collector reads
# DIFY_URL / DIFY_API_KEY through os.getenv.
dotenv.load_dotenv()


class ChatHistoryCollector:
    """Collector that fetches chat-history records from an HTTP API.

    The endpoint URL is read from the ``DIFY_URL`` environment variable and
    the bearer token from ``DIFY_API_KEY``; both default to an empty string
    when unset.
    """

    def __init__(self, limit=100, timeout=5):
        """Prepare the endpoint URL, request headers and paging settings.

        :param limit: value sent as the ``limit`` query parameter on each
            request.
        :param timeout: stored on the instance as ``self.timeout``.
        """
        # URL and request headers of the chat-history API.
        self.base_url = os.getenv('DIFY_URL', '')
        # Read the key into a local first: reusing the enclosing quote
        # character inside an f-string replacement field is a SyntaxError
        # on Python versions older than 3.12.
        api_key = os.getenv('DIFY_API_KEY', '')
        self.headers = {
            'Authorization': f'Bearer {api_key}',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36'
        }
        self.logger = logger
        self.limit = limit
        # NOTE(review): collect() uses its own ``timeout`` argument and never
        # reads this attribute -- confirm which of the two is intended.
        self.timeout = timeout

    def collect(
        self,
        user: str = 'admin',
        sort_by: str = '-updated_at',
        last_id: Optional[str] = None,
        timeout: float = 10,
    ) -> Optional[Tuple[bool, list]]:
        """Fetch one page of chat-history data.

        :param user: user name sent as the ``user`` query parameter,
            defaults to ``'admin'``.
        :param sort_by: sort field sent as ``sort_by``, defaults to
            ``'-updated_at'``.
        :param last_id: ID of the last record already received, used for
            pagination; omitted from the query when falsy.
        :param timeout: timeout passed to ``requests.get``.
        :return: ``(has_more, data)`` taken from the JSON response on
            success (missing keys default to ``False`` and ``[]``);
            ``None`` when the status code is not 200, the body cannot be
            processed, or the request raises.
        """
        params = {'user': user, 'limit': self.limit, 'sort_by': sort_by}
        if last_id:
            params['last_id'] = last_id

        try:
            self.logger.info("开始访问聊天历史API")
            # Send the request with the authentication headers.
            response = requests.get(
                self.base_url,
                headers=self.headers,
                params=params,
                timeout=timeout,
            )

            if response.status_code != 200:
                self.logger.warning(f"访问失败,状态码: {response.status_code}")
                return None

            self.logger.info(f"访问成功,状态码: {response.status_code}")
            try:
                response_data = response.json()
                records = response_data.get('data', [])
                self.logger.info(f"返回数据: {len(records)}条")
                # Return whether more data exists, together with the records.
                return response_data.get('has_more', False), records
            except Exception as e:
                # The exception is folded into the message itself: passing it
                # as an extra positional argument without a placeholder makes
                # the logger drop it (or report a formatting error).
                self.logger.info(f"返回内容: {response.text} 捕获异常: {e}")
        except requests.exceptions.RequestException as e:
            self.logger.error(f"请求异常: {str(e)}")
        except Exception as e:
            self.logger.error(f"其他错误: {str(e)}")

        # Every failure path ends here; callers must handle None.
        return None


if __name__ == '__main__':
    # Create a chat-history collector and fetch the first page.
    collector = ChatHistoryCollector()
    collector.collect()