main.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. from dify_api import ChatHistoryCollector
  2. from user_api import UserCollector
  3. import time
  4. from db import ConversationDatabaseManage
  5. from logger_config import logger
  6. from datetime import datetime
  7. import schedule
  8. def write_to_database(conversations):
  9. """将聊天记录写入数据库"""
  10. db = ConversationDatabaseManage()
  11. for conversation in conversations:
  12. db.insert_record(conversation['id'], conversation['record'], conversation['time'])
  13. def run(user='admin'):
  14. try:
  15. # 创建收集器实例
  16. collector = ChatHistoryCollector()
  17. # 创建数据库实例
  18. db = ConversationDatabaseManage()
  19. counter = 0
  20. with db:
  21. # 先获取一次数据
  22. time.sleep(1)
  23. has_more, conversations = collector.collect(user=user)
  24. for conversation in conversations:
  25. if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']):
  26. counter += 1
  27. # 写入一次数据库
  28. while has_more:
  29. time.sleep(1) # 避免请求过快触发API限流
  30. # 传入上一次的最后一条数据的id,获取下一条数据
  31. has_more, conversations = collector.collect(user=user, last_id=conversations[-1]['id'])
  32. for conversation in conversations:
  33. if db.insert_record(id_=conversation['id'], record=conversation['name'], time=conversation['created_at']):
  34. counter+=1
  35. logger.info(f"写入{counter}条数据")
  36. except Exception as e:
  37. logger.error(f"写入数据库时出错: {str(e)}")
  38. def job():
  39. # 创建用户采集器实例
  40. user_collector = UserCollector()
  41. user_info = user_collector.get_user_info()
  42. for user in user_info:
  43. logger.info(f"开始处理用户: {user}")
  44. run(user=user)
  45. logger.info(f"处理用户: {user}完成")
  46. if __name__ == "__main__":
  47. """主函数 - 设置定时任务并启动调度器"""
  48. logger.info("定时任务调度器启动...")
  49. # 设置定时任务 - 每隔 30 分钟执行一次
  50. # 你可以根据需要修改时间间隔,例如:
  51. # schedule.every(1).hours.do(job) # 每小时执行一次
  52. # schedule.every(2).hours.do(job) # 每2小时执行一次
  53. schedule.every().day.at("03:00").do(job) # 每天上午3点执行
  54. # schedule.every().monday.do(job) # 每周一执行
  55. # schedule.every(30).minutes.do(job)
  56. logger.info("定时任务已设置: 每天执行一次")
  57. logger.info("按 Ctrl+C 停止调度器")
  58. # 立即执行一次
  59. logger.info("立即执行第一次任务...")
  60. job()
  61. # 启动调度器循环
  62. try:
  63. while True:
  64. logger.info("检查并运行待处理的任务...")
  65. schedule.run_pending()
  66. time.sleep(3600)
  67. except KeyboardInterrupt:
  68. logger.info("收到停止信号,调度器已停止")