|
|
@@ -737,12 +737,68 @@ def main(strategy=3, start_date=None, unit_filter=None, separate_stages=True, se
|
|
|
|
|
|
return result_df
|
|
|
|
|
|
+def check_decision_exists(project_id, action_type, ctime):
|
|
|
+ """
|
|
|
+ 检查指定决策数据是否已存在
|
|
|
+
|
|
|
+ 通过 GET 请求查询 API,检查指定时间范围内是否已存在该 action_type 的数据
|
|
|
+
|
|
|
+ Args:
|
|
|
+ project_id: int,项目ID
|
|
|
+ action_type: str,动作类型,如'RO1'
|
|
|
+ ctime: str,CIP时间,格式'YYYY-MM-DD HH:MM:SS'
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ bool: 如果数据已存在返回True,不存在返回False
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 解析 ctime,构造查询时间范围(前后3个月)
|
|
|
+ ctime_dt = datetime.strptime(ctime, "%Y-%m-%d %H:%M:%S")
|
|
|
+ stime = (ctime_dt - timedelta(days=90)).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ etime = (ctime_dt + timedelta(days=90)).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ # 构造查询URL
|
|
|
+ query_params = {
|
|
|
+ "project_id": project_id,
|
|
|
+ "stime": stime,
|
|
|
+ "etime": etime,
|
|
|
+ "action_type": action_type
|
|
|
+ }
|
|
|
+
|
|
|
+ response = requests.get(callback_url, headers=headers, params=query_params, timeout=15)
|
|
|
+ response.raise_for_status()
|
|
|
+
|
|
|
+ result = response.json()
|
|
|
+
|
|
|
+ # 检查返回数据中是否存在该记录
|
|
|
+ if result.get("code") == 200 and result.get("data"):
|
|
|
+ for record in result["data"]:
|
|
|
+ # 检查是否有相同的 ctime(精确到日期)
|
|
|
+ existing_ctime = record.get("ctime", "")
|
|
|
+ if existing_ctime:
|
|
|
+ # 比较日期部分(忽略时分秒的差异)
|
|
|
+ existing_date = existing_ctime.split(" ")[0]
|
|
|
+ target_date = ctime.split(" ")[0]
|
|
|
+ if existing_date == target_date:
|
|
|
+ print(f" [去重] {action_type} 在 {target_date} 已存在记录(id={record.get('id')}), 跳过存储")
|
|
|
+ return True
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f" [去重检查异常] {action_type}: {e}")
|
|
|
+ # 查询失败时,保守处理,继续存储
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
def send_decision_to_callback(decision_data):
|
|
|
"""
|
|
|
将CIP决策结果发送到回调接口
|
|
|
|
|
|
功能:将分析结果按照API格式封装,通过HTTP POST发送到回调地址
|
|
|
|
|
|
+ 注意:发送前会先检查数据是否已存在,避免重复存储
|
|
|
+
|
|
|
Args:
|
|
|
decision_data: pd.DataFrame,决策数据,包含机组类型和CIP时机
|
|
|
|
|
|
@@ -776,24 +832,33 @@ def send_decision_to_callback(decision_data):
|
|
|
# 提取纯粹的机组编号(去掉"-一段"或"-二段")
|
|
|
# 例如:"RO1-一段" -> "RO1","RO2-二段" -> "RO2"
|
|
|
unit_name = unit_type.split('-')[0] if '-' in unit_type else unit_type
|
|
|
+ ctime_str = row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+
|
|
|
+ # ===== 去重检查:查询是否已存在该记录 =====
|
|
|
+ if check_decision_exists(project_id, unit_name, ctime_str):
|
|
|
+ continue # 跳过已存在的记录
|
|
|
|
|
|
callback_list.append({
|
|
|
"type": unit_name,
|
|
|
"project_id": project_id,
|
|
|
- "ctime": row["CIP时机"].strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
+ "ctime": ctime_str,
|
|
|
"ceb_backwash_frequency": stage_num
|
|
|
})
|
|
|
else:
|
|
|
callback_list = [decision_data]
|
|
|
|
|
|
- # 关键检查:如果没有有效数据,不发送回调
|
|
|
+ # 关键检查:如果没有有效数据(全部被去重),不发送回调
|
|
|
if not callback_list:
|
|
|
- return False
|
|
|
+ print("所有数据已存在或无有效数据,跳过回调")
|
|
|
+ return True # 返回True表示处理成功(只是不需要存储)
|
|
|
|
|
|
# 封装为API要求的格式
|
|
|
payload = {
|
|
|
"list": callback_list
|
|
|
}
|
|
|
+
|
|
|
+ print(f"准备发送 {len(callback_list)} 条数据到回调接口")
|
|
|
+
|
|
|
# 发送HTTP请求(带重试机制)
|
|
|
max_retries = 3
|
|
|
retry_interval = 10
|