import sys

# The project packages (Database, ...) live one directory up; this must run
# before the project imports below.
sys.path.append("..")

import json
import os
import time

import pandas as pd
from scipy import stats

import config_analysis
from Database.database_ import Database, DatabaseParam
from utils_analysis import label_queue, diff_tool, skip_tool

# Echo the run configuration so the operator can verify (and abort) before work starts.
print(f"""
查询数据库:ws_data
查询表:{config_analysis.DB_SHEET_NAME}
起始日期:{config_analysis.CHECK_YEAR_START}-{config_analysis.CHECK_MONTH_START}-{config_analysis.CHECK_DAY_START}
终止日期:{config_analysis.CHECK_YEAR_END}-{config_analysis.CHECK_MONTH_END}-{config_analysis.CHECK_DAY_END}
项目ID:{config_analysis.PROJECT_ID}
""")
time.sleep(6)  # grace period: gives the operator a chance to Ctrl-C

# Database connection parameters.
# SECURITY NOTE(review): credentials are hard-coded in source; move host/user/
# password to environment variables or a secrets store before distributing.
db_param = DatabaseParam(
    db_host='192.168.50.4',
    db_user='root',
    db_password='*B-@p2b+97D5xAF1e6',
    db_name='ws_data',
    db_port=4000,
)


def _query_series(db, data_code):
    """Fetch one time series for the configured project/sheet/date window.

    Returns the DataFrame produced by the Database helper (first column is
    'time', second is the value), or None when the query yields nothing —
    callers skip the series in that case.
    """
    return db.query_sql_time_series2data_frame(
        project_id=config_analysis.PROJECT_ID,
        sheet_name=config_analysis.DB_SHEET_NAME,
        data_code=data_code,
        start_year=config_analysis.CHECK_YEAR_START,
        end_year=config_analysis.CHECK_YEAR_END,
        start_month=config_analysis.CHECK_MONTH_START,
        end_month=config_analysis.CHECK_MONTH_END,
        start_day=config_analysis.CHECK_DAY_START,
        end_day=config_analysis.CHECK_DAY_END,
    )


def _is_near_constant(df):
    """True when the value column has at most 2 distinct values.

    Such (near-)constant series have variance close to zero, so a covariance /
    Pearson correlation against them is meaningless and they are filtered out.
    """
    return df[df.columns[1]].nunique() <= 2


def _significant_cross_correlations(series_a, series_b, max_lag,
                                    p_threshold, r_threshold,
                                    min_len=24, step=1):
    """Scan lags in [-max_lag, max_lag] and return the significant hits.

    Negative lag means A lags B; positive means B lags A.  For each lag the
    overlapping windows are correlated with Pearson's r; a hit is recorded as
    {'k': lag, 'r': r, 'p': p_value} only when both windows hold at least
    ``min_len`` samples, p <= p_threshold and |r| >= r_threshold.

    Fix: the upper bound is ``max_lag + 1`` — the original ``range(-max_lag,
    max_lag)`` silently skipped the largest positive lag, making the scan
    window asymmetric.
    """
    hits = []
    for lag in range(-max_lag, max_lag + 1, step):
        if lag < 0:
            # A lags B: drop A's head and B's tail by |lag| samples.
            window_a = series_a[-lag:]
            window_b = series_b[:lag]
        elif lag > 0:
            # B lags A: drop A's tail and B's head by lag samples.
            window_a = series_a[:-lag]
            window_b = series_b[lag:]
        else:
            window_a = series_a
            window_b = series_b
        # Too few overlapping samples for a meaningful correlation.
        if len(window_a) < min_len or len(window_b) < min_len:
            print('skip')
            continue
        r, p_value = stats.pearsonr(window_a, window_b)
        if p_value > p_threshold:
            continue
        if abs(r) < r_threshold:
            continue
        hits.append({'k': lag, 'r': r, 'p': p_value})
    return hits


# Names/codes of every non-constant series to analyse.
total_name_list = []
total_code_list = []

# Accumulated results; each entry is
#   {'A': {'name':..., 'code':...},
#    'B': {'name':..., 'code':...},
#    'res': [{'k': lag, 'r': pearson_r, 'p': p_value}, ...]}
result = []

# All DB access happens inside the context manager: Database releases its
# connection and cursor automatically on exit.
with Database(db_param) as db:
    # Build (or load from cache) the list of series worth analysing,
    # excluding constant series up front.
    if os.path.exists(config_analysis.TOTAL_LIST_JSON_FILE):
        with open(config_analysis.TOTAL_LIST_JSON_FILE, "r", encoding="utf-8") as f:
            loaded_data = json.load(f)
        print(f'从文件{config_analysis.TOTAL_LIST_JSON_FILE}中加载待分析列表...')
        total_name_list = loaded_data['total_name_list']
        total_code_list = loaded_data['total_code_list']
    else:
        # No cache file: analyse on the spot, then persist the result.
        for lab in label_queue():
            time_series_name = lab.get('name')
            time_series_code = lab.get('code')
            df = _query_series(db, time_series_code)
            if df is None:
                continue
            if _is_near_constant(df):
                print(f'过滤常数序列{time_series_name}({time_series_code})!')
                continue
            total_name_list.append(time_series_name)
            total_code_list.append(time_series_code)
        saved_data = {
            'total_name_list': total_name_list,
            'total_code_list': total_code_list,
        }
        with open(config_analysis.TOTAL_LIST_JSON_FILE, "w", encoding="utf-8") as f:
            json.dump(saved_data, f, ensure_ascii=False, indent=4)
        print(f'分析列表保存到{config_analysis.TOTAL_LIST_JSON_FILE}')

    # Pairwise cross-correlation: series A against every series B at or after
    # it in the list (including A itself, i.e. the autocorrelation case).
    for a_idx, (time_series_a_name, time_series_a_code) in enumerate(
            zip(total_name_list, total_code_list)):
        df_a = _query_series(db, time_series_a_code)
        if df_a is None:
            continue
        if _is_near_constant(df_a):
            print(f'序列A.{time_series_a_name}({time_series_a_code})遇到常数列, 跳过计算!')
            continue
        # Stationarize: diff_tool applies first-order differencing to the
        # series its name-based rule selects.
        df_a = diff_tool(time_series_a_name, df_a, df_a.columns[1])

        for time_series_b_name, time_series_b_code in zip(
                total_name_list[a_idx:], total_code_list[a_idx:]):
            # Name-based rule for combinations known to be uninteresting.
            if skip_tool(time_series_a_name, time_series_b_name):
                print(f'跳过组合:{time_series_a_name} vs. {time_series_b_name}')
                continue
            df_b = _query_series(db, time_series_b_code)
            if df_b is None:
                continue
            if _is_near_constant(df_b):
                print(f'序列B.{time_series_b_name}({time_series_b_code})遇到常数列, 跳过计算!')
                continue
            df_b = diff_tool(time_series_b_name, df_b, df_b.columns[1])

            # Align A and B on their common timestamps; the stable mergesort
            # keeps equal-time rows in merge order.
            df_merge = pd.merge(df_a, df_b, how='inner', on='time') \
                         .sort_values('time', kind='mergesort')
            _, time_series_a_column, time_series_b_column = df_merge.columns
            series_a = df_merge[time_series_a_column]
            series_b = df_merge[time_series_b_column]

            print(f'正在进行互相关性分析:A.{time_series_a_name}({time_series_a_code}) | B.{time_series_b_name}({time_series_b_code}) ')
            lag_hits = _significant_cross_correlations(
                series_a, series_b,
                max_lag=config_analysis.MAX_LAG,
                p_threshold=config_analysis.P_VALUE_THRESHOLD,
                r_threshold=config_analysis.R_THRESHOLD,
            )
            # Only pairs with at least one significant lag are kept.
            if lag_hits:
                result.append({
                    'A': {'name': time_series_a_name, 'code': time_series_a_code},
                    'B': {'name': time_series_b_name, 'code': time_series_b_code},
                    'res': lag_hits,
                })

print(f'计算完成,结果总数量为:{len(result)}')

# Persist the results, replacing any previous output file.
if os.path.exists(config_analysis.OUTPUT_JSON_FILE):
    print(f'删除旧文件{config_analysis.OUTPUT_JSON_FILE}')
    os.remove(config_analysis.OUTPUT_JSON_FILE)
data = {
    'data': result,
    'len': len(result),
    'r_threshold': config_analysis.R_THRESHOLD,
    'p_threshold': config_analysis.P_VALUE_THRESHOLD,
}
with open(config_analysis.OUTPUT_JSON_FILE, 'w', encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=4)
print(f'数据保存完成,{config_analysis.OUTPUT_JSON_FILE}')