# pearsonr.py
import csv
import json
import os
import pickle
import sys

# Must run before the project-local imports below (presumably so that
# `config`, `Database` and `utils` in the parent directory resolve).
sys.path.append("..")

import numpy as np
import pandas as pd
from scipy import stats

import config
from Database.database_ import Database, DatabaseParam
from utils.tools import (
    cal_vari_without_zero_nan,
    cal_vari_without_nan,
    df_is_symetry,
    quick_sort,
    load_transfer_file_name_code,
)
  13. class DFMat:
  14. """输入字段,实现从数据库中获取全部数据,核心的属性是pandas.Dataframe,融合了所有字段的数据,从数据库拿完数据后需要进行数据的清洗和预处理"""
  15. def __init__(self, keys_file_dir: str, min_records:int, db_param: DatabaseParam, transfer_file_dir:str, is_from_local:bool=True):
  16. self.bad_keys = config.EXCLUDE_WORDS
  17. self.keys_file_dir = keys_file_dir
  18. self.min_records = min_records
  19. self.keys = self.load_keys() # 升序排序
  20. self.db_param = db_param
  21. self.transfer_file_dir = transfer_file_dir
  22. self.name_2code_dict, self.code_2name_dict = self.load_transfer_file() # 转换字典
  23. self.diff_words = config.DIFF_WORDS # 需要差分计算的字段,如果字段中包括这些字段就进行差分平稳化
  24. self.is_from_local = is_from_local
  25. # 本地保存数据库数据,避免重复查询
  26. self.local_df_merge_path = config.DF_MERGE_FILE_PATH
  27. self.df_merge = self.__construct() # 构建数据部分,初始化时完成
  28. def load_keys(self):
  29. keys_list = []
  30. with open(self.keys_file_dir, "r", encoding="utf-8") as f:
  31. csv_reader = csv.reader(f)
  32. try:
  33. label = next(csv_reader)
  34. except StopIteration:
  35. print('文件不存在:', self.keys_file_dir)
  36. for row in csv_reader:
  37. records_num = int(row[6])
  38. records_name = row[0]
  39. if records_num < self.min_records: continue
  40. keys_list.append(records_name)
  41. # 升序排序
  42. keys_list = sorted(keys_list)
  43. # 剔除列表不需要的字段
  44. keys_list = self.exclude_keys(keys_list)
  45. return keys_list # 升序排列
  46. def exclude_keys(self, keys_list:list):
  47. """根据剔除列表对键入的字段进行剔除"""
  48. new_keys = []
  49. for name in keys_list:
  50. flag = False
  51. for bad_key in self.bad_keys:
  52. if bad_key in name:
  53. flag = True
  54. break
  55. if flag: continue
  56. new_keys.append(name)
  57. return new_keys
  58. def load_transfer_file(self):
  59. """加载转换文件"""
  60. path = self.transfer_file_dir
  61. return load_transfer_file_name_code(path)
  62. # if not os.path.exists(self.transfer_file_dir):
  63. # raise FileNotFoundError('文件未发现:', self.transfer_file_dir)
  64. # with open(self.transfer_file_dir, "r", encoding="utf-8") as f:
  65. # json_data = json.load(f)
  66. # return json_data.get('name_2_code'), json_data.get('code_2_name')
  67. def save_df_merge(self, data:pd.DataFrame):
  68. """保存文件到本地"""
  69. with open(self.local_df_merge_path, 'wb') as f:
  70. pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
  71. print(f'mat_shape:{data.shape},文件保存至:', self.local_df_merge_path)
  72. def load_from_local(self) -> pd.DataFrame:
  73. """从本地加载数据"""
  74. with open(self.local_df_merge_path, 'rb') as f:
  75. local_data = pickle.load(f)
  76. return local_data
  77. def normalize(self, data:pd.DataFrame)-> pd.DataFrame:
  78. """对数据进行归一化,消除量纲影响"""
  79. # 皮尔逊系数的计算不需要
  80. pass
  81. @staticmethod
  82. def diff_tool(data: pd.Series):
  83. """用于计算累计量的差分,单调递增"""
  84. data = data.copy()
  85. # 0值替换为Nan
  86. data.replace([np.inf, -np.inf, 0], np.nan, inplace=True)
  87. data = data.diff()
  88. # 检查负值,替换为nan
  89. data[data < 0] = np.nan
  90. data[0] = data.mean()
  91. # 将nan向前填充
  92. data.ffill(inplace=True)
  93. return data
  94. def stabilize(self, data:pd.DataFrame)-> pd.DataFrame:
  95. """数据差分使数据平稳化"""
  96. if len(self.diff_words) == 0: return data
  97. # 获取所有列标签, 仅保留需要做差分的字段
  98. col_label_list = data.columns.tolist()
  99. # 剔除时间戳字段
  100. if 'time' in col_label_list:
  101. col_label_list.remove('time')
  102. # 查找需要平稳化的字段
  103. diff_label_list = set()
  104. for col in col_label_list:
  105. name = self.code_2name_dict[col]
  106. for dword in self.diff_words:
  107. if dword in name:
  108. diff_label_list.add(col)
  109. diff_label_list = list(diff_label_list)
  110. for col in diff_label_list:
  111. data.loc[:, col] = self.diff_tool(data.loc[:, col])
  112. return data
  113. @staticmethod
  114. def remove_outliers(data:pd.Series, fill_value=0, times:int=1)-> pd.Series:
  115. """剔除序列的离群点,使用fill_value进行填充"""
  116. data = data.copy(deep=True)
  117. for time in range(abs(times)): # 执行times次
  118. # 计算均值和方差
  119. mean, std_dev = cal_vari_without_nan(data)
  120. fill_value = mean
  121. threshold = 3 * std_dev
  122. limit_top = mean + threshold
  123. limit_low = mean - threshold
  124. # 处理离群点
  125. mask = data.notna() & (data != 0) & ((data < limit_low) | (data > limit_top))
  126. # 离群点填充
  127. data.loc[mask] = fill_value
  128. # for idx, v in enumerate(data):
  129. # if pd.isna(v) or abs(v - 0.) < 1e-6: continue # 0和nan不处理
  130. # if v > limit_top or v < limit_low:
  131. # data[idx] = fill_value # 离群点填充
  132. return data
  133. def clean(self, data:pd.DataFrame)-> pd.DataFrame:
  134. """对数据进行清洗,把离群值和Nan替换为平均值,0不参与该过程"""
  135. # 获取所有列标签
  136. col_label_list = data.columns.tolist()
  137. if 'time' in col_label_list: col_label_list.remove('time') # 不处理time列
  138. # 逐列处理离群点
  139. for col_label in col_label_list:
  140. # 拿到列数据
  141. col_series = data.loc[:, col_label]
  142. data.loc[:, col_label] = self.remove_outliers(col_series, times=1)
  143. # 统一处理nan值,使用平均值填充nan
  144. cols_mean = data[col_label_list].mean() # 自动跳过平均值
  145. cols_mean = cols_mean.fillna(0)
  146. data[col_label_list] = data[col_label_list].fillna(cols_mean)
  147. return data
  148. def fetch(self)->pd.DataFrame:
  149. """从数据库中拿到数据,拿到原始数据,尽量不要在这里面清洗数据"""
  150. # 数据库操作应在内部
  151. data_names = self.keys
  152. data_codes = [self.name_2code_dict.get(name) for name in data_names]
  153. # 从数据库取数据
  154. with Database(self.db_param) as db: # 连接数据库
  155. # 检查表是否存在
  156. if not db.sheet_exists(config.DB_SHEET_NAME):
  157. raise RuntimeError(f'表{config.DB_SHEET_NAME}不存在于数据库{config.DB_NAME}中!')
  158. # SQL查询数据
  159. group_df = db.query_sql_time_series_group2data_frame(
  160. code_name_dict=self.code_2name_dict,
  161. project_id=config.PROJECT_ID,
  162. sheet_name=config.DB_SHEET_NAME,
  163. data_codes=data_codes,
  164. start_year=config.CHECK_YEAR_START,
  165. end_year=config.CHECK_YEAR_END,
  166. start_month=config.CHECK_MONTH_START,
  167. end_month=config.CHECK_MONTH_END,
  168. start_day=config.CHECK_DAY_START,
  169. end_day=config.CHECK_DAY_END,
  170. start_hour=config.CHECK_HOUR_START,
  171. end_hour=config.CHECK_HOUR_END,
  172. start_minute=config.CHECK_MINUTE_START,
  173. end_minute=config.CHECK_MINUTE_END,
  174. start_second=config.CHECK_SECONDS_START,
  175. end_second=config.CHECK_SECONDS_END)
  176. return group_df
  177. def __construct(self):
  178. """构建所有满足条件的字段dataframe"""
  179. # 尝试从本地加载数据
  180. if self.is_from_local:
  181. if os.path.exists(self.local_df_merge_path):
  182. print(f'从本地{self.local_df_merge_path}加载数据库数据')
  183. return self.load_from_local()
  184. else:
  185. print(f'从本地{self.local_df_merge_path}加载失败,文件不存在!')
  186. # 先从数据库获取数据
  187. print("尝试从数据库获取数据!")
  188. group_df = self.fetch()
  189. # 清洗数据,消除Nan和离群值
  190. group_df = self.clean(group_df) # 把自己的引用给自己
  191. # 平稳化
  192. group_df = self.stabilize(group_df) # 此时数据不存在nan
  193. # 如果数据不存在就保存
  194. if not os.path.exists(self.local_df_merge_path):
  195. self.save_df_merge(group_df)
  196. return group_df
  197. def get_df_merge(self):
  198. return self.df_merge
  199. class PearsonrMat(DFMat):
  200. """实现皮尔逊相关系数矩阵,核心属性为pandas.Dataframe,要求键入key,核心的df行和列也是按照给定的keys写入"""
  201. def __init__(self, keys_file_dir: str, min_records:int, db_param: DatabaseParam, transfer_file_dir:str, is_from_local:bool=True):
  202. super().__init__(keys_file_dir=keys_file_dir, min_records=min_records, db_param=db_param, transfer_file_dir=transfer_file_dir, is_from_local=is_from_local)
  203. self.r_mat = None
  204. self.lag_mat = None
  205. def r_mat_filter(self):
  206. """将mat中只和自己相关的字段过滤掉"""
  207. # 先找到需要删除的字段
  208. filter_label_list = []
  209. label_list = self.r_mat.columns.tolist()
  210. for label in label_list:
  211. r_col = self.r_mat.loc[:, label]
  212. non_zero_counter = 0
  213. for value in r_col:
  214. if value > config.PEARSONR_VALUE_THRESHOLD:
  215. non_zero_counter += 1
  216. if non_zero_counter < 2:
  217. filter_label_list.append(label)
  218. self.r_mat.drop(filter_label_list, axis=0, inplace=True)
  219. self.r_mat.drop(filter_label_list, axis=1, inplace=True)
  220. def pearsonr_with_lag(self, a_series_data_label: str, b_series_data_label: str):
  221. """带滞后的皮尔逊计算"""
  222. lags = config.MAX_LAG
  223. if lags == 0:
  224. left_point = 0
  225. right_point = 1
  226. elif lags > 0:
  227. left_point = -lags
  228. right_point = lags
  229. else:
  230. raise ValueError('最大滞后不能为负数', lags)
  231. step = config.STEP
  232. # 不同滞后下的相关系数
  233. list_r_lag = []
  234. for lag in range(left_point, right_point, step):
  235. if lag < 0: # a滞后于b
  236. series_a_shifted = self.df_merge.loc[:, a_series_data_label][-lag:]
  237. series_b_shifted = self.df_merge.loc[:, b_series_data_label][:lag]
  238. elif lag > 0: # b滞后于a
  239. series_a_shifted = self.df_merge.loc[:, a_series_data_label][:-lag]
  240. series_b_shifted = self.df_merge.loc[:, b_series_data_label][lag:]
  241. elif lag == 0: # 0滞后
  242. series_a_shifted = self.df_merge.loc[:, a_series_data_label]
  243. series_b_shifted = self.df_merge.loc[:, b_series_data_label]
  244. else:
  245. series_a_shifted = None
  246. series_b_shifted = None
  247. # 计算皮尔逊系数和显著性
  248. if series_a_shifted is None or series_b_shifted is None:
  249. raise RuntimeError('数据不应为None',series_a_shifted, series_b_shifted)
  250. r, p_value = stats.pearsonr(series_a_shifted, series_b_shifted)
  251. # 过滤不显著的数据
  252. if p_value <= config.P_VALUE_THRESHOLD:
  253. list_r_lag.append(np.float32(r))
  254. if len(list_r_lag) > 0:
  255. return max(list_r_lag)
  256. else:
  257. return 0
  258. def pearsonr_(self, a_series_data_label: str, b_series_data_label: str)->float:
  259. a_series_data = self.df_merge.loc[:, a_series_data_label]
  260. b_series_data = self.df_merge.loc[:, b_series_data_label]
  261. r, p_value = stats.pearsonr(a_series_data, b_series_data)
  262. if p_value <= config.P_VALUE_THRESHOLD: # 结果显著
  263. return np.float32(r)
  264. else:
  265. return np.float32(0)
  266. def skip_tool(self, series_a_name:str, series_b_name:str)->bool:
  267. # 标签转换
  268. series_a_name = self.code_2name_dict.get(series_a_name)
  269. series_b_name = self.code_2name_dict.get(series_b_name)
  270. if '温度' in series_a_name and '温度' in series_b_name: return True
  271. if '次数' in series_a_name and '次数' in series_b_name: return True
  272. if '累计' in series_a_name and '累计' in series_b_name: return True
  273. if '电流' in series_a_name and '电流' in series_b_name: return True
  274. if '电压' in series_a_name and '电压' in series_b_name: return True
  275. if '电流' in series_a_name and '温度' in series_b_name: return True
  276. if '温度' in series_a_name and '电流' in series_b_name: return True
  277. if '累计电量' in series_a_name and '累计电量' in series_b_name: return True
  278. if '运行时间' in series_a_name and '累计电量' in series_b_name: return True
  279. if '累计电量' in series_a_name and '运行时间' in series_b_name: return True
  280. if '运行时间' in series_a_name and '运行时间' in series_b_name: return True
  281. if '时间设定' in series_a_name and '时间设定' in series_b_name: return True
  282. return False
  283. def calculate_pearsonr_mat(self):
  284. """计算pearson系数"""
  285. # 判断是否能够从本地读取,可以的话就不从新计算了
  286. if os.path.exists(config.R_MAT_JSON_PATH):
  287. print(f"皮尔逊系数矩阵从本地读取, {config.R_MAT_JSON_PATH}")
  288. with open(config.R_MAT_JSON_PATH, 'rb') as f:
  289. self.r_mat = pickle.load(f)
  290. return
  291. # 先算再使标签中文化
  292. all_labels_code = [k for k in self.df_merge.columns.tolist() if k != 'time']
  293. all_labels_name = sorted([self.code_2name_dict.get(l) for l in all_labels_code]) # 升序
  294. self.r_mat = pd.DataFrame(index=all_labels_name, columns=all_labels_name, dtype=np.float32)
  295. self.r_mat.fillna(0, inplace=True) # 全部填充为0
  296. for a_label_idx in range(0, len(all_labels_code), 1): # 行标签
  297. for b_label_idx in range(a_label_idx, len(all_labels_code), 1): # 列标签
  298. # 检查是否属于可跳过的字段组合
  299. a_label = all_labels_code[a_label_idx]
  300. b_label = all_labels_code[b_label_idx]
  301. if self.skip_tool(a_label, b_label):
  302. print(f'跳过组合:{a_label},{b_label}')
  303. self.r_mat.loc[self.code_2name_dict.get(a_label), self.code_2name_dict.get(b_label)] = np.float32(0)
  304. # 正式计算
  305. if config.IS_LAG:
  306. result = self.pearsonr_with_lag(a_label, b_label)
  307. else:
  308. result = self.pearsonr_(a_label, b_label)
  309. # 要保证对称性
  310. self.r_mat.loc[self.code_2name_dict.get(a_label), self.code_2name_dict.get(b_label)] = result
  311. self.r_mat.loc[self.code_2name_dict.get(b_label), self.code_2name_dict.get(a_label)] = result
  312. # 筛选一些无关字段
  313. self.r_mat_filter()
  314. # 保存计算结果
  315. self.save_pearsonr_mat()
  316. def save_pearsonr_mat(self):
  317. path = config.R_MAT_JSON_PATH
  318. if os.path.exists(path):
  319. os.remove(path)
  320. with open(path, 'wb') as f:
  321. pickle.dump(self.r_mat, f, protocol=pickle.HIGHEST_PROTOCOL)
  322. print(f'mat_shape:{self.r_mat.shape},文件保存至:',path)
  323. def query_r_rank_n(self, target:str, n:int=-1)->list[str]:
  324. """输入target字段,从皮尔逊系数矩阵中挑选排名前n的字段, n为-1表示取所有"""
  325. if self.r_mat is None:
  326. raise Exception('r_mat 为None,请先计算皮尔逊系数矩阵!')
  327. # 取出对应的列,皮尔逊矩阵为对称矩阵,因此取一列或者一行就可以了
  328. if not df_is_symetry(self.r_mat):
  329. raise RuntimeError('皮尔逊矩阵非对称,请检查计算过程!')
  330. # 准备排序
  331. label_list = self.r_mat.index.tolist()
  332. if target not in label_list:
  333. raise ValueError(f'查询字段不存在',target)
  334. # 检查输入参数是否合法
  335. if n == -1:
  336. n = np.sum(np.abs(self.r_mat.loc[:, target].to_numpy()) > 0 )
  337. n = int(n)
  338. elif n <= 0:
  339. raise RuntimeError('n输入值非法,应大于0',n)
  340. elements = []
  341. for row_label in label_list:
  342. elements.append((row_label, self.r_mat.loc[row_label, target]))
  343. # 按照皮尔逊相关系数的绝对值进行升序排序
  344. quick_sort(elements, 0, len(elements) - 1)
  345. # 反转list,由大到小排序
  346. elements = elements[::-1]
  347. elements = [elements[e][0] for e in range(n)]
  348. return elements
  349. if __name__ == '__main__':
  350. # 数据库参数
  351. db_param = DatabaseParam(
  352. db_host=config.DB_HOST,
  353. db_user=config.DB_USER,
  354. db_password=config.DB_PASSWORD,
  355. db_name=config.DB_NAME,
  356. db_port=config.DB_PORT)
  357. # 先拿到所有的数据
  358. df_mat = PearsonrMat(keys_file_dir=os.path.join(config.STATISTICS_FILE_DIR, config.STATISTICS_FILE_NAME),
  359. min_records=config.MIN_RECORDS, db_param=db_param,
  360. transfer_file_dir=os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME)
  361. )
  362. # 计算皮尔逊系数和显著性p值(带滞后)
  363. df_mat.calculate_pearsonr_mat()
  364. # 测试函数
  365. # df_mat.query_r_rank_n('反渗透总产水电导')