"""SQLite-backed storage for intent-recognition conversation records."""

import datetime
import os
import sqlite3

from logger_config import logger


class ConversationDatabaseManage:
    """Manage the ``intent_recognition_records`` table of a SQLite database.

    The constructor only makes sure the table exists; it does not keep a
    connection open.  Call :meth:`connect` (or use the instance as a context
    manager) before :meth:`insert_record`, and :meth:`close` when done.

    Args:
        db_file: Path of the SQLite database file.  Defaults to
            ``'chat_history.db'`` in the current working directory.  The
            schema is created through a separate short-lived connection, so
            this should be a real file path rather than ``':memory:'``.
    """

    def __init__(self, db_file='chat_history.db'):
        self.logger = logger
        self.db_file = db_file
        self.conn = None    # sqlite3.Connection while connected, else None
        self.cursor = None  # cursor bound to self.conn, else None

        if not os.path.exists(self.db_file):
            self.logger.info(f"创建新的数据库文件: {self.db_file}")
        # CREATE TABLE IF NOT EXISTS is idempotent, so always run it: an
        # existing file may still be missing the table (e.g. after a failed
        # first run).
        self.create_table()

    def create_table(self) -> None:
        """Create the records table if it does not exist yet.

        Uses its own short-lived connection so that a connection opened via
        :meth:`connect` is never replaced or closed as a side effect.
        Errors are logged and swallowed (best effort), so constructing the
        manager never raises because of schema creation.
        """
        create_table_query = """
            CREATE TABLE IF NOT EXISTS intent_recognition_records (
                id TEXT,
                record TEXT NOT NULL PRIMARY KEY,
                time TEXT NOT NULL,
                label INTEGER DEFAULT 0
            )
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_file)
            conn.execute(create_table_query)
            conn.commit()
            self.logger.info("数据库表创建成功")
        except Exception as e:
            self.logger.error(f"创建数据库表时出错: {str(e)}")
        finally:
            if conn is not None:
                conn.close()
                self.logger.info("数据库连接已关闭")

    @staticmethod
    def _format_time(time) -> str:
        """Convert *time* to the text form stored in the ``time`` column.

        ``datetime`` objects are formatted as ``YYYY-MM-DD HH:MM:SS`` as-is,
        ``int`` values are treated as Unix timestamps and rendered in UTC,
        and anything else is stored as ``str(time)``.
        """
        if isinstance(time, datetime.datetime):
            return time.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(time, int):
            utc_time = datetime.datetime.fromtimestamp(
                time, tz=datetime.timezone.utc
            )
            return utc_time.strftime('%Y-%m-%d %H:%M:%S')
        return str(time)

    def insert_record(self, id_: str, record: str, time) -> bool:
        """Insert one record unless a row with the same ``record`` exists.

        Args:
            id_: Value stored in the ``id`` column.
            record: Record text; it is the table's primary key.
            time: A ``datetime``, an ``int`` Unix timestamp (stored as UTC),
                or any other object (stored as ``str(time)``).

        Returns:
            ``True`` if the row was inserted or already existed, ``False``
            if there is no open connection or the operation failed (the
            transaction is rolled back in that case).
        """
        if self.conn is None or self.cursor is None:
            # Without this guard the error handler below would itself fail
            # on ``self.conn.rollback()`` and raise instead of returning.
            self.logger.error(
                "插入记录时出错: 数据库未连接,请先调用 connect() 或使用 with 语句"
            )
            return False

        check_query = """
            SELECT 1 FROM intent_recognition_records WHERE record = ?
        """
        insert_query = """
            INSERT INTO intent_recognition_records (id, record, time)
            VALUES (?, ?, ?)
        """
        try:
            time_str = self._format_time(time)

            self.conn.execute('BEGIN')
            self.cursor.execute(check_query, (record,))
            if self.cursor.fetchone():
                self.logger.info(f"record已存在,跳过: {record}")
                # Finish the read-only transaction so it is not left open.
                self.conn.commit()
                return True

            self.cursor.execute(insert_query, (id_, record, time_str))
            self.conn.commit()
            self.logger.info(f"成功插入一条记录: {id_}-{time_str}")
            return True
        except Exception as e:
            # This method reports failure through its return value, so every
            # error is converted to False after trying to roll back.
            try:
                self.conn.rollback()
            except sqlite3.Error as rollback_error:
                self.logger.error(f"回滚事务失败: {str(rollback_error)}")
            self.logger.error(f"插入记录时出错,已回滚事务: {str(e)} ")
            return False

    def connect(self) -> None:
        """Open the database connection and create a cursor.

        If a connection is already open it is closed first, so the old
        handle is released deterministically instead of being leaked.

        Raises:
            Exception: Whatever ``sqlite3`` raised while connecting; the
                error is logged and re-raised.
        """
        if self.conn is not None:
            self.close()
        try:
            self.conn = sqlite3.connect(self.db_file)
            self.cursor = self.conn.cursor()
            self.logger.info("数据库连接建立成功")
        except Exception as e:
            self.logger.error(f"数据库连接失败: {str(e)}")
            raise

    def close(self) -> None:
        """Close the database connection if one is open."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
            self.cursor = None
            self.logger.info("数据库连接已关闭")

    def __del__(self):
        """Best-effort cleanup when the instance is garbage collected."""
        try:
            self.close()
        except Exception:
            # The instance may be only partially initialised, or the
            # interpreter may be shutting down; a finalizer must not raise.
            pass

    def __enter__(self):
        """Open the connection and return the manager itself."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.close()


if __name__ == '__main__':
    db = ConversationDatabaseManage()
    with db:
        db.insert_record('1', 'test', datetime.datetime.now())