| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import sys
- sys.path.append("..")
- import pandas as pd
- import pymysql
- from utils.tools import fmt_date
- import config
class DatabaseParam:
    """Connection settings for the database used by ``Database``.

    Args:
        db_user: Login user name.
        db_password: Login password.
        db_host: Server host name or IP address.
        db_name: Default database (schema) name.
        db_port: Server TCP port.
        db_charset: Connection character set; defaults to ``'utf8mb4'``.
    """

    def __init__(self, db_user: str, db_password: str, db_host: str, db_name: str, db_port: int, db_charset: str = 'utf8mb4'):
        self.db_user = db_user
        self.db_password = db_password
        self.db_host = db_host
        self.db_name = db_name
        self.db_port = db_port
        self.db_charset = db_charset

    def __repr__(self) -> str:
        # The password is masked on purpose so that printing or logging the
        # object never leaks the credential.
        return (f"{type(self).__name__}(db_user={self.db_user!r}, db_password='***', "
                f"db_host={self.db_host!r}, db_name={self.db_name!r}, "
                f"db_port={self.db_port!r}, db_charset={self.db_charset!r})")

    @property
    def params(self) -> dict:
        """Return the settings as a new plain ``dict`` keyed by attribute name."""
        # TODO: value conversion / parameter validation is still to be added.
        return {
            'db_user': self.db_user,
            'db_password': self.db_password,
            'db_host': self.db_host,
            'db_name': self.db_name,
            'db_port': self.db_port,
            'db_charset': self.db_charset,
        }
class Database:
    """Context-managed MySQL connection that returns time series as DataFrames.

    Intended usage::

        with Database(db_param) as db:
            if db is not None:
                df = db.query_sql_time_series2data_frame(...)

    ``__enter__`` prints a diagnostic and returns ``None`` when the connection
    cannot be established, so callers must check the bound value.
    """

    def __init__(self, params: 'DatabaseParam'):
        self.params = params.params  # plain dict of connection settings
        self.db_conn = None  # connection object; None while disconnected
        # Every method reads ``db_cursor``; it must exist (as None) before
        # ``__enter__`` runs so that ``__exit__`` and the "not connected" guard
        # work after a failed connect or when used outside a ``with`` block.
        self.db_cursor = None

    def __enter__(self):
        """Open the connection and cursor.

        Returns:
            ``self`` on success, ``None`` if ``pymysql`` reports an error.
        """
        cfg = self.params
        try:
            # On failure both attributes keep their previous value (None).
            self.db_conn = pymysql.connect(host=cfg.get('db_host'),
                                           user=cfg.get('db_user'),
                                           password=cfg.get('db_password'),
                                           database=cfg.get('db_name'),
                                           port=cfg.get('db_port'),
                                           # honour the configured charset
                                           charset=cfg.get('db_charset') or 'utf8mb4')
            self.db_cursor = self.db_conn.cursor()
        except pymysql.MySQLError as e:
            print('数据库连接失败:', e)
            # The password is deliberately left blank in this message.
            print(f"请检查 host: {cfg.get('db_host')}, user: {cfg.get('db_user')}, password: , database: {cfg.get('db_name')}, port: {cfg.get('db_port')}")
            return None
        if self.db_cursor is not None and self.db_conn is not None:
            print(f"数据库{cfg.get('db_name')}已连接!")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close cursor and connection; never suppresses an in-flight exception."""
        try:
            if self.db_cursor is not None:
                self.db_cursor.close()
        finally:
            self.db_cursor = None
            # Close the connection even if closing the cursor raised.
            try:
                if self.db_conn is not None:
                    self.db_conn.close()
            finally:
                self.db_conn = None
        print(f"数据库{self.params.get('db_name')}已断开!")

    def _require_cursor(self):
        """Return the open cursor or raise ``TypeError`` when not connected."""
        if self.db_cursor is None:
            raise TypeError('数据库可能未连接,值不能为None.', self.db_cursor)
        return self.db_cursor

    def sheet_exists(self, sheet_name: str) -> bool:
        """Return True if ``SHOW TABLES ... LIKE`` finds ``sheet_name``.

        Note that ``LIKE`` treats ``_`` and ``%`` in ``sheet_name`` as wildcards.

        Raises:
            TypeError: if the database is not connected.
        """
        cursor = self._require_cursor()
        # The schema name is an identifier and cannot be bound as a parameter;
        # the pattern is bound so the driver quotes it.
        sql = f"SHOW TABLES FROM {self.params.get('db_name')} LIKE %s"
        cursor.execute(sql, (sheet_name,))
        return bool(cursor.fetchall())

    def query_sql_time_series2data_frame(self,
                                         project_id: int,
                                         sheet_name: str,
                                         data_code: str,
                                         start_year: int, end_year: int,
                                         start_month: int = 1, end_month: int = 12,
                                         start_day: int = 1, end_day: int = 31,
                                         start_hour: int = 0, end_hour: int = 23,
                                         start_minute: int = 0, end_minute: int = 59,
                                         start_second: int = 0, end_second: int = 59):
        """Query one series from ``sheet_name`` within a closed time range.

        Rows are selected where ``item_name`` equals ``data_code``,
        ``project_id`` matches, and ``h_time`` lies between the start and end
        datetimes produced by ``fmt_date``.

        Returns:
            A DataFrame with columns ``['time', <data_code>]`` (value column
            cast to float32), or ``None`` when no row matches.

        Raises:
            TypeError: if the database is not connected.
            RuntimeError: if the stored ``item_name`` (stripped) differs from
                ``data_code``.
        """
        # Fail fast before doing any other work.
        cursor = self._require_cursor()

        # Build the time window.
        start_datetime, end_datetime = fmt_date(start_year=start_year, start_month=start_month, start_day=start_day,
                                                end_year=end_year, end_month=end_month, end_day=end_day,
                                                start_hour=start_hour, start_minute=start_minute, start_second=start_second,
                                                end_hour=end_hour, end_minute=end_minute, end_second=end_second)

        # Values are bound as parameters so the driver escapes them; they are
        # passed as strings to keep the same quoted-literal SQL as before.
        # The table name is an identifier and cannot be bound -- callers must
        # pass a trusted ``sheet_name``.
        sql = (f"SELECT * FROM {sheet_name} "
               "WHERE item_name = %s AND project_id = %s "
               "AND h_time >= %s AND h_time <= %s")
        cursor.execute(sql, (str(data_code), str(project_id), str(start_datetime), str(end_datetime)))
        rows = cursor.fetchall()

        df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
        if len(df) == 0:
            print(f'查询到0条数据,序列标签:{data_code}, 数量:{len(df)}')
            return None

        stored_name = df.iloc[0]['item_name']
        val_label = stored_name.strip()
        if val_label != data_code:
            raise RuntimeError(f'数据库中序列名称与输入不一致,输入:{data_code}, 数据库:{stored_name}')

        # NaN values are intentionally NOT dropped at the database layer.
        # Rename the value column after the series and normalise the time column.
        df = df.rename(columns={'val': val_label, 'h_time': 'time'})
        df[val_label] = df[val_label].astype("float32")
        # Selecting the two columns also discards project_id / item_name.
        return df[['time', val_label]]

    def query_sql_time_series_group2data_frame(self,
                                               code_name_dict: dict,
                                               project_id: int,
                                               sheet_name: str,
                                               data_codes: list,
                                               start_year: int, end_year: int,
                                               start_month: int = 1, end_month: int = 1,
                                               start_day: int = 1, end_day: int = 1,
                                               start_hour: int = 0, end_hour: int = 0,
                                               start_minute: int = 0, end_minute: int = 0,
                                               start_second: int = 0, end_second: int = 0):
        """Query several series and outer-join them on ``time``.

        Each code in ``data_codes`` is fetched with
        ``query_sql_time_series2data_frame``; codes with no data and series
        with at most two distinct values are skipped.

        ``code_name_dict`` is accepted but not used by this method.

        NOTE(review): the end-of-range defaults here (month 1, day 1, 00:00:00)
        differ from the single-series method (month 12, day 31, 23:59:59) --
        confirm which is intended before changing either.

        Returns:
            A DataFrame with a ``time`` column plus one column per retained
            series, sorted by ``time``; ``None`` if nothing was retained.
        """
        # De-duplicate after str() conversion, keeping first-seen order so the
        # resulting column order is deterministic from run to run.
        unique_codes = list(dict.fromkeys(str(code) for code in data_codes))

        frame_list = []
        for data_code in unique_codes:
            frame = self.query_sql_time_series2data_frame(project_id=project_id,
                                                          sheet_name=sheet_name,
                                                          data_code=data_code,
                                                          start_year=start_year, end_year=end_year,
                                                          start_month=start_month, end_month=end_month,
                                                          start_day=start_day, end_day=end_day,
                                                          start_hour=start_hour, end_hour=end_hour,
                                                          start_minute=start_minute, end_minute=end_minute,
                                                          start_second=start_second, end_second=end_second)
            if frame is None:
                continue
            value_column = frame.columns[1]
            # Skip (near-)constant series: at most two distinct values.
            # NOTE(review): a strictly constant series would be nunique() <= 1;
            # confirm the threshold of 2 is intended.
            if frame[value_column].nunique() <= 2:
                print(f'跳过常数列{value_column}')
                continue
            frame_list.append(frame)

        if not frame_list:
            return None

        # Outer join keeps every timestamp seen in any series.
        df_merge = frame_list[0]
        for frame in frame_list[1:]:
            df_merge = pd.merge(df_merge, frame, how='outer', on='time')

        # Stable sort by timestamp.
        return df_merge.sort_values('time', kind='mergesort')
if __name__ == '__main__':
    # Manual smoke test: connect, fetch one daily series, print it.
    # NOTE(review): host and credentials are hard-coded here; they should be
    # moved out of source control (environment or a config file) -- confirm
    # where this project keeps such settings.
    db_param = DatabaseParam(
        db_host='192.168.50.4',
        db_user='root',
        db_password='*B-@p2b+97D5xAF1e6',
        db_name='ws_data',
        db_port=4000)

    # All database work must happen inside the ``with`` block.
    with Database(db_param) as db:
        if db is None:
            # Database.__enter__ returns None when the connection fails (it
            # has already printed the reason), so there is nothing to query.
            df_ = None
        else:
            # Keyword arguments make the start/end pairing explicit.
            df_ = db.query_sql_time_series2data_frame(project_id=92,
                                                      sheet_name='dc_item_history_data_day',
                                                      data_code='QSWGB3_n',
                                                      start_year=2025, end_year=2025,
                                                      start_month=3, end_month=9,
                                                      start_day=25, end_day=10)
    print(df_)
|