@@ -10,8 +10,8 @@ DB_HOST = "222.130.26.206"
 DB_PORT = 4000
 
 # Time configuration
-START_TIME = datetime(2025, 11, 28, 0, 0, 0)
-END_TIME = datetime(2026, 2, 20, 0, 0, 0)
+START_TIME = datetime(2025, 4, 1, 0, 0, 0)
+END_TIME = datetime(2026, 3, 1, 6, 0, 0)
 BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
 
 DELETE_PERIODS = [
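
Note: with the new window, START_TIME (2025-04-01) lies after BOUNDARY (2025-03-25), so the engine dispatch used throughout the script always takes its start >= boundary branch and the test database is never queried for this range. A one-line check (illustration only, not part of the script):

    from datetime import datetime

    START_TIME = datetime(2025, 4, 1, 0, 0, 0)
    BOUNDARY = datetime(2025, 3, 25, 0, 0, 0)
    print(START_TIME >= BOUNDARY)  # True -> production engine only
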
@@ -23,26 +23,35 @@ DELETE_PERIODS = [
 ]  # Blacklist intervals, provided by Zhang Hao, that must be removed
 DELETE_PERIODS = [(pd.to_datetime(s), pd.to_datetime(e)) for s, e in DELETE_PERIODS]
 
-# New water island
-# ---------- Sensor configuration (new plant system) ----------
+# ---------- Sensor configuration ----------
 # Unit numbers
 UNITS = [1, 2]
 
-# New-system variable templates
 BASE_VARIABLES = [
-    "ns=3;s={}#UF_JSFLOW_O",   # feed flow
-    "ns=3;s={}#UF_JSPRESS_O",  # feed pressure (if present)
-    "ns=3;s=UF{}_SSD_KMYC",    # transmembrane pressure (if present)
-    "ns=3;s=UF{}_STEP"         # step sequence
+    "AR.{}#UF_JSFLOW_O",    # feed flow
+    "AR.{}#UF_JSPRESS_O",   # feed pressure
+    "AR.UF{}_SSD_KMYC",     # transmembrane pressure
+    "AR.UF{}_STEP",         # step sequence / control word
+    "AR.ZZ_{}#UFBWB_POWER"  # backwash pump power
 ]
-
-# System-wide variables (if present)
 SYSTEM_VARIABLES = [
-    "ns=3;s=ZJS_TEMP_O",  # feed temperature
-    "ns=3;s=RO_JSORP_O",  # total product water ORP (example)
-    "ns=3;s=RO_JSPH_O",   # total product water pH (example)
+    "AR.ZJS_TEMP_O",        # feed temperature
+    "AR.RO_JSORP_O",        # total product water ORP
+    "AR.RO_JSPH_O",         # total product water pH
+    "AR.RO_JSDD_O",         # total product water conductivity
+    "AR.CN_LEVEL_O",        # sodium hypochlorite level
+    "AR.S_LEVEL_O",         # acid level
+    "AR.J_LEVEL_O",         # caustic level
+    # "AR.ZZ_UFGSB_POWER",  # UF supply pump power
 ]
 
+# Output directories
+BASE_OUTPUT_DIR = "../datasets/UF_anzhen_data"
+PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
+
+# Create the directory
+os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
+
 # Generate all variable names
 SENSOR_NAMES = []
 
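
The loop that expands these templates into SENSOR_NAMES sits outside this hunk. A minimal sketch of the assumed expansion — each "{}" placeholder takes a unit number and the system-wide tags are appended unchanged, so 2 units × 5 templates + 7 system variables yields 17 names:

    SENSOR_NAMES = []
    for unit in UNITS:
        for template in BASE_VARIABLES:
            SENSOR_NAMES.append(template.format(unit))  # e.g. "AR.1#UF_JSFLOW_O"
    SENSOR_NAMES.extend(SYSTEM_VARIABLES)               # shared, unit-independent tags
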
@@ -56,13 +65,6 @@ print(f"Querying {len(SENSOR_NAMES)} variables in total")
 for i, var in enumerate(SENSOR_NAMES, 1):
     print(f"{i:2d}. {var}")
 
-# Output directories - only the processed folder is needed
-BASE_OUTPUT_DIR = "../datasets/UF_longting_data"
-PROCESSED_OUTPUT_DIR = os.path.join(BASE_OUTPUT_DIR, "raw")
-
-# Create the directory
-os.makedirs(PROCESSED_OUTPUT_DIR, exist_ok=True)
-
 
 # ---------- Create database engines ----------
 def create_db_engines():
@@ -93,21 +95,20 @@ def create_db_engines():
 # ---------- One-minute aggregation query (subquery form) ----------
 def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
     """
-    Fetch sensor data from the database and aggregate it over time directly.
-    Uses a subquery to avoid ONLY_FULL_GROUP_BY issues.
+    Fetch sensor data from the database and aggregate it per minute, with timestamps aligned to the start of the minute (second :00).
     """
     interval_seconds = interval_minutes * 60
 
     sql = text(f"""
         SELECT
-            MIN(h_time) AS time,
+            FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(MIN(h_time)) / {interval_seconds}) * {interval_seconds}) AS time,
             AVG(val) AS val
         FROM (
             SELECT
                 h_time,
                 val,
                 FLOOR(UNIX_TIMESTAMP(h_time) / {interval_seconds}) AS time_group
-            FROM dc_item_history_data_1450
+            FROM dc_item_history_data_1181
             WHERE item_name = :name
               AND h_time BETWEEN :st AND :et
               AND val IS NOT NULL
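
On the new SELECT expression: flooring the epoch seconds to the interval start is what pins every aggregated timestamp to second :00 of its minute. The same arithmetic in pandas, as a quick standalone check (illustration only, not part of the script):

    import pandas as pd

    ts = pd.Timestamp("2025-04-01 08:13:42.500")

    # FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(t) / 60) * 60) for a single row:
    epoch = ts.value // 10**9              # nanoseconds since epoch -> whole seconds
    bucket = epoch // 60 * 60              # drop the sub-minute remainder
    print(pd.Timestamp(bucket, unit="s"))  # 2025-04-01 08:13:00

    # The pandas-side safeguard added in the next hunk does the same thing:
    print(ts.floor("1min"))                # 2025-04-01 08:13:00
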
@@ -120,77 +121,154 @@ def fetch_valve_aggregated(name, start, end, engine, interval_minutes=1):
         df = pd.read_sql(sql, engine, params={"name": name, "st": start, "et": end})
         if not df.empty:
             df['time'] = pd.to_datetime(df['time'])
+            # Make sure timestamps fall on whole minutes (strip seconds and microseconds)
+            df['time'] = df['time'].dt.floor('1min')
         print(f"  ✓ {name}: {len(df)} rows")
         return df
     except Exception as e:
         print(f"  ✗ {name} query failed: {str(e)}")
         return pd.DataFrame()
 
+
+def fetch_special_data(sensor, start, end, boundary, engine_test, engine_prod):
+    """
+    Fetch step-sequence data while keeping the raw change points.
+    Returns the original timestamps and values.
+    """
+    sql = text("""
+        SELECT h_time AS time, val
+        FROM dc_item_history_data_1181
+        WHERE item_name = :name
+          AND h_time BETWEEN :st AND :et
+          AND val IS NOT NULL
+        ORDER BY h_time
+    """)
+
+    try:
+        # Pick the database engine(s) based on the boundary time
+        if end <= boundary:
+            df = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": end})
+        elif start >= boundary:
+            df = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": start, "et": end})
+        else:
+            df1 = pd.read_sql(sql, engine_test, params={"name": sensor, "st": start, "et": boundary})
+            df2 = pd.read_sql(sql, engine_prod, params={"name": sensor, "st": boundary, "et": end})
+            df = pd.concat([df1, df2], ignore_index=True)
+
+        if not df.empty:
+            df['time'] = pd.to_datetime(df['time'])
+
+        return df
+    except Exception as e:
+        print(f"  ✗ {sensor} query failed: {str(e)}")
+        return pd.DataFrame()
+
+
 # ---------- Sensor data query (aggregated data only) ----------
 def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test, engine_prod):
     """
     Fetch minute-level aggregated data for multiple sensors and merge it into one wide table.
-
-    Args:
-        sensor_names: list of sensor names
-        start_time: start time
-        end_time: end time
-        boundary: database switch-over point
-        engine_test: test database engine
-        engine_prod: production database engine
+    Step-sequence variables are handled separately.
     """
-    all_data = []  # aggregated data only
-
-    print("\nFetching data for each sensor:")
-
-    for sensor in sensor_names:
-        try:
-            # Pick the database engine based on the boundary time
-            if end_time <= boundary:
-                # entirely in the test database
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
-            elif start_time >= boundary:
-                # entirely in the production database
-                df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
-            else:
-                # spans both databases
-                df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
-                df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
-                df = pd.concat([df1, df2], ignore_index=True)
-
-            if not df.empty:
-                # Rename the value column so each sensor stays distinct
-                df_renamed = df.rename(columns={'val': sensor})
-                all_data.append(df_renamed[['time', sensor]])
-            else:
-                print(f"  ⚠ {sensor}: no data")
-
-        except Exception as e:
-            print(f"  ⚠ {sensor}: processing failed - {str(e)}")
-            continue
-
-    if not all_data:
-        print("\n❌ No sensor data retrieved")
-        return pd.DataFrame()
+    # Identify step-sequence and power variables
+    step_vars = [v for v in sensor_names if 'STEP' in v]
+    power_vars = [v for v in sensor_names if 'POWER' in v]
+
+    # Variables that need special handling
+    special_vars = step_vars + power_vars
 
-    print(f"\nMerging data from {len(all_data)} sensors...")
+    # All remaining variables are continuous
+    continuous_vars = [v for v in sensor_names if v not in special_vars]
 
-    # Merge the DataFrames one by one with reduce
-    merged_df = reduce(
-        lambda left, right: pd.merge(left, right, on='time', how='outer'),
-        all_data
+    print(f"\nFound {len(special_vars)} discrete variables and {len(continuous_vars)} continuous variables")
+
+    # 1. Build the full time grid (whole minutes) first
+    print("\nBuilding the full time grid...")
+    time_grid = pd.date_range(
+        start=start_time.replace(second=0, microsecond=0),
+        end=end_time.replace(second=0, microsecond=0),
+        freq='1min'
     )
 
-    # Sort by time
-    merged_df = merged_df.sort_values('time').reset_index(drop=True)
+    # A DataFrame indexed by the time grid
+    merged_df = pd.DataFrame(index=time_grid)
+    print(f"Time grid: {len(time_grid)} timestamps")
+
+    # 2. Continuous variables: minute averages aligned to whole minutes
+    if continuous_vars:
+        print("\nFetching continuous variables (minute averages):")
+        for sensor in continuous_vars:
+            try:
+                # Pick the database engine based on the boundary time
+                if end_time <= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_test)
+                elif start_time >= boundary:
+                    df = fetch_valve_aggregated(sensor, start_time, end_time, engine_prod)
+                else:
+                    df1 = fetch_valve_aggregated(sensor, start_time, boundary, engine_test)
+                    df2 = fetch_valve_aggregated(sensor, boundary, end_time, engine_prod)
+                    df = pd.concat([df1, df2], ignore_index=True)
+
+                if not df.empty:
+                    # Force whole-minute timestamps and index by them
+                    df['time'] = pd.to_datetime(df['time']).dt.floor('1min')
+                    df = df.drop_duplicates(subset=['time'], keep='first')
+                    df = df.set_index('time')
+
+                    # Add the column to the merged DataFrame (aligned on the index)
+                    merged_df[sensor] = df['val']
+                    print(f"  ✓ {sensor}: {len(df)} rows")
+                else:
+                    print(f"  ⚠ {sensor}: no data")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: processing failed - {str(e)}")
+                continue
+
+    # 3. Discrete variables: keep the raw change points
+    if special_vars:
+        print("\nFetching discrete variables (raw change points):")
+        for sensor in special_vars:
+            try:
+                # Fetch the raw, unaggregated data
+                df = fetch_special_data(sensor, start_time, end_time, boundary, engine_test, engine_prod)
+
+                if not df.empty:
+                    df['time'] = pd.to_datetime(df['time'])
+                    df = df.set_index('time')
+
+                    # Resample to minutes; forward-fill suits discrete variables
+                    df_resampled = df.resample('1min').ffill()
+
+                    # Add the column to the merged DataFrame
+                    merged_df[sensor] = df_resampled['val']
+                    print(f"  ✓ {sensor}: {len(df)} raw points -> {len(df_resampled)} minute points")
+                else:
+                    print(f"  ⚠ {sensor}: no data")
+
+            except Exception as e:
+                print(f"  ⚠ {sensor}: processing failed - {str(e)}")
+                continue
+
+    if merged_df.empty or len(merged_df.columns) == 0:
+        print("\n❌ No sensor data retrieved")
+        return pd.DataFrame()
 
-    print(f"Merge finished, {len(merged_df)} time rows in total")
+    # Reset the index so time becomes a column again
+    merged_df = merged_df.reset_index()
+    merged_df = merged_df.rename(columns={'index': 'time'})
 
-    # Drop blacklisted periods
-    print("Dropping blacklisted periods...")
+    print(f"\nMerge finished: {len(merged_df)} time rows × {len(merged_df.columns) - 1} sensors")
+    print(f"DataFrame shape: {merged_df.shape}")
+
+    # 4. Drop blacklisted periods
+    print("\nDropping blacklisted periods...")
     original_len = len(merged_df)
     for s, e in DELETE_PERIODS:
+        s = pd.to_datetime(s).floor('1min')
+        e = pd.to_datetime(e).floor('1min')
         merged_df = merged_df[(merged_df['time'] < s) | (merged_df['time'] > e)]
 
     deleted_count = original_len - len(merged_df)
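
Two behaviours above are worth spelling out. resample('1min').ffill() holds the last observed step between change points, and any grid minute before the first observation stays NaN — the leading gap that post_process_data later closes with bfill(). A toy illustration (not part of the script):

    import pandas as pd

    # Step trace with changes at 00:00:20 and 00:02:45
    raw = pd.DataFrame(
        {"val": [1, 2]},
        index=pd.to_datetime(["2025-04-01 00:00:20", "2025-04-01 00:02:45"]),
    )
    print(raw.resample("1min").ffill())
    #                      val
    # 2025-04-01 00:00:00  NaN   <- no observation at or before this grid point yet
    # 2025-04-01 00:01:00  1.0   <- last known step held forward
    # 2025-04-01 00:02:00  1.0

Also note that the blacklist filter keeps only rows strictly outside [s, e], so both endpoints of each blacklisted interval are dropped.
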
@@ -201,16 +279,11 @@ def fetch_sensor_data(sensor_names, start_time, end_time, boundary, engine_test,
 
 
 # ---------- Post-processing (fill missing values) ----------
-def post_process_data(df, method='both'):
+def post_process_data(df, continuous_vars, step_vars):
     """
-    Post-process the aggregated data: fill missing values.
-
-    Args:
-        df: input wide-table DataFrame
-        method: fill method
-
-    Returns:
-        pd.DataFrame: the processed DataFrame
+    Post-process the aggregated data.
+    Continuous variables: linear interpolation with edge filling.
+    Step variables: already forward-filled, so only possible leading NaNs remain.
     """
     if df.empty:
         print("Warning: input data is empty")
@@ -227,21 +300,22 @@ def post_process_data(df, method='both'):
     missing_before = df_processed.isnull().sum().sum()
     print(f"Missing values before filling: {missing_before}")
 
-    if method == 'bfill':
-        df_processed = df_processed.bfill()
-    elif method == 'ffill':
-        df_processed = df_processed.ffill()
-    else:  # both
-        df_processed = df_processed.ffill().bfill()
+    # Continuous variables: linear interpolation suits continuous physical quantities
+    for var in continuous_vars:
+        if var in df_processed.columns:
+            # Interpolate interior gaps, then fill both edges
+            df_processed[var] = df_processed[var].interpolate(method='linear', limit_direction='both')
+
+    # Step variables may still have leading NaNs; back-fill those
+    for var in step_vars:
+        if var in df_processed.columns:
+            df_processed[var] = df_processed[var].bfill()  # leading NaNs take the first valid value
 
     missing_after = df_processed.isnull().sum().sum()
     print(f"Missing values after filling: {missing_after}")
     print(f"Filled {missing_before - missing_after} missing values")
 
-    # Reset the index
-    df_processed = df_processed.reset_index().rename(columns={'index': 'time'})
-
-    return df_processed
+    return df_processed.reset_index()
 
 
 # ---------- Chunked save function ----------
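
A quick demonstration of the two fill strategies on toy values (illustration only): interpolate with limit_direction='both' closes interior gaps linearly and extends both edges with the nearest valid value, while bfill() only pulls the first valid value back over a leading gap:

    import numpy as np
    import pandas as pd

    cont = pd.Series([np.nan, 1.0, np.nan, 3.0, np.nan])
    print(cont.interpolate(method="linear", limit_direction="both").tolist())
    # [1.0, 1.0, 2.0, 3.0, 3.0]

    step = pd.Series([np.nan, 2.0, 2.0, 3.0])
    print(step.bfill().tolist())
    # [2.0, 2.0, 2.0, 3.0]
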
@@ -303,12 +377,16 @@ if __name__ == "__main__":
     print(f"\n✅ Aggregated data fetch complete!")
     print(f"Total rows: {len(agg_df)}")
     print(f"Time range: {agg_df['time'].min()} to {agg_df['time'].max()}")
-    print(f"Interval: 1 minute")
-    print(f"Sensors: {len(agg_df.columns) - 1}")  # minus 1 for the time column
+    print("Interval: 1 minute (aligned to whole minutes)")
+    print(f"Sensors: {len(agg_df.columns) - 1}")
+
+    # Identify step variables
+    step_vars = [col for col in agg_df.columns if 'STEP' in col]
+    continuous_vars = [col for col in agg_df.columns if col != 'time' and col not in step_vars]
 
-    # 3. Post-process the aggregated data (fill missing values)
-    print(f"\n[3/4] Post-processing aggregated data (filling missing values)...")
-    processed_df = post_process_data(agg_df, method='both')
+    # 3. Post-process the data
+    print("\n[3/4] Post-processing aggregated data...")
+    processed_df = post_process_data(agg_df, continuous_vars, step_vars)
 
     print(f"\n✅ Post-processing complete!")
     print(f"Rows after processing: {len(processed_df)}")