""" send_decision_to_callback 函数测试脚本 测试功能: 1. 测试正常的决策数据发送 2. 测试不同参数组合 3. 模拟实际使用场景 4. 测试PLC指令下发(当use_model_status=1时) """ import json from datetime import datetime from loop_main import send_decision_to_callback, send_plc_update, DATETIME_FORMAT, load_config, get_device_value def test_basic_callback(): """ 基础测试:发送一组标准的决策数据,如果use_model_status=1则测试PLC指令下发 """ print("=" * 60) print("测试1: 基础回调测试") print("=" * 60) # 模拟决策结果数据 device_name = "UF1" # 设备名称 test_data = { "type_name": device_name, "water_production_time": 4201, # 产水时间(秒) "physical_backwash": 120, # 物理反洗时间(秒) "ceb_backwash_frequency": 35, # CEB反洗频率 "duration_system": 3800, # 系统运行时间(秒) "tmp_action": 0.045, # TMP动作值 "recovery_rate": 0.95, # 回收率 "ton_water_energy_kWh": 0.28, # 吨水电耗 "max_permeability": 850.5, # 最高渗透率 "daily_prod_time_h": 22.5, # 日均产水时间(小时) "ctime": datetime.now().strftime(DATETIME_FORMAT) # 当前时间 } print("发送数据:") print(json.dumps(test_data, indent=2, ensure_ascii=False)) print() # 调用函数 try: use_model_status = send_decision_to_callback(**test_data) print(f"返回的 use_model 状态: {use_model_status}") if use_model_status == 1: print("测试结果: 成功 - 模型开关已开启") print() print("-" * 60) print("开始测试PLC指令下发") print("-" * 60) # 加载配置获取设备信息 config = load_config() device_config = None for dev in config['devices']: if dev['name'] == device_name: device_config = dev break if device_config: # 先读取当前PLC的产水时间值 print("正在读取PLC当前产水时间...") current_prod_time = get_device_value(device_config["production_time_payload"], device_name) if current_prod_time is not None: old_prod_time = str(int(current_prod_time)) new_prod_time = str(int(current_prod_time) + 1) # 当前值+1 prod_time_item = device_config["production_time_payload"]["deviceItems"] print(f"✓ 读取成功: 当前产水时间 = {old_prod_time}") print() print(f"测试参数:") print(f" 设备名称: {device_name}") print(f" 参数项: {prod_time_item}") print(f" 旧值: {old_prod_time} (从PLC读取)") print(f" 新值: {new_prod_time} (旧值+1)") print(f" 指令类型: 1 (产水时间)") print() # 发送PLC指令 print("正在发送PLC指令...") plc_result = send_plc_update( device_name=device_name, item=prod_time_item, old_value=old_prod_time, new_value=new_prod_time, command_type=1 # 产水时间的指令类型 ) if plc_result: print("✓ PLC指令发送成功") else: print("✗ PLC指令发送失败") else: print("✗ 错误:无法读取当前产水时间,跳过PLC指令测试") else: print(f"⚠ 警告: 未找到设备 {device_name} 的配置") print("-" * 60) elif use_model_status == 0: print("测试结果: 成功 - 模型开关已关闭") print("跳过PLC指令测试") else: print("测试结果: 失败 - 未能获取 use_model 状态") except Exception as e: print(f"测试异常: {e}") import traceback traceback.print_exc() print("=" * 60) print() def test_minimal_callback(): """ 最小参数测试:只传递必要的参数 """ print("=" * 60) print("测试2: 最小参数测试") print("=" * 60) # 最小数据集 test_data = { "type_name": "UF2", "water_production_time": 3500, "physical_backwash": 100, "ctime": datetime.now().strftime(DATETIME_FORMAT) } print("发送数据:") print(json.dumps(test_data, indent=2, ensure_ascii=False)) print() try: use_model_status = send_decision_to_callback(**test_data) print(f"返回的 use_model 状态: {use_model_status}") if use_model_status == 1: print("测试结果: 成功 - 模型开关已开启") elif use_model_status == 0: print("测试结果: 成功 - 模型开关已关闭") else: print("测试结果: 失败 - 未能获取 use_model 状态") except Exception as e: print(f"测试异常: {e}") print("=" * 60) print() def test_multiple_devices(): """ 多设备测试:模拟多个设备的决策数据发送 """ print("=" * 60) print("测试3: 多设备测试") print("=" * 60) devices = ["UF1", "UF2", "UF3"] for device_name in devices: print(f"\n发送设备 {device_name} 的决策数据...") test_data = { "type_name": device_name, "water_production_time": 3600 + (devices.index(device_name) * 100), "physical_backwash": 100 + (devices.index(device_name) * 10), "ceb_backwash_frequency": 40 - devices.index(device_name), "duration_system": 3800, "tmp_action": 0.040 + (devices.index(device_name) * 0.005), "recovery_rate": 0.95, "ton_water_energy_kWh": 0.25 + (devices.index(device_name) * 0.02), "max_permeability": 850.0, "daily_prod_time_h": 22.0, "ctime": datetime.now().strftime(DATETIME_FORMAT) } try: use_model_status = send_decision_to_callback(**test_data) print(f"{device_name} 返回的 use_model 状态: {use_model_status}") if use_model_status == 1: print(f"{device_name} 测试结果: 成功 - 模型开关已开启") elif use_model_status == 0: print(f"{device_name} 测试结果: 成功 - 模型开关已关闭") else: print(f"{device_name} 测试结果: 失败 - 未能获取 use_model 状态") except Exception as e: print(f"{device_name} 测试异常: {e}") print("\n" + "=" * 60) print() def test_custom_scenario(): """ 自定义场景测试:可以根据需要修改参数 """ print("=" * 60) print("测试4: 自定义场景测试") print("=" * 60) # 这里可以自定义测试参数 test_data = { "type_name": "UF1", # 修改设备名称 "water_production_time": 4000, # 修改产水时间 "physical_backwash": 150, # 修改反洗时间 "ceb_backwash_frequency": 30, # 修改CEB频率 "duration_system": 4200, "tmp_action": 0.055, "recovery_rate": 0.92, "ton_water_energy_kWh": 0.30, "max_permeability": 800.0, "daily_prod_time_h": 21.5, "ctime": datetime.now().strftime(DATETIME_FORMAT) } print("自定义测试数据:") print(json.dumps(test_data, indent=2, ensure_ascii=False)) print() try: use_model_status = send_decision_to_callback(**test_data) print(f"返回的 use_model 状态: {use_model_status}") if use_model_status == 1: print("测试结果: 成功 - 模型开关已开启") elif use_model_status == 0: print("测试结果: 成功 - 模型开关已关闭") else: print("测试结果: 失败 - 未能获取 use_model 状态") except Exception as e: print(f"测试异常: {e}") print("=" * 60) print() def test_plc_update_with_callback(): """ 测试5: 回调+PLC指令测试(完整流程) """ print("=" * 60) print("测试5: 回调+PLC指令完整流程测试") print("=" * 60) device_name = "UF1" # 第一步:发送回调数据 test_data = { "type_name": device_name, "water_production_time": 4201, # 决策建议的产水时间 "physical_backwash": 120, "ceb_backwash_frequency": 35, "duration_system": 3800, "tmp_action": 0.045, "recovery_rate": 0.95, "ton_water_energy_kWh": 0.28, "max_permeability": 850.5, "daily_prod_time_h": 22.5, "ctime": datetime.now().strftime(DATETIME_FORMAT) } print("步骤1: 发送决策数据到回调接口") print("=" * 60) print(json.dumps(test_data, indent=2, ensure_ascii=False)) print() try: # use_model_status = 1 use_model_status = send_decision_to_callback(**test_data) print(f"✓ 回调响应: use_model_status = {use_model_status}") print() # 第二步:根据返回状态决定是否发送PLC指令 print("步骤2: 根据use_model_status决定是否下发PLC指令") print("=" * 60) if use_model_status == 1: print("✓ 模型开关已开启,准备下发PLC指令") print() # 加载配置 config = load_config() device_config = None for dev in config['devices']: if dev['name'] == device_name: device_config = dev break if device_config: print("步骤3: 读取PLC当前产水时间") print("=" * 60) # 先读取当前PLC的产水时间值 current_prod_time = get_device_value(device_config["production_time_payload"], device_name) if current_prod_time is not None: old_prod_time = str(int(current_prod_time)) new_prod_time = str(int(current_prod_time) - 1) # 当前值+1 prod_time_item = device_config["production_time_payload"]["deviceItems"] print(f"✓ 读取成功: 当前产水时间 = {old_prod_time} 秒") print() print("步骤4: 下发PLC指令(修改产水时间)") print("=" * 60) print(f"设备: {device_name}") print(f"参数: {prod_time_item}") print(f"变更: {old_prod_time} -> {new_prod_time} (当前值+1)") print(f"类型: command_type=1 (产水时间)") print() # 发送PLC指令 plc_result = send_plc_update( device_name=device_name, item=prod_time_item, old_value=old_prod_time, new_value=new_prod_time, command_type=1 ) print() if plc_result: print("✓✓✓ 测试成功:完整流程执行完毕") print(" 1. 回调接口调用成功") print(" 2. use_model_status=1") print(" 3. 从PLC读取当前值成功") print(" 4. PLC指令发送成功") else: print("✗ 测试失败:PLC指令发送失败") else: print("✗ 错误:无法读取当前产水时间") print("测试失败:无法获取PLC当前值") else: print(f"✗ 错误:未找到设备 {device_name} 的配置") elif use_model_status == 0: print("⚠ 模型开关已关闭,跳过PLC指令下发") print(" 这是正常行为(use_model_status=0)") else: print("✗ 测试失败:未能获取有效的 use_model_status") except Exception as e: print(f"✗ 测试异常: {e}") import traceback traceback.print_exc() print() print("=" * 60) print() def main(): """ 主测试函数 """ print("\n") print("*" * 60) print("send_decision_to_callback + PLC指令 测试") print("*" * 60) print() # 运行各个测试 # 默认运行基础回调测试(包含PLC指令测试) # test_basic_callback() # 取消注释下面的行来运行其他测试 # test_minimal_callback() # test_multiple_devices() # test_custom_scenario() # 完整流程测试(推荐使用) test_plc_update_with_callback() print("\n") print("*" * 60) print("测试完成") print("*" * 60) print() print("提示:") print(" - 如果 use_model_status=1,会自动发送PLC指令") print(" - 测试会先从PLC读取当前产水时间,然后修改为当前值+1") print(" - 这样确保old_value与PLC实际值匹配,避免'设置scada变量失败'错误") print(" - 可以运行 test_plc_update_with_callback() 查看完整流程") print() if __name__ == "__main__": main()