from dify_api import ChatHistoryCollector from user_api import UserCollector import time from db import ConversationDatabaseManage from logger_config import logger from datetime import datetime import schedule def write_to_database(conversations): """将聊天记录写入数据库""" db = ConversationDatabaseManage() for conversation in conversations: db.insert_record(conversation['id'], conversation['record'], conversation['time']) def run(user='admin'): try: # 创建收集器实例 collector = ChatHistoryCollector() # 创建数据库实例 db = ConversationDatabaseManage() counter = 0 with db: # 先获取一次数据 time.sleep(1) has_more, conversations = collector.collect(user=user) for conversation in conversations: if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']): counter += 1 # 写入一次数据库 while has_more: time.sleep(1) # 避免请求过快触发API限流 # 传入上一次的最后一条数据的id,获取下一条数据 has_more, conversations = collector.collect(user=user, last_id=conversations[-1]['id']) for conversation in conversations: if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']): counter+=1 logger.info(f"写入{counter}条数据") except Exception as e: logger.error(f"写入数据库时出错: {str(e)}") def job(): # 创建用户采集器实例 user_collector = UserCollector() user_info = user_collector.get_user_info() for user in user_info: logger.info(f"开始处理用户: {user}") run(user=user) logger.info(f"处理用户: {user}完成") if __name__ == "__main__": """主函数 - 设置定时任务并启动调度器""" logger.info("定时任务调度器启动...") # 设置定时任务 - 每隔 30 分钟执行一次 # 你可以根据需要修改时间间隔,例如: # schedule.every(1).hours.do(job) # 每小时执行一次 # schedule.every(2).hours.do(job) # 每2小时执行一次 schedule.every().day.at("03:00").do(job) # 每天上午3点执行 # schedule.every().monday.do(job) # 每周一执行 # schedule.every(30).minutes.do(job) logger.info("定时任务已设置: 每天执行一次") logger.info("按 Ctrl+C 停止调度器") # 立即执行一次 logger.info("立即执行第一次任务...") job() # 启动调度器循环 try: while True: logger.info("检查并运行待处理的任务...") schedule.run_pending() time.sleep(3600) except KeyboardInterrupt: logger.info("收到停止信号,调度器已停止")