| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- from dify_api import ChatHistoryCollector
- import time
- from db import ConversationDatabaseManage
- from logger_config import logger
- from datetime import datetime
- import schedule
- def write_to_database(conversations):
- """将聊天记录写入数据库"""
- db = ConversationDatabaseManage()
- for conversation in conversations:
- db.insert_record(conversation['id'], conversation['record'], conversation['time'])
- def run(user='admin'):
- try:
- # 创建收集器实例
- collector = ChatHistoryCollector()
- # 创建数据库实例
- db = ConversationDatabaseManage()
- with db:
- # 先获取一次数据
- has_more, conversations = collector.collect(user=user)
- for conversation in conversations:
- db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at'])
- # 写入一次数据库
- while has_more:
- time.sleep(1) # 避免请求过快触发API限流
- # 传入上一次的最后一条数据的id,获取下一条数据
- has_more, conversations = collector.collect(user=user, last_id=conversations[-1]['id'])
- for conversation in conversations:
- db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at'])
- except Exception as e:
- logger.error(f"写入数据库时出错: {str(e)}")
- def job():
- run(user='admin')
- if __name__ == "__main__":
- """主函数 - 设置定时任务并启动调度器"""
- logger.info("定时任务调度器启动...")
- # 设置定时任务 - 每隔 30 分钟执行一次
- # 你可以根据需要修改时间间隔,例如:
- # schedule.every(1).hours.do(job) # 每小时执行一次
- # schedule.every(2).hours.do(job) # 每2小时执行一次
- schedule.every().day.at("03:00").do(job) # 每天上午3点执行
- # schedule.every().monday.do(job) # 每周一执行
- # schedule.every(30).minutes.do(job)
- logger.info("定时任务已设置: 每天执行一次")
- logger.info("按 Ctrl+C 停止调度器")
- # 立即执行一次
- logger.info("立即执行第一次任务...")
- job()
- # 启动调度器循环
- try:
- while True:
- schedule.run_pending()
- time.sleep(3600)
- except KeyboardInterrupt:
- logger.info("收到停止信号,调度器已停止")
|