import csv
import sys
sys.path.append("..")
import os
import pandas as pd
import config
from Database.database_ import Database, DatabaseParam
import json
from scipy import stats
import numpy as np
import pickle
from utils.tools import cal_vari_without_zero_nan, cal_vari_without_nan, df_is_symetry, quick_sort, load_transfer_file_name_code


class DFMat:
    """Fetch all qualifying fields from the database and merge them into one
    pandas.DataFrame.

    The core attribute is ``df_merge``, a DataFrame combining every field's
    data. After fetching, the data is cleaned (outlier/NaN handling) and
    preprocessed (differencing of cumulative fields for stationarity).
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: DatabaseParam,
                 transfer_file_dir: str, is_from_local: bool = True):
        self.bad_keys = config.EXCLUDE_WORDS            # substrings of field names to exclude
        self.keys_file_dir = keys_file_dir
        self.min_records = min_records                  # minimum record count for a field to be kept
        self.keys = self.load_keys()                    # field names, ascending order
        self.db_param = db_param
        self.transfer_file_dir = transfer_file_dir
        # name <-> code translation dictionaries
        self.name_2code_dict, self.code_2name_dict = self.load_transfer_file()
        # fields whose name contains any of these words are differenced for stationarity
        self.diff_words = config.DIFF_WORDS
        self.is_from_local = is_from_local              # cache DB data locally to avoid re-querying
        self.local_df_merge_path = config.DF_MERGE_FILE_PATH
        self.df_merge = self.__construct()              # merged data, built at init time

    def load_keys(self):
        """Read the statistics CSV and return, in ascending order, the field
        names whose record count (column 6) is at least ``min_records``,
        with excluded words filtered out."""
        keys_list = []
        with open(self.keys_file_dir, "r", encoding="utf-8") as f:
            csv_reader = csv.reader(f)
            try:
                next(csv_reader)  # skip the header row
            except StopIteration:
                # NOTE(review): reached when the file is *empty*, not missing —
                # the message text is kept as-is for compatibility
                print('文件不存在:', self.keys_file_dir)
            for row in csv_reader:
                records_num = int(row[6])   # column 6: record count
                records_name = row[0]       # column 0: field name
                if records_num < self.min_records:
                    continue
                keys_list.append(records_name)
        keys_list = sorted(keys_list)            # ascending order
        keys_list = self.exclude_keys(keys_list) # drop unwanted fields
        return keys_list

    def exclude_keys(self, keys_list: list):
        """Drop every name that contains any of the configured exclude words."""
        return [name for name in keys_list
                if not any(bad_key in name for bad_key in self.bad_keys)]

    def load_transfer_file(self):
        """Load the name<->code transfer file.

        Returns a tuple ``(name_2_code, code_2_name)`` as produced by the
        ``load_transfer_file_name_code`` helper.
        """
        return load_transfer_file_name_code(self.transfer_file_dir)

    def save_df_merge(self, data: pd.DataFrame):
        """Pickle the merged DataFrame to the local cache path."""
        with open(self.local_df_merge_path, 'wb') as f:
            pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
        print(f'mat_shape:{data.shape},文件保存至:', self.local_df_merge_path)

    def load_from_local(self) -> pd.DataFrame:
        """Load the cached merged DataFrame from disk."""
        with open(self.local_df_merge_path, 'rb') as f:
            return pickle.load(f)

    def normalize(self, data: pd.DataFrame) -> pd.DataFrame:
        """Placeholder: Pearson correlation is scale-invariant, so no
        normalization is needed."""
        pass

    @staticmethod
    def diff_tool(data: pd.Series):
        """First-difference a monotonically increasing cumulative series.

        Zeros and infinities are treated as missing, negative diffs (counter
        resets) are dropped, the leading NaN produced by ``diff`` is replaced
        by the series mean, and remaining NaN are forward-filled.
        """
        data = data.copy()
        # treat 0 (and inf) as missing
        data.replace([np.inf, -np.inf, 0], np.nan, inplace=True)
        data = data.diff()
        data[data < 0] = np.nan   # a cumulative counter must not decrease
        # fix: positional assignment — ``data[0]`` is a label lookup and fails
        # (or silently appends a new entry) when the index is not 0-based ints
        data.iloc[0] = data.mean()
        data.ffill(inplace=True)  # forward-fill remaining NaN
        return data

    def stabilize(self, data: pd.DataFrame) -> pd.DataFrame:
        """Difference the configured cumulative fields to make them stationary."""
        if len(self.diff_words) == 0:
            return data
        col_label_list = data.columns.tolist()
        if 'time' in col_label_list:
            col_label_list.remove('time')  # never difference the timestamp column
        # collect the codes whose human-readable name contains a diff word
        diff_label_set = set()
        for col in col_label_list:
            name = self.code_2name_dict[col]
            for dword in self.diff_words:
                if dword in name:
                    diff_label_set.add(col)
        for col in diff_label_set:
            data.loc[:, col] = self.diff_tool(data.loc[:, col])
        return data

    @staticmethod
    def remove_outliers(data: pd.Series, fill_value=0, times: int = 1) -> pd.Series:
        """Replace 3-sigma outliers of the series with its mean.

        NOTE: the ``fill_value`` parameter is kept for interface compatibility
        but is always overwritten with the mean of the current pass. Zeros and
        NaN never count as outliers.
        """
        data = data.copy(deep=True)
        for _ in range(abs(times)):  # run the pass ``times`` times
            mean, std_dev = cal_vari_without_nan(data)
            fill_value = mean
            threshold = 3 * std_dev
            limit_top = mean + threshold
            limit_low = mean - threshold
            # outliers: non-NaN, non-zero values outside [low, top]
            mask = data.notna() & (data != 0) & ((data < limit_low) | (data > limit_top))
            data.loc[mask] = fill_value
        return data

    def clean(self, data: pd.DataFrame) -> pd.DataFrame:
        """Replace outliers and NaN with per-column means; zeros and the
        'time' column are left untouched."""
        col_label_list = data.columns.tolist()
        if 'time' in col_label_list:
            col_label_list.remove('time')  # do not process the time column
        # per-column outlier removal
        for col_label in col_label_list:
            data.loc[:, col_label] = self.remove_outliers(data.loc[:, col_label], times=1)
        # fill remaining NaN with the column mean (all-NaN columns fall back to 0)
        cols_mean = data[col_label_list].mean().fillna(0)
        data[col_label_list] = data[col_label_list].fillna(cols_mean)
        return data

    def fetch(self) -> pd.DataFrame:
        """Query the raw time-series data from the database.

        Returns the raw grouped DataFrame; cleaning is deliberately done
        elsewhere. Raises RuntimeError when the configured sheet is missing.
        """
        data_codes = [self.name_2code_dict.get(name) for name in self.keys]
        with Database(self.db_param) as db:  # DB connection handled internally
            if not db.sheet_exists(config.DB_SHEET_NAME):
                raise RuntimeError(f'表{config.DB_SHEET_NAME}不存在于数据库{config.DB_NAME}中!')
            group_df = db.query_sql_time_series_group2data_frame(
                code_name_dict=self.code_2name_dict,
                project_id=config.PROJECT_ID,
                sheet_name=config.DB_SHEET_NAME,
                data_codes=data_codes,
                start_year=config.CHECK_YEAR_START, end_year=config.CHECK_YEAR_END,
                start_month=config.CHECK_MONTH_START, end_month=config.CHECK_MONTH_END,
                start_day=config.CHECK_DAY_START, end_day=config.CHECK_DAY_END,
                start_hour=config.CHECK_HOUR_START, end_hour=config.CHECK_HOUR_END,
                start_minute=config.CHECK_MINUTE_START, end_minute=config.CHECK_MINUTE_END,
                start_second=config.CHECK_SECONDS_START, end_second=config.CHECK_SECONDS_END)
        return group_df

    def __construct(self):
        """Build the merged DataFrame: try the local cache first, otherwise
        fetch from the database, clean, stabilize, and cache the result."""
        if self.is_from_local:
            if os.path.exists(self.local_df_merge_path):
                print(f'从本地{self.local_df_merge_path}加载数据库数据')
                return self.load_from_local()
            print(f'从本地{self.local_df_merge_path}加载失败,文件不存在!')
        print("尝试从数据库获取数据!")
        group_df = self.fetch()
        group_df = self.clean(group_df)      # remove NaN and outliers
        group_df = self.stabilize(group_df)  # difference cumulative fields; no NaN afterwards
        if not os.path.exists(self.local_df_merge_path):
            self.save_df_merge(group_df)     # cache for next run
        return group_df

    def get_df_merge(self):
        """Return the merged DataFrame built at init time."""
        return self.df_merge


class PearsonrMat(DFMat):
    """Pearson correlation matrix over all fields.

    The core attribute ``r_mat`` is a DataFrame whose rows and columns are the
    human-readable field names, written in the same (sorted) key order.
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: DatabaseParam,
                 transfer_file_dir: str, is_from_local: bool = True):
        super().__init__(keys_file_dir=keys_file_dir,
                         min_records=min_records,
                         db_param=db_param,
                         transfer_file_dir=transfer_file_dir,
                         is_from_local=is_from_local)
        self.r_mat = None   # Pearson correlation matrix (names x names)
        self.lag_mat = None

    def r_mat_filter(self):
        """Drop fields that correlate above the threshold only with themselves.

        A column needs at least 2 entries above the threshold (itself plus at
        least one other field) to survive.
        NOTE(review): the comparison is on signed r, not |r| — strong negative
        correlations do not count; confirm this is intended.
        """
        filter_label_list = []
        for label in self.r_mat.columns.tolist():
            r_col = self.r_mat.loc[:, label]
            above = sum(1 for value in r_col if value > config.PEARSONR_VALUE_THRESHOLD)
            if above < 2:
                filter_label_list.append(label)
        self.r_mat.drop(filter_label_list, axis=0, inplace=True)
        self.r_mat.drop(filter_label_list, axis=1, inplace=True)

    def pearsonr_with_lag(self, a_series_data_label: str, b_series_data_label: str):
        """Pearson r maximized over lags in [-MAX_LAG, MAX_LAG] (step STEP).

        Only lags whose p-value is significant contribute; returns 0 when no
        lag is significant.
        """
        lags = config.MAX_LAG
        if lags == 0:
            left_point, right_point = 0, 1
        elif lags > 0:
            # fix: include +lags itself — range's upper bound is exclusive, so
            # the previous window [-lags, lags) was asymmetric; the lags == 0
            # branch (right_point = 1) shows the endpoint was meant inclusive
            left_point, right_point = -lags, lags + 1
        else:
            raise ValueError('最大滞后不能为负数', lags)
        step = config.STEP
        series_a = self.df_merge.loc[:, a_series_data_label]
        series_b = self.df_merge.loc[:, b_series_data_label]
        list_r_lag = []  # significant correlation values per lag
        for lag in range(left_point, right_point, step):
            if lag < 0:    # a lags behind b
                series_a_shifted = series_a[-lag:]
                series_b_shifted = series_b[:lag]
            elif lag > 0:  # b lags behind a
                series_a_shifted = series_a[:-lag]
                series_b_shifted = series_b[lag:]
            else:          # zero lag
                series_a_shifted = series_a
                series_b_shifted = series_b
            r, p_value = stats.pearsonr(series_a_shifted, series_b_shifted)
            if p_value <= config.P_VALUE_THRESHOLD:  # drop insignificant results
                list_r_lag.append(np.float32(r))
        return max(list_r_lag) if list_r_lag else 0

    def pearsonr_(self, a_series_data_label: str, b_series_data_label: str) -> float:
        """Zero-lag Pearson r; returns 0 when the correlation is not significant."""
        a_series_data = self.df_merge.loc[:, a_series_data_label]
        b_series_data = self.df_merge.loc[:, b_series_data_label]
        r, p_value = stats.pearsonr(a_series_data, b_series_data)
        return np.float32(r) if p_value <= config.P_VALUE_THRESHOLD else np.float32(0)

    def skip_tool(self, series_a_name: str, series_b_name: str) -> bool:
        """Return True when the two field codes form a combination whose
        correlation should not be computed (e.g. two temperatures, two
        counters); such pairs are forced to r = 0."""
        a = self.code_2name_dict.get(series_a_name)
        b = self.code_2name_dict.get(series_b_name)
        skip_pairs = (
            ('温度', '温度'), ('次数', '次数'), ('累计', '累计'),
            ('电流', '电流'), ('电压', '电压'), ('电流', '温度'),
            ('温度', '电流'), ('累计电量', '累计电量'),
            ('运行时间', '累计电量'), ('累计电量', '运行时间'),
            ('运行时间', '运行时间'), ('时间设定', '时间设定'),
        )
        return any(word_a in a and word_b in b for word_a, word_b in skip_pairs)

    def calculate_pearsonr_mat(self):
        """Compute the symmetric Pearson matrix over all fields (optionally
        with lag), filter isolated fields, and persist the result.

        Loads from disk instead when a previously saved matrix exists.
        """
        # load from local cache if possible — skip recomputation
        if os.path.exists(config.R_MAT_JSON_PATH):
            print(f"皮尔逊系数矩阵从本地读取, {config.R_MAT_JSON_PATH}")
            with open(config.R_MAT_JSON_PATH, 'rb') as f:
                self.r_mat = pickle.load(f)
            return
        # compute on codes, label the matrix with human-readable names
        all_labels_code = [k for k in self.df_merge.columns.tolist() if k != 'time']
        all_labels_name = sorted([self.code_2name_dict.get(l) for l in all_labels_code])
        self.r_mat = pd.DataFrame(index=all_labels_name, columns=all_labels_name,
                                  dtype=np.float32)
        self.r_mat.fillna(0, inplace=True)  # start from all zeros
        for a_label_idx in range(len(all_labels_code)):               # row label
            for b_label_idx in range(a_label_idx, len(all_labels_code)):  # column label
                a_label = all_labels_code[a_label_idx]
                b_label = all_labels_code[b_label_idx]
                a_name = self.code_2name_dict.get(a_label)
                b_name = self.code_2name_dict.get(b_label)
                if self.skip_tool(a_label, b_label):
                    print(f'跳过组合:{a_label},{b_label}')
                    self.r_mat.loc[a_name, b_name] = np.float32(0)
                    # fix: without this ``continue`` the skipped pair was
                    # computed and overwritten anyway, making the skip a no-op
                    continue
                if config.IS_LAG:
                    result = self.pearsonr_with_lag(a_label, b_label)
                else:
                    result = self.pearsonr_(a_label, b_label)
                # write both triangles to keep the matrix symmetric
                self.r_mat.loc[a_name, b_name] = result
                self.r_mat.loc[b_name, a_name] = result
        self.r_mat_filter()       # drop fields correlated only with themselves
        self.save_pearsonr_mat()  # persist the result

    def save_pearsonr_mat(self):
        """Pickle the correlation matrix, overwriting any previous file."""
        path = config.R_MAT_JSON_PATH
        if os.path.exists(path):
            os.remove(path)
        with open(path, 'wb') as f:
            pickle.dump(self.r_mat, f, protocol=pickle.HIGHEST_PROTOCOL)
        print(f'mat_shape:{self.r_mat.shape},文件保存至:', path)

    def query_r_rank_n(self, target: str, n: int = -1) -> list[str]:
        """Return the names of the top-``n`` fields most correlated with
        ``target``; ``n == -1`` selects every field with nonzero correlation.

        Raises if the matrix is missing, asymmetric, the target is unknown,
        or ``n`` is non-positive (other than -1).
        """
        if self.r_mat is None:
            raise Exception('r_mat 为None,请先计算皮尔逊系数矩阵!')
        # the matrix is symmetric, so a single column suffices
        if not df_is_symetry(self.r_mat):
            raise RuntimeError('皮尔逊矩阵非对称,请检查计算过程!')
        label_list = self.r_mat.index.tolist()
        if target not in label_list:
            raise ValueError(f'查询字段不存在', target)
        if n == -1:
            n = int(np.sum(np.abs(self.r_mat.loc[:, target].to_numpy()) > 0))
        elif n <= 0:
            raise RuntimeError('n输入值非法,应大于0', n)
        # fix: clamp n so an oversized request cannot raise IndexError below
        n = min(n, len(label_list))
        elements = [(row_label, self.r_mat.loc[row_label, target])
                    for row_label in label_list]
        # ascending by |r| (see quick_sort helper), then reverse to descending
        quick_sort(elements, 0, len(elements) - 1)
        elements = elements[::-1]
        return [elements[e][0] for e in range(n)]


if __name__ == '__main__':
    # database connection parameters
    db_param = DatabaseParam(
        db_host=config.DB_HOST,
        db_user=config.DB_USER,
        db_password=config.DB_PASSWORD,
        db_name=config.DB_NAME,
        db_port=config.DB_PORT)
    # fetch all data, then compute the (optionally lagged) Pearson matrix
    df_mat = PearsonrMat(
        keys_file_dir=os.path.join(config.STATISTICS_FILE_DIR, config.STATISTICS_FILE_NAME),
        min_records=config.MIN_RECORDS,
        db_param=db_param,
        transfer_file_dir=os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME))
    df_mat.calculate_pearsonr_mat()
    # example query:
    # df_mat.query_r_rank_n('反渗透总产水电导')