|
|
@@ -267,7 +267,7 @@ def get_tmp_extremes(item_name, start_time, end_time, word_control):
|
|
|
time = item.get("htime_at")
|
|
|
uf1_diff_values[time] = float(item.get("val"))
|
|
|
if uf1_diff_values:
|
|
|
- logger.info(f"第一次API查询成功,提取到跨膜压差数据:{uf1_diff_values}")
|
|
|
+ logger.info(f"第一次API查询成功,提取到跨膜压差数据数量:{len(uf1_diff_values)}")
|
|
|
|
|
|
# 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
|
|
|
if data2.get("code") == 200 and data2.get("data"):
|
|
|
@@ -380,7 +380,7 @@ def send_decision_to_callback(type_name, **kwargs):
|
|
|
**kwargs: 决策结果数据
|
|
|
|
|
|
返回:
|
|
|
- 是否发送成功
|
|
|
+ use_model状态值: 1表示开启模型,0表示关闭模型,None表示发送失败
|
|
|
"""
|
|
|
payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]} # 请求负载 设备类型 项目ID 决策结果数据
|
|
|
|
|
|
@@ -392,17 +392,24 @@ def send_decision_to_callback(type_name, **kwargs):
|
|
|
logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
|
|
|
response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15) # 发送回调 请求头 请求体 超时时间
|
|
|
response.raise_for_status()
|
|
|
+ response_json = response.json()
|
|
|
logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
|
|
|
- return True
|
|
|
+
|
|
|
+ # 提取返回的 data 字段,表示 use_model 状态(1=开启,0=关闭)
|
|
|
+ use_model_status = response_json.get('data')
|
|
|
+ logger.info(f"[{type_name}] 服务器返回 use_model 状态: {use_model_status}")
|
|
|
+ return use_model_status
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
logger.error(f"[{type_name}] 回调发送失败 {e}")
|
|
|
+ except (json.JSONDecodeError, ValueError) as e:
|
|
|
+ logger.error(f"[{type_name}] 响应解析失败 {e}")
|
|
|
|
|
|
if attempt < max_retries: # 重试次数 小于 最大重试次数
|
|
|
logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
|
|
|
time.sleep(retry_interval)
|
|
|
|
|
|
logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
|
|
|
- return False
|
|
|
+ return None
|
|
|
|
|
|
|
|
|
def get_device_value(payload, device_name):
|
|
|
@@ -497,6 +504,9 @@ def monitor_device(device):
|
|
|
start_collection_time = datetime.now()
|
|
|
collection_duration = timedelta(minutes=10) # 10分钟
|
|
|
|
|
|
+ # 日志计数器,每收集30个点打印一次,避免日志过多
|
|
|
+ log_interval = 30
|
|
|
+
|
|
|
while datetime.now() - start_collection_time < collection_duration:
|
|
|
current_value = get_device_value(device["target_payload"], name) # 当前值
|
|
|
control_value = get_device_value(device["control_payload"], name) # 检查控制字
|
|
|
@@ -514,7 +524,9 @@ def monitor_device(device):
|
|
|
|
|
|
if current_value is not None:
|
|
|
collected_values.append(current_value)
|
|
|
- logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(collected_values)} 个数据点")
|
|
|
+ # 每收集30个点或第一个点时打印日志,减少日志数量
|
|
|
+ if len(collected_values) == 1 or len(collected_values) % log_interval == 0:
|
|
|
+ logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(collected_values)} 个数据点")
|
|
|
time.sleep(POLL_INTERVAL)
|
|
|
|
|
|
if not collected_values:
|
|
|
@@ -531,7 +543,7 @@ def monitor_device(device):
|
|
|
continue
|
|
|
|
|
|
# 阶段3: 决策计算
|
|
|
- logger.info("数据收集完成,开始决策计算")
|
|
|
+ logger.info(f"数据收集完成,共收集 {len(collected_values)} 个数据点,开始决策计算")
|
|
|
if collected_values:
|
|
|
# 计算平均值作为代表值
|
|
|
average_value = sum(collected_values) / len(collected_values)
|
|
|
@@ -567,8 +579,8 @@ def monitor_device(device):
|
|
|
metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s) # 计算运行指标
|
|
|
ceb_backwash_frequency = int(metrics["k_bw_per_ceb"])
|
|
|
|
|
|
- # 发送决策结果
|
|
|
- send_decision_to_callback(
|
|
|
+ # 发送决策结果,并获取服务器返回的 use_model 状态
|
|
|
+ use_model_status = send_decision_to_callback(
|
|
|
type_name=name, # 设备名称
|
|
|
water_production_time=int(L_s), # 过滤时间
|
|
|
physical_backwash=int(t_bw_s), # 反洗时间
|
|
|
@@ -582,8 +594,8 @@ def monitor_device(device):
|
|
|
ctime=current_decision_time.strftime(DATETIME_FORMAT) # 时间
|
|
|
)
|
|
|
|
|
|
- # 判断是否下发PLC指令
|
|
|
- if get_current_config()['system']['use_model'] == 1:
|
|
|
+ # 判断是否下发PLC指令,根据服务器返回的 use_model 状态
|
|
|
+ if use_model_status == 1:
|
|
|
logger.info("模型开关已开启,检查PLC指令")
|
|
|
|
|
|
# 记录当前PLC值和模型决策值
|
|
|
@@ -624,21 +636,36 @@ def monitor_device(device):
|
|
|
else:
|
|
|
logger.info(f"CEB次数无需更新: {current_plc_values['bw_per_ceb']}")
|
|
|
|
|
|
+ elif use_model_status == 0:
|
|
|
+ logger.info("服务器返回 use_model=0,模型开关已关闭,跳过PLC指令")
|
|
|
else:
|
|
|
- logger.info("模型开关已关闭,跳过PLC指令")
|
|
|
+ logger.warning("回调发送失败,无法获取 use_model 状态,跳过PLC指令")
|
|
|
|
|
|
# 保存运行状态
|
|
|
model_prev_L_s = L_s # 过滤时间 上一轮
|
|
|
model_prev_t_bw_s = t_bw_s # 反洗时间 上一轮
|
|
|
last_cycle_end_time = current_decision_time # 上次运行结束时间
|
|
|
+
|
|
|
+ # 获取配置的TMP历史记录数量
|
|
|
+ current_config = get_current_config()
|
|
|
+ tmp_history_count = current_config.get('system', {}).get('tmp_history_count', 5)
|
|
|
+
|
|
|
+ # 从最新的内存缓存中读取当前设备状态(确保获取最新的历史记录)
|
|
|
+ current_device_state = device_states.get(name, {})
|
|
|
+ recent_tmp_values = current_device_state.get('recent_tmp_values', [])
|
|
|
+ recent_tmp_values.append(round(average_value, 4))
|
|
|
+ # 只保留最近N次
|
|
|
+ recent_tmp_values = recent_tmp_values[-tmp_history_count:]
|
|
|
|
|
|
state_to_save = {
|
|
|
'model_prev_L_s': model_prev_L_s, # 过滤时间 上一轮
|
|
|
'model_prev_t_bw_s': model_prev_t_bw_s, # 反洗时间 上一轮
|
|
|
- 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT) # 上次运行结束时间
|
|
|
+ 'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT), # 上次运行结束时间
|
|
|
+ 'recent_tmp_values': recent_tmp_values # 最近N次TMP平均值(新增)
|
|
|
}
|
|
|
save_device_state(name, state_to_save) # 保存设备状态
|
|
|
- logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
|
|
|
+ logger.info(f"最近{tmp_history_count}次TMP记录: {recent_tmp_values}")
|
|
|
|
|
|
# 阶段4: 等待重置
|
|
|
logger.info(f"等待重置 控制字需重新等于 {TRIGGER_VALUE}")
|