# Pairwise cross-correlation analysis of ws_data time series.
import sys

sys.path.append("..")  # make the sibling project packages importable
from Database.database_ import Database, DatabaseParam
import pandas as pd
from scipy import stats
from utils_analysis import label_queue, diff_tool, skip_tool
import config_analysis
import os
import json
import time

# Minimum overlap length required before a Pearson test is attempted;
# shorter overlaps give unreliable p-values.
MIN_OVERLAP = 24


def _query_series(db, data_code):
    """Fetch one time series as a DataFrame over the configured date window.

    Returns whatever the database helper returns (a two-column DataFrame of
    time/value, or None when no data is found — callers must check).
    """
    return db.query_sql_time_series2data_frame(
        project_id=config_analysis.PROJECT_ID,
        sheet_name=config_analysis.DB_SHEET_NAME,
        data_code=data_code,
        start_year=config_analysis.CHECK_YEAR_START,
        end_year=config_analysis.CHECK_YEAR_END,
        start_month=config_analysis.CHECK_MONTH_START,
        end_month=config_analysis.CHECK_MONTH_END,
        start_day=config_analysis.CHECK_DAY_START,
        end_day=config_analysis.CHECK_DAY_END)


def _is_constant(df):
    """True when the value column has <= 2 distinct values.

    (Near-)constant series have variance close to 0, which makes the
    covariance/correlation meaningless, so they are filtered out.
    """
    return df[df.columns[1]].nunique() <= 2


def _load_or_build_lists(db):
    """Return (name_list, code_list) of the non-constant series to analyse.

    The lists are cached in TOTAL_LIST_JSON_FILE: when the cache exists it is
    loaded as-is; otherwise the lists are rebuilt from the database (constant
    series filtered out) and the cache file is written.
    """
    if os.path.exists(config_analysis.TOTAL_LIST_JSON_FILE):
        with open(config_analysis.TOTAL_LIST_JSON_FILE, "r", encoding="utf-8") as f:
            loaded_data = json.load(f)
        print(f'从文件{config_analysis.TOTAL_LIST_JSON_FILE}中加载待分析列表...')
        return loaded_data['total_name_list'], loaded_data['total_code_list']

    total_name_list, total_code_list = [], []
    for lab in label_queue():
        time_series_name = lab.get('name')
        time_series_code = lab.get('code')
        df = _query_series(db, time_series_code)
        if df is None:
            continue
        if _is_constant(df):  # drop (near-)constant series up front
            print(f'过滤常数序列{time_series_name}({time_series_code})!')
            continue
        total_name_list.append(time_series_name)
        total_code_list.append(time_series_code)
    saved_data = {
        'total_name_list': total_name_list,
        'total_code_list': total_code_list,
    }
    with open(config_analysis.TOTAL_LIST_JSON_FILE, "w", encoding="utf-8") as f:
        json.dump(saved_data, f, ensure_ascii=False, indent=4)
    print(f'分析列表保存到{config_analysis.TOTAL_LIST_JSON_FILE}')
    return total_name_list, total_code_list


def _cross_correlate(series_a, series_b, max_lag):
    """Pearson cross-correlation of two aligned series at integer lags.

    Lags run over [-max_lag, max_lag] inclusive.  (BUGFIX: the original
    ``range(-lags, lags, step)`` silently dropped the +max_lag endpoint,
    making the window asymmetric.)  Negative k means A lags behind B,
    positive k means B lags behind A.

    Returns a list of {'k': lag, 'r': coefficient, 'p': p-value} entries
    that pass the configured significance (P_VALUE_THRESHOLD) and
    magnitude (R_THRESHOLD) filters.
    """
    res = []
    for lag in range(-max_lag, max_lag + 1):
        if lag < 0:    # A lags B: drop A's head and B's tail
            a_shifted = series_a[-lag:]
            b_shifted = series_b[:lag]
        elif lag > 0:  # B lags A: drop A's tail and B's head
            a_shifted = series_a[:-lag]
            b_shifted = series_b[lag:]
        else:          # zero lag
            a_shifted, b_shifted = series_a, series_b
        # Too little overlap left after shifting -> unreliable statistics.
        if len(a_shifted) < MIN_OVERLAP or len(b_shifted) < MIN_OVERLAP:
            print('skip')
            continue
        r, p_value = stats.pearsonr(a_shifted, b_shifted)
        # Keep only statistically significant, sufficiently strong correlations.
        if p_value > config_analysis.P_VALUE_THRESHOLD:
            continue
        if abs(r) < config_analysis.R_THRESHOLD:
            continue
        res.append({'k': lag, 'r': r, 'p': p_value})
    return res


def _analyze_pairs(db, total_name_list, total_code_list):
    """Cross-correlate every series pair (A, B) with B's index >= A's index.

    Returns a list of dicts of the form
        {'A': {'name': ..., 'code': ...},
         'B': {'name': ..., 'code': ...},
         'res': [{'k': lag, 'r': r, 'p': p}, ...]}
    where pairs with an empty 'res' are omitted.
    """
    result = []
    for a_idx in range(len(total_code_list)):
        time_series_a_name = total_name_list[a_idx]
        time_series_a_code = total_code_list[a_idx]
        df_a = _query_series(db, time_series_a_code)
        if df_a is None:
            continue
        if _is_constant(df_a):
            print(f'序列A.{time_series_a_name}({time_series_a_code})遇到常数列, 跳过计算!')
            continue
        # Stationarize: diff_tool first-differences the series it selects by name.
        df_a = diff_tool(time_series_a_name, df_a, df_a.columns[1])
        for b_idx in range(a_idx, len(total_code_list)):
            time_series_b_name = total_name_list[b_idx]
            time_series_b_code = total_code_list[b_idx]
            if skip_tool(time_series_a_name, time_series_b_name):
                print(f'跳过组合:{time_series_a_name} vs. {time_series_b_name}')
                continue
            df_b = _query_series(db, time_series_b_code)
            if df_b is None:
                continue
            if _is_constant(df_b):
                print(f'序列B.{time_series_b_name}({time_series_b_code})遇到常数列, 跳过计算!')
                continue
            df_b = diff_tool(time_series_b_name, df_b, df_b.columns[1])
            # Inner-join A and B on time; mergesort keeps the ordering stable.
            df_merge = pd.merge(df_a, df_b, how='inner', on='time').sort_values('time', kind='mergesort')
            _, time_series_a_column, time_series_b_column = df_merge.columns
            print(f'正在进行互相关性分析:A.{time_series_a_name}({time_series_a_code}) | B.{time_series_b_name}({time_series_b_code}) ')
            res = _cross_correlate(df_merge[time_series_a_column],
                                   df_merge[time_series_b_column],
                                   config_analysis.MAX_LAG)
            if res:
                result.append({'A': {'name': time_series_a_name, 'code': time_series_a_code},
                               'B': {'name': time_series_b_name, 'code': time_series_b_code},
                               'res': res})
    return result


def _save_results(result):
    """Write the result list plus the thresholds used to OUTPUT_JSON_FILE,
    replacing any previous output file."""
    if os.path.exists(config_analysis.OUTPUT_JSON_FILE):
        print(f'删除旧文件{config_analysis.OUTPUT_JSON_FILE}')
        os.remove(config_analysis.OUTPUT_JSON_FILE)
    data = {'data': result,
            'len': len(result),
            'r_threshold': config_analysis.R_THRESHOLD,
            'p_threshold': config_analysis.P_VALUE_THRESHOLD}
    with open(config_analysis.OUTPUT_JSON_FILE, 'w', encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f'数据保存完成,{config_analysis.OUTPUT_JSON_FILE}')


def main():
    """Announce the run configuration, connect to the database, build the
    series lists, cross-correlate every pair, and persist the results."""
    print(f"""
查询数据库:ws_data
查询表:{config_analysis.DB_SHEET_NAME}
起始日期:{config_analysis.CHECK_YEAR_START}-{config_analysis.CHECK_MONTH_START}-{config_analysis.CHECK_DAY_START}
终止日期:{config_analysis.CHECK_YEAR_END}-{config_analysis.CHECK_MONTH_END}-{config_analysis.CHECK_DAY_END}
项目ID:{config_analysis.PROJECT_ID}
""")
    time.sleep(6)  # pause so the operator can abort if the config is wrong
    db_param = DatabaseParam(
        db_host='192.168.50.4',
        db_user='root',
        # SECURITY: credentials hard-coded in source — move to environment
        # variables or a secrets store; do not commit passwords.
        db_password='*B-@p2b+97D5xAF1e6',
        db_name='ws_data',
        db_port=4000)
    # Database defines a context manager that releases the connection and
    # cursor automatically, so all DB work happens inside the with-block.
    with Database(db_param) as db:
        total_name_list, total_code_list = _load_or_build_lists(db)
        result = _analyze_pairs(db, total_name_list, total_code_list)
    print(f'计算完成,结果总数量为:{len(result)}')
    _save_results(result)


if __name__ == '__main__':
    main()