| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- from logger_config import logger
- import sqlite3
- import os
- import datetime
class ConversationDatabaseManage:
    """Manage the SQLite file that stores intent-recognition records.

    The single table ``intent_recognition_records`` has four columns:
    ``id`` (TEXT), ``record`` (TEXT, NOT NULL, primary key), ``time``
    (TEXT, NOT NULL) and ``label`` (INTEGER, default 0).

    A connection is opened with :meth:`connect` (or by entering the instance
    as a context manager) and released with :meth:`close`.
    :meth:`insert_record` requires an open connection.
    """

    def __init__(self, db_file: str = 'chat_history.db'):
        """Remember the database path and make sure the table exists.

        Args:
            db_file: Path of the SQLite database file. Defaults to
                ``chat_history.db`` in the current working directory.
        """
        self.logger = logger
        self.db_file = db_file
        self.conn = None
        self.cursor = None
        if not os.path.exists(self.db_file):
            self.logger.info(f"创建新的数据库文件: {self.db_file}")
        # The statement is CREATE TABLE IF NOT EXISTS, so running it on every
        # start-up is harmless and also repairs an existing file that lacks
        # the table (which would otherwise make every insert fail).
        self.create_table()

    def create_table(self) -> None:
        """Create ``intent_recognition_records`` if it does not exist yet.

        Uses its own short-lived connection, so a connection previously
        opened through :meth:`connect` is left untouched. Errors are logged
        and swallowed (best effort); nothing is raised.
        """
        create_table_query = """
            CREATE TABLE IF NOT EXISTS intent_recognition_records (
                id TEXT,
                record TEXT NOT NULL PRIMARY KEY,
                time TEXT NOT NULL,
                label INTEGER DEFAULT 0
            )
        """
        setup_conn = None
        try:
            setup_conn = sqlite3.connect(self.db_file)
            setup_conn.execute(create_table_query)
            setup_conn.commit()
            self.logger.info("数据库表创建成功")
        except Exception as e:
            # Deliberately broad: table creation is best effort and must not
            # prevent the manager from being constructed.
            self.logger.error(f"创建数据库表时出错: {str(e)}")
        finally:
            if setup_conn is not None:
                setup_conn.close()
                self.logger.info("数据库连接已关闭")

    @staticmethod
    def _format_time(time) -> str:
        """Return *time* as the text stored in the ``time`` column.

        ``datetime`` objects are formatted as ``YYYY-MM-DD HH:MM:SS`` without
        any time-zone conversion; ``int`` values are treated as Unix
        timestamps and rendered in UTC; anything else is passed to ``str``.
        """
        if isinstance(time, datetime.datetime):
            return time.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(time, int):
            utc_moment = datetime.datetime.fromtimestamp(
                time, tz=datetime.timezone.utc
            )
            return utc_moment.strftime('%Y-%m-%d %H:%M:%S')
        return str(time)

    def insert_record(self, id_: str, record: str, time) -> bool:
        """Insert one record unless a row with the same ``record`` exists.

        Args:
            id_: Value for the ``id`` column.
            record: Value for the ``record`` column (the primary key).
            time: A ``datetime``, an ``int`` Unix timestamp, or any other
                object; see :meth:`_format_time` for how each is stored.

        Returns:
            ``True`` if the row was inserted or already existed, ``False`` if
            there is no open connection or any error occurred (the
            transaction is rolled back in that case).
        """
        if self.conn is None or self.cursor is None:
            # Without this guard the error handler below would itself fail
            # on ``None.rollback()`` and leak an AttributeError to the caller.
            self.logger.error("插入记录失败: 数据库未连接,请先调用 connect()")
            return False

        try:
            time_str = self._format_time(time)
            self.conn.execute('BEGIN')

            check_query = """
                SELECT 1 FROM intent_recognition_records
                WHERE record = ?
            """
            self.cursor.execute(check_query, (record,))
            if self.cursor.fetchone():
                self.logger.info(f"record已存在,跳过: {record}")
                # End the read-only transaction so it does not stay open.
                self.conn.commit()
                return True

            insert_query = """
                INSERT INTO intent_recognition_records (id, record, time) VALUES (?, ?, ?)
            """
            self.cursor.execute(insert_query, (id_, record, time_str))
            self.conn.commit()
            self.logger.info(f"成功插入一条记录: {id_}-{time_str}")
            return True
        except Exception as e:
            try:
                self.conn.rollback()
            except sqlite3.Error as rollback_error:
                # E.g. the connection was closed underneath us; report it
                # but still return False instead of raising.
                self.logger.error(f"回滚事务失败: {str(rollback_error)}")
            self.logger.error(f"插入记录时出错,已回滚事务: {str(e)} ")
            return False

    def connect(self) -> None:
        """Open the database connection and cursor.

        Does nothing when a connection is already open, so the existing
        connection is never silently replaced.

        Raises:
            Exception: Whatever ``sqlite3.connect`` raised, after logging it.
        """
        if self.conn is not None:
            return
        try:
            new_conn = sqlite3.connect(self.db_file)
            new_cursor = new_conn.cursor()
        except Exception as e:
            self.logger.error(f"数据库连接失败: {str(e)}")
            raise
        # Publish both handles only after both were created successfully.
        self.conn = new_conn
        self.cursor = new_cursor
        self.logger.info("数据库连接建立成功")

    def close(self) -> None:
        """Close the database connection if one is open (idempotent)."""
        # getattr: the instance may be only partially initialised when this
        # is reached through __del__.
        open_conn = getattr(self, 'conn', None)
        if open_conn is None:
            return
        try:
            open_conn.close()
        finally:
            self.conn = None
            self.cursor = None
        self.logger.info("数据库连接已关闭")

    def __del__(self):
        """Finalizer: make a last attempt to release the connection."""
        try:
            self.close()
        except Exception:
            # Finalizers may run during interpreter shutdown or on a
            # half-built object; there is nothing useful left to do here.
            pass

    def __enter__(self) -> "ConversationDatabaseManage":
        """Open the connection and return the manager itself."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.close()
if __name__ == '__main__':
    # Manual smoke test: open the default database, insert one sample row
    # stamped with the current local time, then close the connection.
    with ConversationDatabaseManage() as db:
        db.insert_record('1', 'test', datetime.datetime.now())
|