| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- import csv
- import sys
- sys.path.append("..")
- import os
- import pandas as pd
- import config
- from Database.database_ import Database, DatabaseParam
- import json
- from scipy import stats
- import numpy as np
- import pickle
- from utils.tools import cal_vari_without_zero_nan, cal_vari_without_nan, df_is_symetry, quick_sort, load_transfer_file_name_code
class DFMat:
    """Field-level data matrix.

    Pulls every qualifying field from the database and merges them into a
    single pandas.DataFrame (the core attribute ``df_merge``), then cleans and
    pre-processes the raw records: outlier removal, NaN filling and optional
    differencing of cumulative counters.
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: "DatabaseParam",
                 transfer_file_dir: str, is_from_local: bool = True):
        """
        :param keys_file_dir: CSV listing candidate fields and their record counts
        :param min_records: fields with fewer records than this are dropped
        :param db_param: database connection parameters
        :param transfer_file_dir: file mapping field names <-> field codes
        :param is_from_local: if True, try the on-disk DataFrame cache first
        """
        self.bad_keys = config.EXCLUDE_WORDS  # substrings that disqualify a field name
        self.keys_file_dir = keys_file_dir
        self.min_records = min_records
        self.keys = self.load_keys()  # field names, ascending order
        self.db_param = db_param
        self.transfer_file_dir = transfer_file_dir
        self.name_2code_dict, self.code_2name_dict = self.load_transfer_file()  # name<->code maps
        self.diff_words = config.DIFF_WORDS  # fields whose name contains these words get differenced
        self.is_from_local = is_from_local
        # Local cache of the merged DataFrame to avoid repeated DB queries.
        self.local_df_merge_path = config.DF_MERGE_FILE_PATH
        self.df_merge = self.__construct()  # data matrix is built at init time

    def load_keys(self):
        """Read the statistics CSV and return qualifying field names, sorted ascending."""
        keys_list = []
        with open(self.keys_file_dir, "r", encoding="utf-8") as f:
            csv_reader = csv.reader(f)
            # Skip the header row; None means the file is empty.
            if next(csv_reader, None) is None:
                print('文件不存在:', self.keys_file_dir)
            for row in csv_reader:
                records_num = int(row[6])  # column 6: record count
                records_name = row[0]      # column 0: field name
                if records_num < self.min_records:
                    continue
                keys_list.append(records_name)
        # Ascending sort, then drop blacklisted fields.
        keys_list = sorted(keys_list)
        return self.exclude_keys(keys_list)

    def exclude_keys(self, keys_list: list):
        """Return keys_list with every name containing a blacklisted substring removed."""
        return [name for name in keys_list
                if not any(bad_key in name for bad_key in self.bad_keys)]

    def load_transfer_file(self):
        """Load the name<->code translation file; returns (name_2_code, code_2_name)."""
        return load_transfer_file_name_code(self.transfer_file_dir)

    def save_df_merge(self, data: pd.DataFrame):
        """Pickle the merged DataFrame to the local cache path."""
        with open(self.local_df_merge_path, 'wb') as f:
            pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
        print(f'mat_shape:{data.shape},文件保存至:', self.local_df_merge_path)

    def load_from_local(self) -> pd.DataFrame:
        """Unpickle the merged DataFrame from the local cache path."""
        with open(self.local_df_merge_path, 'rb') as f:
            return pickle.load(f)

    def normalize(self, data: pd.DataFrame) -> pd.DataFrame:
        """Placeholder — Pearson correlation is scale-invariant, so no-op for now."""
        pass

    @staticmethod
    def diff_tool(data: pd.Series):
        """First-difference a monotonically increasing cumulative series.

        Zeros and +/-inf are treated as missing, negative diffs (counter
        resets) are dropped, and remaining gaps are forward-filled.
        """
        data = data.copy()
        # Treat 0 and +/-inf as missing values.
        data.replace([np.inf, -np.inf, 0], np.nan, inplace=True)
        data = data.diff()
        # A negative diff means the counter reset; treat it as missing.
        data[data < 0] = np.nan
        # Seed the first element (diff() always leaves NaN there) with the mean.
        # BUGFIX: use positional indexing — label-based `data[0]` mis-targets
        # (or silently appends) when the index is not the default RangeIndex.
        data.iloc[0] = data.mean()
        # Forward-fill the remaining gaps.
        data.ffill(inplace=True)
        return data

    def stabilize(self, data: pd.DataFrame) -> pd.DataFrame:
        """Difference the cumulative-type columns in place to make them stationary."""
        if len(self.diff_words) == 0:
            return data
        col_label_list = data.columns.tolist()
        # The timestamp column is never differenced.
        if 'time' in col_label_list:
            col_label_list.remove('time')
        # Collect columns whose field name contains a differencing keyword.
        diff_label_list = set()
        for col in col_label_list:
            name = self.code_2name_dict.get(col)
            if name is None:
                continue  # unknown code: skip instead of raising KeyError
            if any(dword in name for dword in self.diff_words):
                diff_label_list.add(col)
        for col in diff_label_list:
            data.loc[:, col] = self.diff_tool(data.loc[:, col])
        return data

    @staticmethod
    def remove_outliers(data: pd.Series, fill_value=0, times: int = 1) -> pd.Series:
        """Replace 3-sigma outliers with the series mean, repeated ``times`` passes.

        NOTE: the ``fill_value`` argument is superseded — each pass fills with
        the freshly computed mean (kept for interface compatibility).
        Zeros and NaNs never participate.
        """
        data = data.copy(deep=True)
        for _ in range(abs(times)):
            mean, std_dev = cal_vari_without_nan(data)
            fill_value = mean  # always fill with the current mean
            threshold = 3 * std_dev
            limit_top = mean + threshold
            limit_low = mean - threshold
            # Zeros and NaNs untouched; everything beyond 3 sigma is an outlier.
            mask = data.notna() & (data != 0) & ((data < limit_low) | (data > limit_top))
            data.loc[mask] = fill_value
        return data

    def clean(self, data: pd.DataFrame) -> pd.DataFrame:
        """Replace outliers and NaNs with column means; zeros are left alone."""
        col_label_list = data.columns.tolist()
        if 'time' in col_label_list:
            col_label_list.remove('time')  # the time column is not cleaned
        # Remove outliers column by column.
        for col_label in col_label_list:
            data.loc[:, col_label] = self.remove_outliers(data.loc[:, col_label], times=1)
        # Fill remaining NaNs with each column's mean (mean() skips NaN);
        # all-NaN columns fall back to 0.
        cols_mean = data[col_label_list].mean().fillna(0)
        data[col_label_list] = data[col_label_list].fillna(cols_mean)
        return data

    def fetch(self) -> pd.DataFrame:
        """Fetch the raw merged data from the database (no cleaning here)."""
        data_names = self.keys
        data_codes = [self.name_2code_dict.get(name) for name in data_names]
        with Database(self.db_param) as db:  # connection is managed by the context
            # Fail fast when the table is missing.
            if not db.sheet_exists(config.DB_SHEET_NAME):
                raise RuntimeError(f'表{config.DB_SHEET_NAME}不存在于数据库{config.DB_NAME}中!')
            # Time-window SQL query, bounds taken from config.
            group_df = db.query_sql_time_series_group2data_frame(
                code_name_dict=self.code_2name_dict,
                project_id=config.PROJECT_ID,
                sheet_name=config.DB_SHEET_NAME,
                data_codes=data_codes,
                start_year=config.CHECK_YEAR_START,
                end_year=config.CHECK_YEAR_END,
                start_month=config.CHECK_MONTH_START,
                end_month=config.CHECK_MONTH_END,
                start_day=config.CHECK_DAY_START,
                end_day=config.CHECK_DAY_END,
                start_hour=config.CHECK_HOUR_START,
                end_hour=config.CHECK_HOUR_END,
                start_minute=config.CHECK_MINUTE_START,
                end_minute=config.CHECK_MINUTE_END,
                start_second=config.CHECK_SECONDS_START,
                end_second=config.CHECK_SECONDS_END)
        return group_df

    def __construct(self):
        """Build the merged DataFrame: local cache if allowed, else DB + clean + stabilize."""
        if self.is_from_local:
            if os.path.exists(self.local_df_merge_path):
                print(f'从本地{self.local_df_merge_path}加载数据库数据')
                return self.load_from_local()
            else:
                print(f'从本地{self.local_df_merge_path}加载失败,文件不存在!')
        print("尝试从数据库获取数据!")
        group_df = self.fetch()
        # Clean first (outliers/NaN), then difference — clean guarantees no NaN.
        group_df = self.clean(group_df)
        group_df = self.stabilize(group_df)
        # Cache the result if no cache file exists yet.
        if not os.path.exists(self.local_df_merge_path):
            self.save_df_merge(group_df)
        return group_df

    def get_df_merge(self):
        """Return the merged DataFrame built at init time."""
        return self.df_merge
class PearsonrMat(DFMat):
    """Pearson correlation matrix over the merged field DataFrame.

    The core matrix (``r_mat``) is a symmetric pandas.DataFrame whose rows and
    columns are the Chinese field names, in ascending order.
    """

    def __init__(self, keys_file_dir: str, min_records: int, db_param: "DatabaseParam",
                 transfer_file_dir: str, is_from_local: bool = True):
        super().__init__(keys_file_dir=keys_file_dir, min_records=min_records,
                         db_param=db_param, transfer_file_dir=transfer_file_dir,
                         is_from_local=is_from_local)
        self.r_mat = None    # Pearson coefficient matrix (pd.DataFrame)
        self.lag_mat = None  # reserved for per-pair lag values

    def r_mat_filter(self):
        """Drop fields that correlate (above threshold) with nothing but themselves."""
        filter_label_list = []
        for label in self.r_mat.columns.tolist():
            # Count entries above threshold; the diagonal contributes one, so
            # fewer than 2 means "only correlated with itself".
            # NOTE(review): negative correlations are ignored here because the
            # comparison is not on abs(value) — confirm this is intended.
            non_zero_counter = sum(
                1 for value in self.r_mat.loc[:, label]
                if value > config.PEARSONR_VALUE_THRESHOLD)
            if non_zero_counter < 2:
                filter_label_list.append(label)
        self.r_mat.drop(filter_label_list, axis=0, inplace=True)
        self.r_mat.drop(filter_label_list, axis=1, inplace=True)

    def pearsonr_with_lag(self, a_series_data_label: str, b_series_data_label: str):
        """Max significant Pearson r over lags in [-MAX_LAG, MAX_LAG) with step STEP.

        Returns 0 when no lag yields a significant (p <= threshold) coefficient.
        :raises ValueError: when config.MAX_LAG is negative.
        """
        lags = config.MAX_LAG
        if lags == 0:
            left_point, right_point = 0, 1  # single iteration at lag 0
        elif lags > 0:
            left_point, right_point = -lags, lags
        else:
            raise ValueError('最大滞后不能为负数', lags)
        step = config.STEP
        series_a = self.df_merge.loc[:, a_series_data_label]
        series_b = self.df_merge.loc[:, b_series_data_label]
        list_r_lag = []
        for lag in range(left_point, right_point, step):
            if lag < 0:    # a lags behind b
                series_a_shifted = series_a[-lag:]
                series_b_shifted = series_b[:lag]
            elif lag > 0:  # b lags behind a
                series_a_shifted = series_a[:-lag]
                series_b_shifted = series_b[lag:]
            else:          # zero lag: full series
                series_a_shifted = series_a
                series_b_shifted = series_b
            r, p_value = stats.pearsonr(series_a_shifted, series_b_shifted)
            # Keep only statistically significant coefficients.
            if p_value <= config.P_VALUE_THRESHOLD:
                list_r_lag.append(np.float32(r))
        return max(list_r_lag) if list_r_lag else 0

    def pearsonr_(self, a_series_data_label: str, b_series_data_label: str) -> float:
        """Zero-lag Pearson r; returns 0 when the correlation is not significant."""
        r, p_value = stats.pearsonr(self.df_merge.loc[:, a_series_data_label],
                                    self.df_merge.loc[:, b_series_data_label])
        return np.float32(r) if p_value <= config.P_VALUE_THRESHOLD else np.float32(0)

    def skip_tool(self, series_a_name: str, series_b_name: str) -> bool:
        """Return True for field pairs whose correlation is uninteresting by rule.

        Arguments are field codes; they are translated to Chinese names first.
        """
        series_a_name = self.code_2name_dict.get(series_a_name)
        series_b_name = self.code_2name_dict.get(series_b_name)
        # (substring in a, substring in b) combinations that are skipped.
        skip_pairs = (
            ('温度', '温度'), ('次数', '次数'), ('累计', '累计'),
            ('电流', '电流'), ('电压', '电压'),
            ('电流', '温度'), ('温度', '电流'),
            ('累计电量', '累计电量'),
            ('运行时间', '累计电量'), ('累计电量', '运行时间'),
            ('运行时间', '运行时间'), ('时间设定', '时间设定'),
        )
        return any(a in series_a_name and b in series_b_name for a, b in skip_pairs)

    def calculate_pearsonr_mat(self):
        """Compute (or load from cache) the symmetric Pearson coefficient matrix."""
        # Reuse a cached matrix when one exists.
        if os.path.exists(config.R_MAT_JSON_PATH):
            print(f"皮尔逊系数矩阵从本地读取, {config.R_MAT_JSON_PATH}")
            with open(config.R_MAT_JSON_PATH, 'rb') as f:
                self.r_mat = pickle.load(f)
            return
        # Compute on field codes, but label the matrix with Chinese names.
        all_labels_code = [k for k in self.df_merge.columns.tolist() if k != 'time']
        all_labels_name = sorted([self.code_2name_dict.get(l) for l in all_labels_code])
        self.r_mat = pd.DataFrame(index=all_labels_name, columns=all_labels_name,
                                  dtype=np.float32)
        self.r_mat.fillna(0, inplace=True)  # start from an all-zero matrix
        for a_label_idx in range(len(all_labels_code)):  # row index
            for b_label_idx in range(a_label_idx, len(all_labels_code)):  # upper triangle
                a_label = all_labels_code[a_label_idx]
                b_label = all_labels_code[b_label_idx]
                a_name = self.code_2name_dict.get(a_label)
                b_name = self.code_2name_dict.get(b_label)
                if a_label == b_label:
                    # Diagonal: self-correlation is exactly 1 (the original
                    # computed pearsonr(x, x), which also yields 1).
                    self.r_mat.loc[a_name, b_name] = np.float32(1)
                    continue
                if self.skip_tool(a_label, b_label):
                    print(f'跳过组合:{a_label},{b_label}')
                    # BUGFIX: actually skip the computation — the original fell
                    # through and computed the coefficient anyway, overwriting
                    # the zero it had just written.
                    self.r_mat.loc[a_name, b_name] = np.float32(0)
                    self.r_mat.loc[b_name, a_name] = np.float32(0)
                    continue
                if config.IS_LAG:
                    result = self.pearsonr_with_lag(a_label, b_label)
                else:
                    result = self.pearsonr_(a_label, b_label)
                # Write both entries to keep the matrix symmetric.
                self.r_mat.loc[a_name, b_name] = result
                self.r_mat.loc[b_name, a_name] = result
        # Drop fields correlated only with themselves, then persist.
        self.r_mat_filter()
        self.save_pearsonr_mat()

    def save_pearsonr_mat(self):
        """Pickle the coefficient matrix, overwriting any previous file."""
        path = config.R_MAT_JSON_PATH
        if os.path.exists(path):
            os.remove(path)
        with open(path, 'wb') as f:
            pickle.dump(self.r_mat, f, protocol=pickle.HIGHEST_PROTOCOL)
        print(f'mat_shape:{self.r_mat.shape},文件保存至:', path)

    def query_r_rank_n(self, target: str, n: int = -1) -> list[str]:
        """Return the names of the top-n fields most correlated with ``target``.

        ``n == -1`` means "all fields with a non-zero coefficient".
        :raises Exception/RuntimeError/ValueError: missing matrix, asymmetric
            matrix, unknown target, or non-positive n.
        """
        if self.r_mat is None:
            raise Exception('r_mat 为None,请先计算皮尔逊系数矩阵!')
        # The matrix must be symmetric, so one column suffices.
        if not df_is_symetry(self.r_mat):
            raise RuntimeError('皮尔逊矩阵非对称,请检查计算过程!')
        label_list = self.r_mat.index.tolist()
        if target not in label_list:
            raise ValueError(f'查询字段不存在', target)
        if n == -1:
            # All entries with a non-zero coefficient against the target.
            n = int(np.sum(np.abs(self.r_mat.loc[:, target].to_numpy()) > 0))
        elif n <= 0:
            raise RuntimeError('n输入值非法,应大于0', n)
        elements = [(row_label, self.r_mat.loc[row_label, target])
                    for row_label in label_list]
        # Project helper: sorts ascending (by |r| per the original comment);
        # reverse afterwards for descending order.
        quick_sort(elements, 0, len(elements) - 1)
        elements = elements[::-1]
        return [elements[e][0] for e in range(n)]
if __name__ == '__main__':
    # Assemble database connection parameters from the config module.
    database_parameters = DatabaseParam(
        db_host=config.DB_HOST,
        db_user=config.DB_USER,
        db_password=config.DB_PASSWORD,
        db_name=config.DB_NAME,
        db_port=config.DB_PORT)
    # Resolve the input file locations once, up front.
    statistics_path = os.path.join(config.STATISTICS_FILE_DIR, config.STATISTICS_FILE_NAME)
    transfer_path = os.path.join(config.ALL_ITEMS_FILE_DIR, config.TRANSFER_JSON_NAME)
    # Fetch and merge all field data into the correlation object.
    df_mat = PearsonrMat(keys_file_dir=statistics_path,
                         min_records=config.MIN_RECORDS,
                         db_param=database_parameters,
                         transfer_file_dir=transfer_path)
    # Pearson coefficients with significance filtering (optionally lagged).
    df_mat.calculate_pearsonr_mat()
    # Example query:
    # df_mat.query_r_rank_n('反渗透总产水电导')
|