| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- from dify_api import ChatHistoryCollector
- from user_api import UserCollector
- import time
- from db import ConversationDatabaseManage
- from logger_config import logger
- from datetime import datetime
- import schedule
- def write_to_database(conversations):
- """将聊天记录写入数据库"""
- db = ConversationDatabaseManage()
- for conversation in conversations:
- db.insert_record(conversation['id'], conversation['record'], conversation['time'])
- def run(user='admin'):
- try:
- # 创建收集器实例
- collector = ChatHistoryCollector()
- # 创建数据库实例
- db = ConversationDatabaseManage()
- counter = 0
- with db:
- # 先获取一次数据
- time.sleep(1)
- has_more, conversations = collector.collect(user=user)
- for conversation in conversations:
- if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']):
- counter += 1
- # 写入一次数据库
- while has_more:
- time.sleep(1) # 避免请求过快触发API限流
- # 传入上一次的最后一条数据的id,获取下一条数据
- has_more, conversations = collector.collect(user=user, last_id=conversations[-1]['id'])
- for conversation in conversations:
- if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']):
- counter+=1
- logger.info(f"写入{counter}条数据")
- except Exception as e:
- logger.error(f"写入数据库时出错: {str(e)}")
- def job():
- # 创建用户采集器实例
- user_collector = UserCollector()
- user_info = user_collector.get_user_info()
- for user in user_info:
- logger.info(f"开始处理用户: {user}")
- run(user=user)
- logger.info(f"处理用户: {user}完成")
- if __name__ == "__main__":
- """主函数 - 设置定时任务并启动调度器"""
- logger.info("定时任务调度器启动...")
- # 设置定时任务 - 每隔 30 分钟执行一次
- # 你可以根据需要修改时间间隔,例如:
- # schedule.every(1).hours.do(job) # 每小时执行一次
- # schedule.every(2).hours.do(job) # 每2小时执行一次
- schedule.every().day.at("03:00").do(job) # 每天上午3点执行
- # schedule.every().monday.do(job) # 每周一执行
- # schedule.every(30).minutes.do(job)
- logger.info("定时任务已设置: 每天执行一次")
- logger.info("按 Ctrl+C 停止调度器")
- # 立即执行一次
- logger.info("立即执行第一次任务...")
- job()
- # 启动调度器循环
- try:
- while True:
- logger.info("检查并运行待处理的任务...")
- schedule.run_pending()
- time.sleep(3600)
- except KeyboardInterrupt:
- logger.info("收到停止信号,调度器已停止")
|