database_.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import sys
  2. sys.path.append("..")
  3. import pandas as pd
  4. import pymysql
  5. from utils.tools import fmt_date
  6. import config
  class DatabaseParam:
      """Hold the settings needed to open a database connection.

      Every constructor argument is stored unchanged on an attribute of the
      same name; the ``params`` property exposes them together as one dict.
      """

      def __init__(self, db_user: str, db_password: str, db_host: str, db_name: str, db_port: int, db_charset: str='utf8mb4'):
          """Record the connection settings.

          Args:
              db_user: Account name used to log in.
              db_password: Password for ``db_user``.
              db_host: Host name or address of the server.
              db_name: Name of the database (schema) to use.
              db_port: Port of the server.
              db_charset: Connection character set, ``'utf8mb4'`` by default.
          """
          self.db_user, self.db_password = db_user, db_password
          self.db_host, self.db_name = db_host, db_name
          self.db_port, self.db_charset = db_port, db_charset

      @property
      def params(self) -> dict:
          """Return the current settings as a dict keyed by attribute name."""
          # Placeholder: conversions or parameter checks are still to be added here.
          keys = ('db_user', 'db_password', 'db_host', 'db_name', 'db_port', 'db_charset')
          return {key: getattr(self, key) for key in keys}
  class Database:
      """Context manager around a PyMySQL connection with time-series query helpers.

      Entering the context opens a connection and a cursor; leaving it closes
      both. ``__enter__`` returns ``None`` (instead of raising) when the
      connection cannot be opened, so callers should check the bound value::

          with Database(param) as db:
              if db is not None:
                  frame = db.query_sql_time_series2data_frame(...)
      """

      def __init__(self, params: 'DatabaseParam'):
          """Store the connection settings; no connection is opened here.

          Args:
              params: Object whose ``params`` property yields a dict with the
                  keys ``db_host``, ``db_user``, ``db_password``, ``db_name``,
                  ``db_port`` and (optionally) ``db_charset``.
          """
          self.params = params.params  # plain dict of connection settings
          self.db_conn = None  # connection, set by __enter__
          # Cursor, set by __enter__. It must exist before __enter__ runs so that
          # __exit__ and the query methods can test it after a failed connect.
          self.db_cursor = None
          # Legacy attribute name: it is never assigned elsewhere in this class
          # and is kept only so existing readers of ``.cursor`` do not break.
          self.cursor = None

      def __enter__(self):
          """Open the connection and a cursor.

          Returns:
              ``self`` on success. ``None`` when ``pymysql.MySQLError`` is
              raised while connecting or creating the cursor; the error is
              printed and not re-raised, and the attributes keep whatever
              value they had when the error occurred.
          """
          try:
              self.db_conn = pymysql.connect(
                  host=self.params.get('db_host'),
                  user=self.params.get('db_user'),
                  password=self.params.get('db_password'),
                  database=self.params.get('db_name'),
                  port=self.params.get('db_port'),
                  # Honour the configured charset; fall back to the value that
                  # used to be hard-coded when none is configured.
                  charset=self.params.get('db_charset') or 'utf8mb4',
              )
              self.db_cursor = self.db_conn.cursor()
          except pymysql.MySQLError as e:
              print('数据库连接失败:', e)
              # The password is deliberately left blank in this message.
              print(f"请检查 host: {self.params.get('db_host')}, user: {self.params.get('db_user')}, password: , database: {self.params.get('db_name')}, port: {self.params.get('db_port')}")
              return None
          if self.db_cursor and self.db_conn:
              print(f"数据库{self.params.get('db_name')}已连接!")
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          """Close the cursor and the connection if they were opened.

          Returns ``None``, so an exception raised inside the ``with`` body is
          propagated to the caller.
          """
          was_open = self.db_cursor is not None or self.db_conn is not None
          try:
              if self.db_cursor is not None:
                  self.db_cursor.close()
          finally:
              # Always release the connection, even if closing the cursor failed.
              self.db_cursor = None
              conn, self.db_conn = self.db_conn, None
              if conn is not None:
                  conn.close()
          if was_open:
              print(f"数据库{self.params.get('db_name')}已断开!")

      def _require_cursor(self):
          """Return the open cursor, or raise ``TypeError`` when there is none."""
          if self.db_cursor is None:
              raise TypeError('数据库可能未连接,值不能为None.', self.db_cursor)
          return self.db_cursor

      def sheet_exists(self, sheet_name: str) -> bool:
          """Report whether a table matching ``sheet_name`` exists in the configured database.

          ``sheet_name`` is used as a ``LIKE`` pattern, so ``%`` and ``_`` in it
          act as wildcards.

          Raises:
              TypeError: If no cursor is open.
          """
          cursor = self._require_cursor()
          # The database name is an identifier and cannot be bound as a
          # parameter; it comes from the connection settings.
          sql = f"SHOW TABLES FROM {self.params.get('db_name')} LIKE %s"
          cursor.execute(sql, (sheet_name,))
          return len(cursor.fetchall()) != 0

      def query_sql_time_series2data_frame(self,
                                           project_id:int ,
                                           sheet_name:str ,
                                           data_code:str,
                                           start_year:int , end_year:int ,
                                           start_month:int=1, end_month:int=12,
                                           start_day:int=1, end_day:int=31,
                                           start_hour:int=0, end_hour:int=23,
                                           start_minute:int=0, end_minute:int=59,
                                           start_second:int=0, end_second:int=59):
          """Load one series from ``sheet_name`` as a two-column DataFrame.

          Rows are selected where ``item_name`` equals ``data_code``,
          ``project_id`` matches, and ``h_time`` lies between the start and end
          values produced by ``fmt_date`` (both bounds inclusive).

          Args:
              project_id: Value compared against the ``project_id`` column.
              sheet_name: Table to read. It is inserted into the statement as
                  an identifier and therefore must come from a trusted source.
              data_code: Value compared against the ``item_name`` column; also
                  the name of the value column in the result.
              start_year, end_year, start_month, end_month, start_day, end_day,
              start_hour, end_hour, start_minute, end_minute, start_second,
              end_second: Passed through to ``fmt_date`` to build the bounds.

          Returns:
              A DataFrame with the columns ``time`` (from ``h_time``) and
              ``<item_name>`` (from ``val``, cast to ``float32``), or ``None``
              when the query returns no rows. Missing values are kept.

          Raises:
              TypeError: If no cursor is open.
              RuntimeError: If the stripped ``item_name`` of the first returned
                  row differs from ``data_code``.
          """
          # Build the time bounds.
          start_datetime, end_datetime = fmt_date(start_year=start_year, start_month=start_month, start_day=start_day,
                                                  end_year=end_year, end_month=end_month, end_day=end_day,
                                                  start_hour=start_hour, start_minute=start_minute, start_second=start_second,
                                                  end_hour=end_hour, end_minute=end_minute, end_second=end_second)
          # Values are bound as parameters so the driver escapes them. They are
          # stringified first, which yields the same quoted literals the former
          # string interpolation produced.
          sql = (f"SELECT * FROM {sheet_name} "
                 "WHERE item_name = %s AND project_id = %s AND h_time >= %s AND h_time <= %s")
          cursor = self._require_cursor()
          cursor.execute(sql, (str(data_code), str(project_id), str(start_datetime), str(end_datetime)))
          rows = cursor.fetchall()
          df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
          if len(df) == 0:
              print(f'查询到0条数据,序列标签:{data_code}, 数量:{len(df)}')
              return None
          db_item_name = df.iloc[0]['item_name']
          if db_item_name.strip() != data_code:
              raise RuntimeError(f'数据库中序列名称与输入不一致,输入:{data_code}, 数据库:{db_item_name}')
          # Missing values are intentionally not dropped at the database layer.
          # Rename the value column after the series and the time column to 'time'.
          val_label = db_item_name.strip()
          df.rename(columns={'val': val_label, 'h_time': 'time'}, inplace=True)
          # Convert the value column to float32.
          df[val_label] = df[val_label].astype("float32")
          # Selecting the two columns also discards every unrelated column.
          return df[['time', val_label]]

      def query_sql_time_series_group2data_frame(self,
                                                 code_name_dict: dict,
                                                 project_id:int ,
                                                 sheet_name:str ,
                                                 data_codes:list,
                                                 start_year:int , end_year:int ,
                                                 start_month:int=1, end_month:int=1,
                                                 start_day:int=1, end_day:int=1,
                                                 start_hour:int=0, end_hour:int=0,
                                                 start_minute:int=0, end_minute:int=0,
                                                 start_second:int=0, end_second:int=0):
          """Query several series and outer-join them on ``time``.

          Each distinct code in ``data_codes`` is loaded with
          ``query_sql_time_series2data_frame``. Codes that return no data are
          skipped, as are series with at most two distinct values.

          Args:
              code_name_dict: Accepted for interface compatibility; not used.
              project_id, sheet_name: Forwarded to the single-series query.
              data_codes: Series codes; each is converted with ``str`` and
                  duplicates are ignored, keeping first-seen order.
              start_year ... end_second: Forwarded to the single-series query.
                  NOTE(review): the end_* defaults here (1/1/0/0/0) differ from
                  the single-series defaults (12/31/23/59/59) - confirm intent.

          Returns:
              A DataFrame sorted by ``time`` with one value column per kept
              series, in the order the codes were given, or ``None`` when no
              series was kept.
          """
          frame_list = []
          # De-duplicate while preserving input order so the column order of the
          # result is the same on every run (a set would make it arbitrary).
          for data_code in dict.fromkeys(str(code) for code in data_codes):
              frame = self.query_sql_time_series2data_frame(project_id=project_id,
                                                            sheet_name=sheet_name,
                                                            data_code=data_code,
                                                            start_year=start_year, end_year=end_year,
                                                            start_month=start_month, end_month=end_month,
                                                            start_day=start_day, end_day=end_day,
                                                            start_hour=start_hour, end_hour=end_hour,
                                                            start_minute=start_minute, end_minute=end_minute,
                                                            start_second=start_second, end_second=end_second,
                                                            )
              if frame is None:
                  continue
              # Filter out (near-)constant series: at most two distinct values.
              value_column = frame.columns[1]
              if frame[value_column].nunique() <= 2:
                  print(f'跳过常数列{value_column}')
                  continue
              frame_list.append(frame)
          if not frame_list:
              return None
          # Outer-join all kept series on the time column.
          df_merge = frame_list[0]
          for frame in frame_list[1:]:
              df_merge = pd.merge(df_merge, frame, how='outer', on='time')
          # Stable sort by time; not done in place so the per-series frames,
          # which are column selections, are never mutated.
          return df_merge.sort_values('time', kind='mergesort')
  if __name__ == '__main__':
      import os

      # Manual smoke test: query one series and print it.
      # Connection settings are read from the environment so that no
      # credential lives in the source file. DB_PASSWORD has no default on
      # purpose. NOTE: a password was previously committed on this line and
      # should be rotated.
      db_param = DatabaseParam(
          db_host=os.environ.get('DB_HOST', '192.168.50.4'),
          db_user=os.environ.get('DB_USER', 'root'),
          db_password=os.environ.get('DB_PASSWORD', ''),
          db_name=os.environ.get('DB_NAME', 'ws_data'),
          db_port=int(os.environ.get('DB_PORT', '4000')),
      )
      # All database work must happen inside the with-block.
      with Database(db_param) as db:
          # Database.__enter__ returns None when the connection fails.
          if db is not None:
              df_ = db.query_sql_time_series2data_frame(
                  project_id=92,
                  sheet_name='dc_item_history_data_day',
                  data_code='QSWGB3_n',
                  start_year=2025, end_year=2025,
                  start_month=3, end_month=9,
                  start_day=25, end_day=10,
              )
              print(df_)