import re
import sys
from typing import Optional

# NOTE(review): ".." is resolved against the current working directory, not
# against this file's location, so the project imports below only work when
# the script is launched from the expected directory.
sys.path.append("..")

import pandas as pd
import pymysql

from utils.tools import fmt_date
import config


# One part of a (possibly schema-qualified) MySQL identifier: either a bare
# name or a backtick-quoted name. '%' is rejected inside quoted names because
# the statement text is later %-formatted by the driver.
_IDENT_PART = r"(?:[0-9A-Za-z_$\u0080-\uffff]+|`[^`%]+`)"
_IDENTIFIER_RE = re.compile(rf"{_IDENT_PART}(?:\.{_IDENT_PART})?")


def _checked_identifier(name: str) -> str:
    """Return *name* unchanged if it is a safe SQL identifier, else raise.

    Table and database names cannot be bound as query parameters, so they
    are validated before being interpolated into statement text.

    Raises:
        ValueError: if *name* is not ``ident`` or ``schema.ident``.
    """
    name = str(name)
    if not _IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


class DatabaseParam:
    """Plain container for the settings needed to open a MySQL connection."""

    def __init__(self,
                 db_user: str,
                 db_password: str,
                 db_host: str,
                 db_name: str,
                 db_port: int,
                 db_charset: str = 'utf8mb4'):
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host
        self.db_name = db_name
        self.db_port = db_port
        self.db_charset = db_charset

    @property
    def params(self) -> dict:
        """Return the connection settings as a dict keyed by attribute name."""
        # TODO: parameter conversion / validation is still to be added here.
        return {
            'db_user': self.db_user,
            'db_password': self.db_password,
            'db_host': self.db_host,
            'db_name': self.db_name,
            'db_port': self.db_port,
            'db_charset': self.db_charset,
        }


class Database:
    """Context manager wrapping a pymysql connection and a single cursor.

    ``__enter__`` returns ``self`` on success and ``None`` when the
    connection cannot be established, so callers must check the bound value
    before using it.
    """

    def __init__(self, params: DatabaseParam):
        self.params = params.params  # connection settings (dict)
        self.db_conn = None          # live connection, None while closed
        self.db_cursor = None        # live cursor, None while closed
        # Legacy attribute kept so existing readers do not break; it was
        # never assigned anything but None.
        self.cursor = None

    def __enter__(self):
        """Open the connection and cursor; return ``self`` or ``None``."""
        try:
            self.db_conn = pymysql.connect(
                host=self.params.get('db_host'),
                user=self.params.get('db_user'),
                password=self.params.get('db_password'),
                database=self.params.get('db_name'),
                port=self.params.get('db_port'),
                # Honour the configured charset instead of a hard-coded one.
                charset=self.params.get('db_charset') or 'utf8mb4',
            )
            self.db_cursor = self.db_conn.cursor()
        except pymysql.MySQLError as e:
            print('数据库连接失败:', e)
            # The password is deliberately not echoed.
            print(f"请检查 host: {self.params.get('db_host')}, user: {self.params.get('db_user')}, password: , database: {self.params.get('db_name')}, port: {self.params.get('db_port')}")
            return None

        if self.db_cursor is not None and self.db_conn is not None:
            print(f"数据库{self.params.get('db_name')}已连接!")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close cursor and connection; exceptions from the body propagate."""
        try:
            if self.db_cursor:
                self.db_cursor.close()
        finally:
            # Always attempt to close the connection, even if closing the
            # cursor raised.
            self.db_cursor = None
            if self.db_conn:
                try:
                    self.db_conn.close()
                finally:
                    self.db_conn = None

        if self.db_cursor is None and self.db_conn is None:
            print(f"数据库{self.params.get('db_name')}已断开!")

    def _require_cursor(self):
        """Return the open cursor, raising ``TypeError`` if there is none."""
        if self.db_cursor is None:
            raise TypeError('数据库可能未连接,值不能为None.', self.db_cursor)
        return self.db_cursor

    def sheet_exists(self, sheet_name: str) -> bool:
        """Return True if ``SHOW TABLES ... LIKE sheet_name`` yields any row.

        NOTE: ``sheet_name`` is used as a LIKE pattern, so ``_`` and ``%``
        in it act as wildcards.

        Raises:
            TypeError: if no cursor is open.
            ValueError: if the configured database name is not a safe
                identifier.
        """
        cursor = self._require_cursor()
        db_name = _checked_identifier(self.params.get('db_name'))
        # The pattern is bound by the driver rather than interpolated.
        cursor.execute(f"SHOW TABLES FROM {db_name} LIKE %s", (sheet_name,))
        return bool(cursor.fetchall())

    def query_sql_time_series2data_frame(self,
                                         project_id: int,
                                         sheet_name: str,
                                         data_code: str,
                                         start_year: int,
                                         end_year: int,
                                         start_month: int = 1,
                                         end_month: int = 12,
                                         start_day: int = 1,
                                         end_day: int = 31,
                                         start_hour: int = 0,
                                         end_hour: int = 23,
                                         start_minute: int = 0,
                                         end_minute: int = 59,
                                         start_second: int = 0,
                                         end_second: int = 59) -> Optional[pd.DataFrame]:
        """Query one series from ``sheet_name`` and return it as a DataFrame.

        Rows are selected by ``item_name == data_code``, ``project_id`` and
        an inclusive ``h_time`` range built by ``fmt_date`` from the
        start_*/end_* arguments.

        Returns:
            A two-column DataFrame ``['time', <item name>]`` with the value
            column cast to float32, or ``None`` when no rows match.

        Raises:
            TypeError: if no cursor is open.
            ValueError: if ``sheet_name`` is not a safe identifier.
            RuntimeError: if the first returned row's item name (stripped)
                differs from ``data_code``.
        """
        # Time formatting (fmt_date is a project helper; presumably returns
        # the formatted start and end datetimes — verify against utils.tools).
        start_datetime, end_datetime = fmt_date(
            start_year=start_year, start_month=start_month, start_day=start_day,
            end_year=end_year, end_month=end_month, end_day=end_day,
            start_hour=start_hour, start_minute=start_minute, start_second=start_second,
            end_hour=end_hour, end_minute=end_minute, end_second=end_second)

        # Table names cannot be bound, so validate; every value is bound.
        table = _checked_identifier(sheet_name)
        sql = (f"SELECT * FROM {table} "
               "WHERE item_name = %s AND project_id = %s "
               "AND h_time >= %s AND h_time <= %s")
        # str() keeps the values rendered exactly as the former string-built
        # statement did (quoted string literals).
        args = (str(data_code), str(project_id),
                str(start_datetime), str(end_datetime))

        cursor = self._require_cursor()

        # Run the query.
        cursor.execute(sql, args)
        rows = cursor.fetchall()
        df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])

        if df.empty:
            print(f'查询到0条数据,序列标签:{data_code}, 数量:{len(df)}')
            return None

        first_item_name = df.iloc[0]['item_name']
        if first_item_name.strip() != data_code:
            raise RuntimeError(f'数据库中序列名称与输入不一致,输入:{data_code}, 数据库:{first_item_name}')

        # NaN values are intentionally NOT dropped at the database layer.

        # Rename the value column after the series and h_time to 'time'.
        val_label = first_item_name.strip()
        df.rename(columns={'val': f'{val_label}', 'h_time': 'time'}, inplace=True)

        # Drop columns that are not part of the result.
        df.drop(columns=['project_id', 'item_name'], inplace=True)

        # Convert the value column's dtype.
        df[val_label] = df[val_label].astype("float32")
        return df[['time', val_label]]

    def query_sql_time_series_group2data_frame(self,
                                               code_name_dict: dict,
                                               project_id: int,
                                               sheet_name: str,
                                               data_codes: list,
                                               start_year: int,
                                               end_year: int,
                                               start_month: int = 1,
                                               end_month: int = 1,
                                               start_day: int = 1,
                                               end_day: int = 1,
                                               start_hour: int = 0,
                                               end_hour: int = 0,
                                               start_minute: int = 0,
                                               end_minute: int = 0,
                                               start_second: int = 0,
                                               end_second: int = 0) -> Optional[pd.DataFrame]:
        """Query several series and outer-join them on ``time``.

        Each code in ``data_codes`` is fetched with
        ``query_sql_time_series2data_frame``; series with no rows or with at
        most two distinct values are skipped.

        ``code_name_dict`` is accepted for interface compatibility but is
        not used by this method.

        NOTE(review): the end_* defaults here (1/1 00:00:00) differ from the
        single-series method (12/31 23:59:59); they are kept as-is for
        backward compatibility — confirm whether that is intended.

        Returns:
            The merged DataFrame sorted by ``time``, or ``None`` when no
            series survived.
        """
        # De-duplicate on the string form while keeping the caller's order,
        # so the resulting column order is deterministic.
        unique_codes = dict.fromkeys(str(code) for code in data_codes)

        frame_list = []
        for data_code in unique_codes:
            frame = self.query_sql_time_series2data_frame(
                project_id=project_id,
                sheet_name=sheet_name,
                data_code=data_code,
                start_year=start_year, end_year=end_year,
                start_month=start_month, end_month=end_month,
                start_day=start_day, end_day=end_day,
                start_hour=start_hour, end_hour=end_hour,
                start_minute=start_minute, end_minute=end_minute,
                start_second=start_second, end_second=end_second,
            )
            if frame is None:
                continue

            # Filter out (near-)constant series.
            # NOTE(review): "<= 2" also drops two-valued series — confirm.
            value_column = frame.columns[1]
            if frame[value_column].nunique() <= 2:
                print(f'跳过常数列{value_column}')
                continue
            frame_list.append(frame)

        # Merge all surviving series.
        if not frame_list:
            return None

        df_merge = frame_list[0]
        for frame in frame_list[1:]:
            # Outer join keeps timestamps present in any series.
            df_merge = pd.merge(df_merge, frame, how='outer', on='time')

        # Sort by time (stable sort).
        df_merge.sort_values('time', kind='mergesort', inplace=True)
        return df_merge


if __name__ == '__main__':
    # Build the connection parameters.
    # NOTE(review): credentials are hard-coded in source; move them to
    # configuration or environment variables and rotate this password.
    db_param = DatabaseParam(
        db_host='192.168.50.4',
        db_user='root',
        db_password='*B-@p2b+97D5xAF1e6',
        db_name='ws_data',
        db_port=4000)

    # All database work must happen inside the with-block.
    with Database(db_param) as db:
        if db is None:
            # __enter__ already reported the connection failure.
            raise SystemExit(1)
        df_ = db.query_sql_time_series2data_frame(
            92, 'dc_item_history_data_day', 'QSWGB3_n',
            2025, 2025, 3, 9, 25, 10)
        print(df_)