瀏覽代碼

1:修正文件

wmy 5 月之前
父節點
當前提交
23ebb466f7
共有 23 個文件被更改,包括 311 次插入2831 次删除
  1. 311 39
      README.md
  2. 0 188
      models/uf-rl/ultrafiltration_model/DEPLOYMENT_PLAN.md
  3. 0 264
      models/uf-rl/ultrafiltration_model/DQN_decide.py
  4. 0 566
      models/uf-rl/ultrafiltration_model/DQN_env.py
  5. 0 200
      models/uf-rl/ultrafiltration_model/config.json
  6. 0 23
      models/uf-rl/ultrafiltration_model/device_states.json
  7. 二進制
      models/uf-rl/ultrafiltration_model/dqn_model.zip
  8. 0 1
      models/uf-rl/ultrafiltration_model/dqn_model/_stable_baselines3_version
  9. 0 46
      models/uf-rl/ultrafiltration_model/dqn_model/data
  10. 二進制
      models/uf-rl/ultrafiltration_model/dqn_model/policy.optimizer.pth
  11. 二進制
      models/uf-rl/ultrafiltration_model/dqn_model/policy.pth
  12. 二進制
      models/uf-rl/ultrafiltration_model/dqn_model/pytorch_variables.pth
  13. 0 9
      models/uf-rl/ultrafiltration_model/dqn_model/system_info.txt
  14. 0 771
      models/uf-rl/ultrafiltration_model/loop_main.py
  15. 0 0
      models/uf-rl/ultrafiltration_model/monitor_service.log
  16. 0 120
      models/uf-rl/ultrafiltration_model/plc_test_dry_run.py
  17. 0 18
      models/uf-rl/ultrafiltration_model/requirements.txt
  18. 0 73
      models/uf-rl/ultrafiltration_model/save_uf_models.py
  19. 0 393
      models/uf-rl/ultrafiltration_model/test_callback.py
  20. 0 120
      models/uf-rl/ultrafiltration_model/test_plc_update.py
  21. 二進制
      models/uf-rl/ultrafiltration_model/uf_bw.pth
  22. 二進制
      models/uf-rl/ultrafiltration_model/uf_dqn_tensorboard/DQN_lr0.0001_buf2000_bs16_gamma0.95_exp0.6_default_20251017-114220/DQN_1/events.out.tfevents.1760672541.MacBook-Pro-2.local.85900.0
  23. 二進制
      models/uf-rl/ultrafiltration_model/uf_fp.pth

+ 311 - 39
README.md

@@ -2,85 +2,357 @@
 
 ## 项目简介
 
-DualFlow 是一个简洁的多模型协作平台,支持多个独立的机器学习模型,便于团队协作开发。
+DualFlow 是一个专业的多模型协作平台,专注于工业场景下的智能模型集成与部署。平台集成了异常检测、因果推理、压力预测和超滤强化学习等多个核心机器学习模型,为工业生产过程提供智能化解决方案。
+
+## 核心特性
+
+- 🤖 **多模型集成**: 支持异常检测、因果推理、压力预测、强化学习等多种模型类型
+- 🏭 **工业场景优化**: 针对工业生产过程的实际需求进行深度优化
+- 🔧 **模块化设计**: 各模型独立开发部署,便于维护和扩展
+- 📊 **统一API接口**: 提供标准化的模型服务接口
+- 🚀 **CI/CD支持**: 完整的模型构建、部署和监控流程
 
 ## 项目结构
 
 ```
 DualFlow/
-├── models/                 # 各模型独立
-│   
-├── common/                # 公共逻辑 (共享函数)
-│   ├── data/              # 数据处理
-│   ├── preprocessing/    # 数据预处理
-│   ├── metrics/          # 评估指标
-│   └── utils/            # 工具函数
-├── ci/                   # CI/CD 流程模板
-│   ├── model_build.yml   # 模型构建流程
-│   ├── model_deploy.yml  # 模型部署流程
-│   └── scripts/          # 部署脚本
-├── configs/              # 全局配置
-│   ├── registry.yaml     # 模型注册表
-│   └── model_matrix.yaml # 模型负责人矩阵
-├── tests/                # 项目测试
-├── scripts/              # 项目脚本
-│   └── run_all_models.py # 运行所有模型
-└── README.md
+├── models/                        # 机器学习模型
+│   ├── anomaly_detection/         # 异常检测模型
+│   │   ├── detection.py          # 检测算法实现
+│   │   └── *.pkl                 # 预训练模型文件
+│   ├── causal-inference/         # 因果推理模型
+│   │   ├── gat.py                # 图注意力网络
+│   │   ├── rl_optimizer.py       # 强化学习优化器
+│   │   └── *.pth                 # 训练好的模型权重
+│   ├── pressure-predictor/       # 压力预测模型
+│   │   ├── gat-lstm_model/       # GAT-LSTM混合模型
+│   │   ├── 20分钟TMP预测模型源码/ # 20分钟短期预测
+│   │   ├── 90天TMP预测模型源码/   # 90天长期预测
+│   │   └── 90天TMP预测模型源码修改/ # 改进版预测模型
+│   └── uf-rl/                    # 超滤强化学习模型
+│       ├── Ultrafiltration_model/ # 超滤生产模型
+│       ├── dqn_model/            # 深度Q网络模型
+│       └── 超滤训练源码/           # 训练源码和文档
+├── common/                        # 公共模块
+│   ├── data/                     # 数据处理模块
+│   │   └── loader.py             # 数据加载器
+│   ├── preprocessing/            # 数据预处理
+│   │   ├── image.py              # 图像预处理
+│   │   └── text.py               # 文本预处理
+│   ├── metrics/                  # 评估指标
+│   │   ├── classification.py     # 分类指标
+│   │   └── regression.py         # 回归指标
+│   └── utils/                    # 工具函数
+│       ├── config.py             # 配置管理
+│       └── logger.py             # 日志工具
+├── api/                          # API接口
+│   └── routers/                  # 路由模块
+│       ├── health.py             # 健康检查
+│       ├── models.py             # 模型管理
+│       └── users.py              # 用户管理
+├── ci/                           # CI/CD配置
+│   ├── model_build.yml           # 模型构建流程
+│   ├── model_deploy.yml          # 模型部署流程
+│   └── scripts/                  # 部署脚本
+│       └── deploy_model.sh       # 模型部署脚本
+├── requirements.txt               # 项目依赖
+├── requirements-dev.txt          # 开发依赖
+├── env.example                   # 环境变量示例
+└── README.md                     # 项目说明
 ```
 
+## 模型说明
+
+### 1. 异常检测模型 (Anomaly Detection)
+- **功能**: 基于孤立森林和三西格玛方法的异常检测
+- **算法**: Isolation Forest, Three Sigma
+- **应用场景**: 工业生产过程中的异常监控
+
+### 2. 因果推理模型 (Causal Inference)
+- **功能**: 基于图神经网络的因果推断和强化学习优化
+- **算法**: Graph Attention Network (GAT), Reinforcement Learning
+- **应用场景**: 生产参数优化和决策支持
+
+### 3. 压力预测模型 (Pressure Predictor)
+- **功能**: 多时间尺度的跨膜压力(TMP)预测
+- **算法**: GAT-LSTM混合神经网络
+- **预测周期**: 20分钟短期预测、90天长期预测
+- **应用场景**: 超滤系统的压力预测和维护预警
+
+### 4. 超滤强化学习模型 (UF-RL)
+- **功能**: 基于深度Q网络的超滤生产优化
+- **算法**: Deep Q-Network (DQN)
+- **应用场景**: 超滤生产过程的智能化控制和优化
+
 ## 快速开始
 
 ### 环境要求
 
 - Python 3.9+
+- CUDA 11.0+ (GPU训练)
 - Git
+- Docker (可选,用于容器化部署)
 
-### 安装和运行
+### 安装和配置
 
-1. 克隆项目
+1. **克隆项目**
 ```bash
 git clone <repository-url>
 cd DualFlow
 ```
 
-2. 安装依赖
+2. **创建虚拟环境**
+```bash
+python -m venv venv
+source venv/bin/activate  # Linux/Mac
+# 或
+venv\Scripts\activate     # Windows
+```
+
+3. **安装依赖**
 ```bash
 pip install -r requirements.txt
+pip install -r requirements-dev.txt  # 开发环境
+```
+
+4. **环境配置**
+```bash
+cp env.example .env
+# 编辑 .env 文件,配置相关环境变量
+```
+
+### 运行模型
+
+#### 异常检测模型
+```bash
+cd models/anomaly_detection
+python detection.py
+```
+
+#### 因果推理模型
+```bash
+cd models/causal-inference
+python main.py
+```
+
+#### 压力预测模型
+```bash
+# 20分钟预测
+cd models/pressure-predictor/20分钟TMP预测模型源码
+python main.py
+
+# 90天预测
+cd models/pressure-predictor/90天TMP预测模型源码
+python main.py
+
+# GAT-LSTM混合模型
+cd models/pressure-predictor/gat-lstm_model
+python api_main.py
 ```
 
-3. 运行特定模型
+#### 超滤强化学习模型
 ```bash
-cd models/nlp_bert
-python train.py
+# 训练
+cd models/uf-rl/超滤训练源码
+python DQN_train.py
+
+# 推理
+cd models/uf-rl/Ultrafiltration_model
+python loop_main.py
 ```
 
-4. 运行所有模型
+### API服务启动
 ```bash
-python scripts/run_all_models.py
+# 启动FastAPI服务
+uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload
 ```
 
 ## 开发指南
 
+### 代码结构规范
+
+每个模型目录应包含以下标准结构:
+```
+model_name/
+├── README.md           # 模型说明文档
+├── main.py            # 主程序入口
+├── config.py          # 配置文件
+├── requirements.txt   # 模型特定依赖
+├── data/              # 数据目录
+├── models/            # 模型文件
+└── tests/             # 测试文件
+```
+
 ### 添加新模型
 
-1. 在 `models/` 目录下创建新的模型目录
-2. 按照现有模型结构创建必要的文件
-3. 在 `configs/registry.yaml` 中注册新模型
-4. 添加相应的测试
+1. **创建模型目录**
+```bash
+mkdir models/new_model
+cd models/new_model
+```
+
+2. **创建标准文件**
+```bash
+touch README.md main.py config.py requirements.txt
+mkdir -p data models tests
+```
+
+3. **实现模型代码**
+   - 在`main.py`中实现模型逻辑
+   - 在`config.py`中定义配置参数
+   - 在`README.md`中编写详细文档
+
+4. **添加测试**
+```bash
+# 在tests/目录下创建测试文件
+pytest tests/
+```
 
 ### 代码规范
 
-- 使用 Black 进行代码格式化
-- 使用 isort 进行导入排序
-- 使用 pytest 进行测试
+- **代码格式化**: 使用 Black 进行代码格式化
+```bash
+black .
+```
+
+- **导入排序**: 使用 isort 进行导入排序
+```bash
+isort .
+```
+
+- **类型检查**: 使用 mypy 进行类型检查
+```bash
+mypy .
+```
+
+- **代码质量**: 使用 flake8 进行代码检查
+```bash
+flake8 .
+```
+
+- **提交前检查**: 使用 pre-commit hooks
+```bash
+pre-commit install
+```
+
+### 测试
+
+```bash
+# 运行所有测试
+pytest
+
+# 运行特定模型测试
+pytest models/anomaly_detection/tests/
+
+# 生成测试覆盖率报告
+pytest --cov=models --cov-report=html
+```
+
+## 部署指南
+
+### Docker部署
+
+1. **构建镜像**
+```bash
+docker build -t dualflow:latest .
+```
+
+2. **运行容器**
+```bash
+docker run -d -p 8000:8000 --name dualflow-app dualflow:latest
+```
+
+### 生产环境部署
+
+1. **使用CI/CD流程**
+```bash
+# 构建模型
+./ci/scripts/model_build.yml
+
+# 部署模型
+./ci/scripts/model_deploy.yml
+```
+
+2. **监控和日志**
+- 使用 Prometheus 进行指标监控
+- 使用结构化日志记录运行状态
+
+## API文档
+
+启动服务后,访问以下地址查看API文档:
+- Swagger UI: http://localhost:8000/docs
+- ReDoc: http://localhost:8000/redoc
+
+### 主要API端点
+
+- `GET /health` - 健康检查
+- `GET /models` - 获取模型列表
+- `POST /models/{model_name}/predict` - 模型预测
+- `GET /models/{model_name}/status` - 模型状态
+
+## 性能优化
+
+### 模型优化
+- 使用 TensorRT 进行GPU加速
+- 实现模型量化和剪枝
+- 批处理优化
+
+### 系统优化
+- Redis缓存热点数据
+- 异步处理提高并发
+- 负载均衡和水平扩展
 
 ## 贡献指南
 
-1. Fork 项目
-2. 创建功能分支
-3. 提交更改
-4. 创建 Pull Request
+我们欢迎所有形式的贡献!请遵循以下步骤:
+
+1. **Fork 项目**
+2. **创建功能分支**
+```bash
+git checkout -b feature/your-feature-name
+```
+
+3. **提交更改**
+```bash
+git commit -m "Add your feature description"
+```
+
+4. **推送分支**
+```bash
+git push origin feature/your-feature-name
+```
+
+5. **创建 Pull Request**
+- 提供清晰的PR描述
+- 确保所有测试通过
+- 更新相关文档
+
+## 问题反馈
+
+如果您遇到任何问题或有改进建议,请:
+
+1. 查看现有的 [Issues](../../issues)
+2. 如果没有相关问题,请创建新的 Issue
+3. 提供详细的问题描述和复现步骤
+
+## 更新日志
+
+### v1.0.0 (2025-01-10)
+- 初始版本发布
+- 集成四个核心模型
+- 完整的API接口
+- CI/CD流程支持
 
 ## 许可证
 
-MIT License
+本项目采用 MIT 许可证。详情请查看 [LICENSE](LICENSE) 文件。
+
+## 联系方式
+
+- 项目维护者: [您的姓名]
+- 邮箱: [您的邮箱]
+- 项目主页: [项目链接]
+
+---
+
+**注意**: 本项目仍在持续开发中,部分功能可能存在不稳定情况。建议在生产环境使用前进行充分测试。

+ 0 - 188
models/uf-rl/ultrafiltration_model/DEPLOYMENT_PLAN.md

@@ -1,188 +0,0 @@
-# 超滤AI控制系统部署方案
-
-## 一、测试流程
-### 控制两台设备(1-2周)
-**目标**:控制设备,看效果
-
-**操作**:
-0. 启动程序:`nohup python loop_main.py > /dev/null 2>&1 &`
-1. **选两台设备**:比如 UF1 和 UF2(其他设备保持关闭)
-2. **前端开启** UF1 和 UF2 的模型开关
-3. **每天看2次**:早上一次,下午一次
-4. **重点关注**:
-   - 设备有没有报警
-   - TMP趋势是不是稳定(看 device_states.json 里保存的历史值)
-   - PLC参数有没有乱跳
-
-**判断标准**:
-- 决策的产水时间在 3600-6000 秒之间
-- 决策的反洗时间在 40-60 秒之间
-- 看着无明显异常
-
-**出现问题立即停止**:
-- 设备报警了 → 前端立即关闭
-- TMP一直涨或者一直降 → 关闭
-- 感觉不对劲 → 关闭
-
-**没问题就继续跑**:
-- 跑个1-2周
-- 确认稳定后再放开观察频率
-
----
-
-## 二、日常怎么看
-
-### 看日志
-```bash
-# 看最近的运行情况
-tail -n 50 monitor_service.log
-
-# 看有没有错误
-tail -n 200 monitor_service.log | grep ERROR
-
-# 实时跟踪
-tail -f monitor_service.log
-```
-
-### 看设备状态
-```bash
-# 查看最近5次TMP记录,判断趋势
-cat device_states.json
-
-# 看起来格式化点
-cat device_states.json | python -m json.tool
-```
-
-**TMP趋势判断**:
-- 一直涨:可能膜污染严重,要注意
-- 一直降:可能反洗太频繁
-- 波动正常:没啥问题
-
-### 看设备本身
-- SCADA系统看设备状态
-- 现场看有没有报警
-- 产水量、能耗这些数据对比下
-
----
-
-## 三、出问题怎么办
-
-### 设备报警
-1. 前端立即关闭模型
-2. 搞清楚是不是模型导致的
-
-### 程序崩溃
-1. 看日志最后几行,看报什么错
-2. 修复后重启
-
-### TMP一直涨
-1. 前端关闭模型
-2. 观察是不是水质问题
-3. 可能需要手动CEB
-
-### 决策乱跳
-1. 检查输入数据是否正常
-2. 可能是数据采集有问题
-3. 先关闭,排查原因
-
----
-
-## 四、关键点说明
-
-### 1. 跨膜压差异常判断
-**系统里能判断**:
-- 通过保存的历史TMP值判断趋势
-- 如果最近5次TMP持续上升 → 可能有问题
-- 如果TMP突然跳变很大 → 异常
-
-**需要SCADA系统配合**:
-- 绝对阈值报警(>0.08 bar)需要SCADA系统设置
-- 具体数值监控依赖SCADA
-
-**建议**:
-- 程序里记录TMP趋势
-- SCADA设置报警阈值
-- 两边配合使用
-
-### 2. 产水量判断
-**系统里不能直接判断**:
-- 需要流量计数据
-- 需要SCADA系统提供
-
-**替代方案**:
-- 人工定期查看SCADA数据
-- 对比历史同期数据
-
----
-
-## 五、配置说明(新增)
-
-### device_states.json 存储内容
-```json
-{
-  "UF1": {
-    "model_prev_L_s": 3860,           // 上次决策的产水时间
-    "model_prev_t_bw_s": 42,          // 上次决策的反洗时间
-    "last_cycle_end_time": "2025-10-27 14:30:00",  // 上次决策时间
-    "recent_tmp_values": [            // 最近N次TMP平均值(新增)
-      0.0312,
-      0.0318,
-      0.0325,
-      0.0331,
-      0.0328
-    ]
-  }
-}
-```
-
-### 配置参数(config.json)
-```json
-{
-  "system": {
-    "tmp_history_count": 5,  // 保存最近N次TMP值,可调整
-    ...
-  }
-}
-```
-
-### TMP趋势判断逻辑
-- 保存最近5次(可配置)TMP平均值
-- 每次决策后更新
-- 可以通过这个列表判断:
-  - 持续上升:可能需要关注
-  - 突然跳变:可能数据异常
-  - 波动正常:运行正常
-
-
-
----
-
-## 六、成功标准
-
-**看这几点**:
-1. 程序能稳定跑2周
-2. 设备没有因为模型控制出问题
-3. 前端开关能正常控制
-4. TMP趋势没有明显异常
-
----
-
-## 七、应急方案
-
-### 前端关闭
-- 在手机端关闭系统自控按钮
-- 程序继续跑,但不下发指令
-
-### 备选:停程序
-```bash
-ps aux | grep loop_main.py
-kill <PID>
-```
-
-### 最后:PLC手动操作
-- 切换到手动模式
-
----
-
-**版本**:v1.0  
-**日期**:2025-10-27

+ 0 - 264
models/uf-rl/ultrafiltration_model/DQN_decide.py

@@ -1,264 +0,0 @@
-import numpy as np
-from stable_baselines3 import DQN
-from DQN_env import UFSuperCycleEnv
-from DQN_env import UFParams
-
-# 模型路径
-MODEL_PATH = "dqn_model.zip"
-
-# 创建环境实例以获取观察空间和动作空间
-def _get_model_spaces():
-    """获取模型的观察空间和动作空间"""
-    env = UFSuperCycleEnv(UFParams())
-    obs_space = env.observation_space
-    action_space = env.action_space
-    env.close()
-    return obs_space, action_space
-
-# 加载模型(只加载一次,提高效率)
-try:
-    # 尝试直接加载
-    model = DQN.load(MODEL_PATH)
-except KeyError:
-    # 如果失败,则提供观察空间和动作空间
-    obs_space, action_space = _get_model_spaces()
-    model = DQN.load(MODEL_PATH, custom_objects={
-        'observation_space': obs_space,
-        'action_space': action_space
-    })
-
-def run_uf_DQN_decide(uf_params, TMP0_value: float):
-    """
-    单步决策函数:输入原始 TMP0,预测并执行动作
-
-    参数:
-        TMP0_value (float): 当前 TMP0 值(单位与环境一致)
-
-    返回:
-        dict: 包含模型选择的动作、动作参数、新状态、奖励等
-    """
-    # 1. 实例化环境
-    base_params = uf_params
-    env = UFSuperCycleEnv(base_params)
-
-    # 2. 将输入的 TMP0 写入环境
-    env.current_params.TMP0 = TMP0_value
-
-    # 3. 获取归一化状态
-    obs = env._get_obs().reshape(1, -1)
-
-    # 4. 模型预测动作
-    action, _ = model.predict(obs, deterministic=True)
-
-    # 5. 解析动作对应的 L_s 和 t_bw_s
-    L_s, t_bw_s = env._get_action_values(action[0])
-
-    # 6. 在环境中执行该动作
-    next_obs, reward, terminated, truncated, info = env.step(action[0])
-
-    # 7. 整理结果
-    result = {
-        "action": int(action[0]),
-        "L_s": float(L_s),
-        "t_bw_s": float(t_bw_s),
-        "next_obs": next_obs,
-        "reward": reward,
-        "terminated": terminated,
-        "truncated": truncated,
-        "info": info
-    }
-
-    # 8. 关闭环境
-    env.close()
-
-    return result
-
-def generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s):
-    """
-    根据工厂当前值、模型上一轮决策值和模型当前轮决策值,生成PLC指令。
-
-    新增功能:
-    1. 处理None值情况:如果模型上一轮值为None,则使用工厂当前值;
-       如果工厂当前值也为None,则返回None并提示错误。
-    """
-    # 参数配置保持不变
-    params = UFParams(
-        L_min_s=3600.0, L_max_s=6000.0, L_step_s=60.0,
-        t_bw_min_s=40.0, t_bw_max_s=60.0, t_bw_step_s=2.0,
-    )
-
-    # 参数解包
-    L_step_s = params.L_step_s
-    t_bw_step_s = params.t_bw_step_s
-    L_min_s = params.L_min_s
-    L_max_s = params.L_max_s
-    t_bw_min_s = params.t_bw_min_s
-    t_bw_max_s = params.t_bw_max_s
-    adjustment_threshold = 1.0
-
-    # 处理None值情况
-    if model_prev_L_s is None:
-        if current_L_s is None:
-            print("错误: 过滤时长的工厂当前值和模型上一轮值均为None")
-            return None, None
-        else:
-            # 使用工厂当前值作为基准
-            effective_current_L = current_L_s
-            source_L = "工厂当前值(模型上一轮值为None)"
-    else:
-        # 模型上一轮值不为None,继续检查工厂当前值
-        if current_L_s is None:
-            effective_current_L = model_prev_L_s
-            source_L = "模型上一轮值(工厂当前值为None)"
-        else:
-            effective_current_L = model_prev_L_s
-            source_L = "模型上一轮值"
-
-    # 对反洗时长进行同样的处理
-    if model_prev_t_bw_s is None:
-        if current_t_bw_s is None:
-            print("错误: 反洗时长的工厂当前值和模型上一轮值均为None")
-            return None, None
-        else:
-            effective_current_t_bw = current_t_bw_s
-            source_t_bw = "工厂当前值(模型上一轮值为None)"
-    else:
-        if current_t_bw_s is None:
-            effective_current_t_bw = model_prev_t_bw_s
-            source_t_bw = "模型上一轮值(工厂当前值为None)"
-        else:
-            effective_current_t_bw = model_prev_t_bw_s
-            source_t_bw = "模型上一轮值"
-
-    # 检测所有输入值是否在规定范围内(只对非None值进行检查)
-    # 工厂当前值检查(警告)
-    if current_L_s is not None and not (L_min_s <= current_L_s <= L_max_s):
-        print(f"警告: 当前过滤时长 {current_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
-    if current_t_bw_s is not None and not (t_bw_min_s <= current_t_bw_s <= t_bw_max_s):
-        print(f"警告: 当前反洗时长 {current_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
-
-    # 模型上一轮决策值检查(警告)
-    if model_prev_L_s is not None and not (L_min_s <= model_prev_L_s <= L_max_s):
-        print(f"警告: 模型上一轮过滤时长 {model_prev_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
-    if model_prev_t_bw_s is not None and not (t_bw_min_s <= model_prev_t_bw_s <= t_bw_max_s):
-        print(f"警告: 模型上一轮反洗时长 {model_prev_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
-
-    # 模型当前轮决策值检查(错误)
-    if model_L_s is None:
-        raise ValueError("错误: 决策模型建议的过滤时长不能为None")
-    elif not (L_min_s <= model_L_s <= L_max_s):
-        raise ValueError(f"错误: 决策模型建议的过滤时长 {model_L_s} 秒不在允许范围内 [{L_min_s}, {L_max_s}]")
-
-    if model_t_bw_s is None:
-        raise ValueError("错误: 决策模型建议的反洗时长不能为None")
-    elif not (t_bw_min_s <= model_t_bw_s <= t_bw_max_s):
-        raise ValueError(f"错误: 决策模型建议的反洗时长 {model_t_bw_s} 秒不在允许范围内 [{t_bw_min_s}, {t_bw_max_s}]")
-
-    print(f"过滤时长基准: {source_L}, 值: {effective_current_L}")
-    print(f"反洗时长基准: {source_t_bw}, 值: {effective_current_t_bw}")
-
-    # 使用选定的基准值进行计算调整
-    L_diff = model_L_s - effective_current_L
-    L_adjustment = 0
-    if abs(L_diff) >= adjustment_threshold * L_step_s:
-        if L_diff >= 0:
-            L_adjustment = L_step_s
-        else:
-            L_adjustment = -L_step_s
-    next_L_s = effective_current_L + L_adjustment
-
-    t_bw_diff = model_t_bw_s - effective_current_t_bw
-    t_bw_adjustment = 0
-    if abs(t_bw_diff) >= adjustment_threshold * t_bw_step_s:
-        if t_bw_diff >= 0:
-            t_bw_adjustment = t_bw_step_s
-        else:
-            t_bw_adjustment = -t_bw_step_s
-    next_t_bw_s = effective_current_t_bw + t_bw_adjustment
-
-    return next_L_s, next_t_bw_s
-
-
-from DQN_env import simulate_one_supercycle
-def calc_uf_cycle_metrics(p, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, L_s: float, t_bw_s: float):
-    """
-    计算 UF 超滤系统的核心性能指标
-
-    参数:
-        p (UFParams): UF 系统参数
-        L_s (float): 单次过滤时间(秒)
-        t_bw_s (float): 单次反洗时间(秒)
-
-    返回:
-        dict: {
-            "k_bw_per_ceb": 小周期次数,
-            "ton_water_energy_kWh_per_m3": 吨水电耗,
-            "recovery": 回收率,
-            "net_delivery_rate_m3ph": 净供水率 (m³/h),
-            "daily_prod_time_h": 日均产水时间 (小时/天)
-            "max_permeability": 全周期最高渗透率(lmh/bar)
-        }
-    """
-    # 将跨膜压差写入参数
-    p.TMP0 = TMP0
-
-    # 模拟该参数下的超级周期
-    feasible, info = simulate_one_supercycle(p, L_s, t_bw_s)
-
-    # 获得模型模拟周期信息
-    k_bw_per_ceb = info["k_bw_per_ceb"]
-    ton_water_energy_kWh_per_m3 = info["ton_water_energy_kWh_per_m3"]
-    recovery = info["recovery"]
-    net_delivery_rate_m3ph = info["net_delivery_rate_m3ph"]
-    daily_prod_time_h = info["daily_prod_time_h"]
-
-    # 获得模型模拟周期内最高跨膜压差/最低跨膜压差
-    if max_tmp_during_filtration is None:
-        max_tmp_during_filtration = info["max_TMP_during_filtration"]
-    if min_tmp_during_filtration is None:
-        min_tmp_during_filtration = info["min_TMP_during_filtration"]
-
-    # 计算最高渗透率
-    max_permeability = 100 * p.q_UF / (128*40) / min_tmp_during_filtration
-
-
-    return {
-        "k_bw_per_ceb": k_bw_per_ceb,
-        "ton_water_energy_kWh_per_m3": ton_water_energy_kWh_per_m3,
-        "recovery": recovery,
-        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
-        "daily_prod_time_h": daily_prod_time_h,
-        "max_permeability": max_permeability
-    }
-
-
-# ==============================
-# 示例调用
-# ==============================
-if __name__ == "__main__":
-    uf_params = UFParams()
-    TMP0 = 0.03 # 原始 TMP0
-    model_decide_result = run_uf_DQN_decide(uf_params, TMP0) # 调用模型获得动作
-    model_L_s = model_decide_result['L_s'] # 获得模型决策产水时长
-    model_t_bw_s = model_decide_result['t_bw_s'] # 获得模型决策反洗时长
-
-    current_L_s = 3800
-    current_t_bw_s = 40
-    model_prev_L_s = 4040
-    model_prev_t_bw_s = 60
-    L_s, t_bw_s = generate_plc_instructions(current_L_s, current_t_bw_s, model_prev_L_s, model_prev_t_bw_s, model_L_s, model_t_bw_s) # 获取模型下发指令
-
-    L_s = 4100
-    t_bw_s = 96
-    max_tmp_during_filtration = 0.050176 # 新增工厂数据接口:周期最高/最低跨膜压差,无工厂数据接入时传入None,calc_uf_cycle_metrics()自动获取模拟周期中的跨膜压差最值
-    min_tmp_during_filtration = 0.012496
-    execution_result = calc_uf_cycle_metrics(uf_params, TMP0, max_tmp_during_filtration, min_tmp_during_filtration, L_s, t_bw_s)
-    print("\n===== 单步决策结果 =====")
-    print(f"模型选择的动作: {model_decide_result['action']}")
-    print(f"模型选择的L_s: {model_L_s} 秒, 模型选择的t_bw_s: {model_t_bw_s} 秒")
-    print(f"指令下发的L_s: {L_s} 秒, 指令下发的t_bw_s: {t_bw_s} 秒")
-    print(f"指令对应的反洗次数: {execution_result['k_bw_per_ceb']}")
-    print(f"指令对应的吨水电耗: {execution_result['ton_water_energy_kWh_per_m3']}")
-    print(f"指令对应的回收率: {execution_result['recovery']}")
-    print(f"指令对应的日均产水时间: {execution_result['daily_prod_time_h']}")
-    print(f"指令对应的最高渗透率: {execution_result['max_permeability']}")

+ 0 - 566
models/uf-rl/ultrafiltration_model/DQN_env.py

@@ -1,566 +0,0 @@
-import os
-import time
-import random
-import numpy as np
-import gymnasium as gym
-from gymnasium import spaces
-from stable_baselines3 import DQN
-from stable_baselines3.common.monitor import Monitor
-from stable_baselines3.common.vec_env import DummyVecEnv
-from stable_baselines3.common.callbacks import BaseCallback
-from typing import Dict, Tuple, Optional
-import torch
-import torch.nn as nn
-from dataclasses import dataclass, asdict
-from save_uf_models import TMPIncreaseModel, TMPDecreaseModel  # 导入模型类
-import copy
-
-
-# ==== 定义膜的基础运行参数 ====
-@dataclass
-class UFParams:
-    # —— 膜与运行参数 ——
-    q_UF: float = 360.0  # 过滤进水流量(m^3/h)
-    TMP0: float = 0.03  # 初始TMP(MPa)
-    TMP_max: float = 0.06  # TMP硬上限(MPa)
-
-    # —— 膜污染动力学 ——
-    alpha: float = 1e-6  # TMP增长系数
-    belta: float = 1.1  # 幂指数
-
-    # —— 反洗参数(固定) ——
-    q_bw_m3ph: float = 1000.0  # 物理反洗流量(m^3/h)
-
-    # —— CEB参数(固定) ——
-    T_ceb_interval_h: float = 48.0  # 固定每 k 小时做一次CEB
-    v_ceb_m3: float = 30.0  # CEB用水体积(m^3)
-    t_ceb_s: float = 40 * 60.0  # CEB时长(s)
-    phi_ceb: float = 1.0  # CEB去除比例(简化:完全恢复到TMP0)
-
-    # —— 约束与收敛 ——
-    dTMP: float = 0.001  # 单次产水结束时,相对TMP0最大升幅(MPa)
-
-    # —— 搜索范围(秒) ——
-    L_min_s: float = 3800.0  # 过滤时长下限(s)
-    L_max_s: float = 6000.0  # 过滤时长上限(s)
-    t_bw_min_s: float = 40.0  # 物洗时长下限(s)
-    t_bw_max_s: float = 60.0  # 物洗时长上限(s)
-
-    # —— 物理反洗恢复函数参数 ——
-    phi_bw_min: float = 0.7  # 物洗去除比例最小值
-    phi_bw_max: float = 1.0  # 物洗去除比例最大值
-    L_ref_s: float = 4000.0  # 过滤时长影响时间尺度
-    tau_bw_s: float = 20.0  # 物洗时长影响时间尺度
-    gamma_t: float = 1.0  # 物洗时长作用指数
-
-    # —— 网格 ——
-    L_step_s: float = 60.0  # 过滤时长步长(s)
-    t_bw_step_s: float = 5.0  # 物洗时长步长(s)
-
-    # 多目标加权及高TMP惩罚
-    w_rec: float = 0.8  # 回收率权重
-    w_rate: float = 0.2  # 净供水率权重
-    w_headroom: float = 0.2  # 贴边惩罚权重
-    r_headroom: float = 2.0  # 贴边惩罚幂次
-    headroom_hardcap: float = 0.98  # 超过此比例直接视为不可取
-
-# ==== 定义强化学习超参数 ====
-@dataclass
-class DQNParams:
-    """
-    DQN 超参数定义类
-    用于统一管理模型训练参数
-    """
-    # 学习率,控制神经网络更新步长
-    learning_rate: float = 1e-4
-
-    # 经验回放缓冲区大小(步数)
-    buffer_size: int = 10000
-
-    # 学习开始前需要收集的步数
-    learning_starts: int = 200
-
-    # 每次从经验池中采样的样本数量
-    batch_size: int = 32
-
-    # 折扣因子,越接近1越重视长期奖励
-    gamma: float = 0.95
-
-    # 每隔多少步训练一次
-    train_freq: int = 4
-
-    # 目标网络更新间隔
-    target_update_interval: int = 2000
-
-    # 初始探索率 ε
-    exploration_initial_eps: float = 1.0
-
-    # 从初始ε衰减到最终ε所占的训练比例
-    exploration_fraction: float = 0.3
-
-    # 最终探索率 ε
-    exploration_final_eps: float = 0.02
-
-    # 日志备注(用于区分不同实验)
-    remark: str = "default"
-
-# ==== 加载模拟环境模型 ====
-# 初始化模型
-model_fp = TMPIncreaseModel()
-model_bw = TMPDecreaseModel()
-
-# 加载参数
-model_fp.load_state_dict(torch.load("uf_fp.pth"))
-model_bw.load_state_dict(torch.load("uf_bw.pth"))
-
-# 切换到推理模式
-model_fp.eval()
-model_bw.eval()
-
-
-def _delta_tmp(p, L_h: float) -> float:
-    """
-    过滤时段TMP上升量:调用 uf_fp.pth 模型
-    """
-    return model_fp(p, L_h)
-
-def phi_bw_of(p, L_s: float, t_bw_s: float) -> float:
-    """
-    物洗去除比例:调用 uf_bw.pth 模型
-    """
-    return model_bw(p, L_s, t_bw_s)
-
-def _tmp_after_ceb(p, L_s: float, t_bw_s: float) -> float:
-    """
-    计算化学清洗(CEB)后的TMP,当前为恢复初始跨膜压差
-    """
-    return p.TMP0
-
-def _v_bw_m3(p, t_bw_s: float) -> float:
-    """
-    物理反洗水耗
-    """
-    return float(p.q_bw_m3ph * (float(t_bw_s) / 3600.0))
-
-def simulate_one_supercycle(p: UFParams, L_s: float, t_bw_s: float):
-    """
-    返回 (是否可行, 指标字典)
-    - 支持动态CEB次数:48h固定间隔
-    - 增加日均产水时间和吨水电耗
-    - 增加最小TMP记录
-    """
-    L_h = float(L_s) / 3600.0  # 小周期过滤时间(h)
-
-    tmp = p.TMP0
-    max_tmp_during_filtration = tmp
-    min_tmp_during_filtration = tmp  # 新增:初始化最小TMP
-    max_residual_increase = 0.0
-
-    # 小周期总时长(h)
-    t_small_cycle_h = (L_s + t_bw_s) / 3600.0
-
-    # 计算超级周期内CEB次数
-    k_bw_per_ceb = int(np.floor(p.T_ceb_interval_h / t_small_cycle_h))
-    if k_bw_per_ceb < 1:
-        k_bw_per_ceb = 1  # 至少一个小周期
-
-    # ton水电耗查表
-    energy_lookup = {
-        3600: 0.1034, 3660: 0.1031, 3720: 0.1029, 3780: 0.1026,
-        3840: 0.1023, 3900: 0.1021, 3960: 0.1019, 4020: 0.1017,
-        4080: 0.1015, 4140: 0.1012, 4200: 0.1011
-    }
-
-    for _ in range(k_bw_per_ceb):
-        tmp_run_start = tmp
-
-        # 过滤阶段TMP增长
-        dtmp = _delta_tmp(p, L_h)
-        tmp_peak = tmp_run_start + dtmp
-
-        # 约束1:峰值不得超过硬上限
-        if tmp_peak > p.TMP_max + 1e-12:
-            return False, {"reason": "TMP_max violated during filtration", "TMP_peak": tmp_peak}
-
-        # 更新最大和最小TMP
-        if tmp_peak > max_tmp_during_filtration:
-            max_tmp_during_filtration = tmp_peak
-        if tmp_run_start < min_tmp_during_filtration:  # 新增:记录运行开始时的最小TMP
-            min_tmp_during_filtration = tmp_run_start
-
-        # 物理反洗
-        phi = phi_bw_of(p, L_s, t_bw_s)
-        tmp_after_bw = tmp_peak - phi * (tmp_peak - tmp_run_start)
-
-        # 约束2:单次残余增量控制
-        residual_inc = tmp_after_bw - tmp_run_start
-        if residual_inc > p.dTMP + 1e-12:
-            return False, {
-                "reason": "residual TMP increase after BW exceeded dTMP",
-                "residual_increase": residual_inc,
-                "limit_dTMP": p.dTMP
-            }
-        if residual_inc > max_residual_increase:
-            max_residual_increase = residual_inc
-
-        tmp = tmp_after_bw
-
-    # CEB
-    tmp_after_ceb = p.TMP0
-
-    # 体积与回收率
-    V_feed_super = k_bw_per_ceb * p.q_UF * L_h
-    V_loss_super = k_bw_per_ceb * _v_bw_m3(p, t_bw_s) + p.v_ceb_m3
-    V_net = max(0.0, V_feed_super - V_loss_super)
-    recovery = max(0.0, V_net / max(V_feed_super, 1e-12))
-
-    # 时间与净供水率
-    T_super_h = k_bw_per_ceb * (L_s + t_bw_s) / 3600.0 + p.t_ceb_s / 3600.0
-    net_delivery_rate_m3ph = V_net / max(T_super_h, 1e-12)
-
-    # 贴边比例与硬限
-    headroom_ratio = max_tmp_during_filtration / max(p.TMP_max, 1e-12)
-    if headroom_ratio > p.headroom_hardcap + 1e-12:
-        return False, {"reason": "headroom hardcap exceeded", "headroom_ratio": headroom_ratio}
-
-    # —— 新增指标 1:日均产水时间(h/d) ——
-    daily_prod_time_h = k_bw_per_ceb * L_h / T_super_h * 24.0
-
-    # —— 新增指标 2:吨水电耗(kWh/m³) ——
-    closest_L = min(energy_lookup.keys(), key=lambda x: abs(x - L_s))
-    ton_water_energy = energy_lookup[closest_L]
-
-    info = {
-        "recovery": recovery,
-        "V_feed_super_m3": V_feed_super,
-        "V_loss_super_m3": V_loss_super,
-        "V_net_super_m3": V_net,
-        "supercycle_time_h": T_super_h,
-        "net_delivery_rate_m3ph": net_delivery_rate_m3ph,
-        "max_TMP_during_filtration": max_tmp_during_filtration,
-        "min_TMP_during_filtration": min_tmp_during_filtration,  # 新增:最小TMP
-        "max_residual_increase_per_run": max_residual_increase,
-        "phi_bw_effective": phi,
-        "TMP_after_ceb": tmp_after_ceb,
-        "headroom_ratio": headroom_ratio,
-        "daily_prod_time_h": daily_prod_time_h,
-        "ton_water_energy_kWh_per_m3": ton_water_energy,
-        "k_bw_per_ceb": k_bw_per_ceb
-    }
-
-    return True, info
-
-def _score(p: UFParams, rec: dict) -> float:
-    """综合评分:越大越好。通过非线性放大奖励差异,强化区分好坏动作"""
-
-    # —— 无量纲化净供水率 ——
-    rate_norm = rec["net_delivery_rate_m3ph"] / max(p.q_UF, 1e-12)
-
-    # —— TMP soft penalty (sigmoid) ——
-    tmp_ratio = rec["max_TMP_during_filtration"] / max(p.TMP_max, 1e-12)
-    k = 10.0
-    headroom_penalty = 1.0 / (1.0 + np.exp(-k * (tmp_ratio - 1.0)))
-
-    # —— 基础 reward(0.6~0.9左右)——
-    base_reward = (
-        p.w_rec * rec["recovery"]
-        + p.w_rate * rate_norm
-        - p.w_headroom * headroom_penalty
-    )
-
-    # —— 非线性放大:平方映射 + 缩放 ——
-    # 目的是放大好坏动作差异,同时限制最大值,避免 TD-error 过大
-    amplified_reward = (base_reward - 0.5) ** 2 * 5.0
-
-    # —— 可选:保留符号,区分负奖励
-    if base_reward < 0.5:
-        amplified_reward = -amplified_reward
-
-    return amplified_reward
-
-
-def set_global_seed(seed: int):
-    """固定全局随机种子,保证训练可复现"""
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-    torch.cuda.manual_seed_all(seed)  # 如果使用GPU
-    torch.backends.cudnn.deterministic = True
-    torch.backends.cudnn.benchmark = False
-
-class UFSuperCycleEnv(gym.Env):
-    """超滤系统环境(超级周期级别决策)"""
-
-    metadata = {"render_modes": ["human"]}
-
-    def __init__(self, base_params, max_episode_steps: int = 20):
-        super(UFSuperCycleEnv, self).__init__()
-
-        self.base_params = base_params
-        self.current_params = copy.deepcopy(base_params)
-        self.max_episode_steps = max_episode_steps
-        self.current_step = 0
-
-        # 计算离散动作空间
-        self.L_values = np.arange(
-            self.base_params.L_min_s,
-            self.base_params.L_max_s + self.base_params.L_step_s,
-            self.base_params.L_step_s
-        )
-        self.t_bw_values = np.arange(
-            self.base_params.t_bw_min_s,
-            self.base_params.t_bw_max_s + self.base_params.t_bw_step_s,
-            self.base_params.t_bw_step_s
-        )
-
-        self.num_L = len(self.L_values)
-        self.num_bw = len(self.t_bw_values)
-
-        # 单一离散动作空间
-        self.action_space = spaces.Discrete(self.num_L * self.num_bw)
-
-        # 状态空间增加 TMP0, 上一次动作(L_s, t_bw_s), 本周期最高 TMP
-        # 状态归一化均在 _get_obs 内处理
-        self.observation_space = spaces.Box(
-            low=np.zeros(4, dtype=np.float32),
-            high=np.ones(4, dtype=np.float32),
-            dtype=np.float32
-        )
-
-        # 初始化状态
-        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
-        self.max_TMP_during_filtration = self.current_params.TMP0
-        self.reset(seed=None)
-
-    def _get_obs(self):
-        TMP0 = self.current_params.TMP0
-        TMP0_norm = (TMP0 - 0.01) / (0.05 - 0.01)
-
-        L_s, t_bw_s = self.last_action
-        L_norm = (L_s - self.base_params.L_min_s) / (self.base_params.L_max_s - self.base_params.L_min_s)
-        t_bw_norm = (t_bw_s - self.base_params.t_bw_min_s) / (self.base_params.t_bw_max_s - self.base_params.t_bw_min_s)
-
-        max_TMP_norm = (self.max_TMP_during_filtration - 0.01) / (0.05 - 0.01)
-
-        return np.array([TMP0_norm, L_norm, t_bw_norm, max_TMP_norm], dtype=np.float32)
-
-    def _get_action_values(self, action):
-        L_idx = action // self.num_bw
-        t_bw_idx = action % self.num_bw
-        return self.L_values[L_idx], self.t_bw_values[t_bw_idx]
-
-    def reset(self, seed=None, options=None):
-        super().reset(seed=seed)
-        self.current_params.TMP0 = np.random.uniform(0.01, 0.03)
-        self.current_step = 0
-        self.last_action = (self.base_params.L_min_s, self.base_params.t_bw_min_s)
-        self.max_TMP_during_filtration = self.current_params.TMP0
-        return self._get_obs(), {}
-
-    def step(self, action):
-        self.current_step += 1
-        L_s, t_bw_s = self._get_action_values(action)
-        L_s = np.clip(L_s, self.base_params.L_min_s, self.base_params.L_max_s)
-        t_bw_s = np.clip(t_bw_s, self.base_params.t_bw_min_s, self.base_params.t_bw_max_s)
-
-        # 模拟超级周期
-        feasible, info = simulate_one_supercycle(self.current_params, L_s, t_bw_s)
-
-        if feasible:
-            reward = _score(self.current_params, info)
-            self.current_params.TMP0 = info["TMP_after_ceb"]
-            self.max_TMP_during_filtration = info["max_TMP_during_filtration"]
-            terminated = False
-        else:
-            reward = -20
-            terminated = True
-
-        truncated = self.current_step >= self.max_episode_steps
-        self.last_action = (L_s, t_bw_s)
-        next_obs = self._get_obs()
-
-        info["feasible"] = feasible
-        info["step"] = self.current_step
-
-        return next_obs, reward, terminated, truncated, info
-
-
-class UFEpisodeRecorder:
-    """记录episode中的决策和结果"""
-
-    def __init__(self):
-        self.episode_data = []
-        self.current_episode = []
-
-    def record_step(self, obs, action, reward, done, info):
-        """记录一步"""
-        step_data = {
-            "obs": obs.copy(),
-            "action": action.copy(),
-            "reward": reward,
-            "done": done,
-            "info": info.copy() if info else {}
-        }
-        self.current_episode.append(step_data)
-
-        if done:
-            self.episode_data.append(self.current_episode)
-            self.current_episode = []
-
-    def get_episode_stats(self, episode_idx=-1):
-        """获取episode统计信息"""
-        if not self.episode_data:
-            return {}
-
-        episode = self.episode_data[episode_idx]
-        total_reward = sum(step["reward"] for step in episode)
-        avg_recovery = np.mean([step["info"].get("recovery", 0) for step in episode if "recovery" in step["info"]])
-        feasible_steps = sum(1 for step in episode if step["info"].get("feasible", False))
-
-        return {
-            "total_reward": total_reward,
-            "avg_recovery": avg_recovery,
-            "feasible_steps": feasible_steps,
-            "total_steps": len(episode)
-        }
-
-
-class UFTrainingCallback(BaseCallback):
-    """
-    PPO 训练回调,用于记录每一步的数据到 recorder。
-    相比原来的 RecordingCallback,更加合理和健壮:
-    1. 不依赖环境内部 last_* 属性
-    2. 使用 PPO 提供的 obs、actions、rewards、dones、infos
-    3. 自动处理 episode 结束时的统计
-    """
-
-    def __init__(self, recorder, verbose=0):
-        super(UFTrainingCallback, self).__init__(verbose)
-        self.recorder = recorder
-
-    def _on_step(self) -> bool:
-        try:
-            new_obs = self.locals.get("new_obs")
-            actions = self.locals.get("actions")
-            rewards = self.locals.get("rewards")
-            dones = self.locals.get("dones")
-            infos = self.locals.get("infos")
-
-            if len(new_obs) > 0:
-                step_obs = new_obs[0]
-                step_action = actions[0] if actions is not None else None
-                step_reward = rewards[0] if rewards is not None else 0.0
-                step_done = dones[0] if dones is not None else False
-                step_info = infos[0] if infos is not None else {}
-
-                # 打印当前 step 的信息
-                if self.verbose:
-                    print(f"[Step {self.num_timesteps}] 动作={step_action}, 奖励={step_reward:.3f}, Done={step_done}")
-
-                # 记录数据
-                self.recorder.record_step(
-                    obs=step_obs,
-                    action=step_action,
-                    reward=step_reward,
-                    done=step_done,
-                    info=step_info,
-                )
-
-        except Exception as e:
-            if self.verbose:
-                print(f"[Callback Error] {e}")
-
-        return True
-
-
-
-
-class DQNTrainer:
-    def __init__(self, env, params, callback=None):
-        self.env = env
-        self.params = params
-        self.callback = callback
-        self.log_dir = self._create_log_dir()
-        self.model = self._create_model()
-
-    def _create_log_dir(self):
-        timestamp = time.strftime("%Y%m%d-%H%M%S")
-        log_name = (
-            f"DQN_lr{self.params.learning_rate}_buf{self.params.buffer_size}_bs{self.params.batch_size}"
-            f"_gamma{self.params.gamma}_exp{self.params.exploration_fraction}"
-            f"_{self.params.remark}_{timestamp}"
-        )
-        log_dir = os.path.join("./uf_dqn_tensorboard", log_name)
-        os.makedirs(log_dir, exist_ok=True)
-        return log_dir
-
-    def _create_model(self):
-        return DQN(
-            policy="MlpPolicy",
-            env=self.env,
-            learning_rate=self.params.learning_rate,
-            buffer_size=self.params.buffer_size,  # 大缓冲保证经验多样性
-            learning_starts=self.params.learning_starts,
-            batch_size=self.params.batch_size,
-            gamma=self.params.gamma,
-            train_freq=self.params.train_freq,
-            target_update_interval=1,
-            tau=0.005,
-            exploration_initial_eps=self.params.exploration_initial_eps,
-            exploration_fraction=self.params.exploration_fraction,
-            exploration_final_eps=self.params.exploration_final_eps,
-            verbose=1,
-            tensorboard_log=self.log_dir
-            # 不再指定 replay_buffer_class,默认使用 ReplayBuffer
-        )
-
-    def train(self, total_timesteps: int):
-        if self.callback:
-            self.model.learn(total_timesteps=total_timesteps, callback=self.callback)
-        else:
-            self.model.learn(total_timesteps=total_timesteps)
-        print(f"模型训练完成,日志保存在:{self.log_dir}")
-
-    def save(self, path=None):
-        if path is None:
-            path = os.path.join(self.log_dir, "dqn_model.zip")
-        self.model.save(path)
-        print(f"模型已保存到:{path}")
-
-    def load(self, path):
-        self.model = DQN.load(path, env=self.env)
-        print(f"模型已从 {path} 加载")
-
-
-def train_uf_rl_agent(params: UFParams, total_timesteps: int = 10000, seed: int = 2025):
-    set_global_seed(seed)
-    recorder = UFEpisodeRecorder()
-    callback = UFTrainingCallback(recorder, verbose=1)
-
-    def make_env():
-        env = UFSuperCycleEnv(params)
-        env = Monitor(env)
-        return env
-
-    env = DummyVecEnv([make_env])
-
-    dqn_params = DQNParams()
-    trainer = DQNTrainer(env, dqn_params, callback=callback)
-    trainer.train(total_timesteps)
-    trainer.save()
-
-    stats = callback.recorder.get_episode_stats()
-    print(f"训练完成 - 总奖励: {stats.get('total_reward', 0):.2f}, 平均回收率: {stats.get('avg_recovery', 0):.3f}")
-
-    return trainer.model
-
-
-# 训练和测试示例
-if __name__ == "__main__":
-    # 初始化参数
-    params = UFParams()
-
-    # 训练RL代理
-    print("开始训练RL代理...")
-    train_uf_rl_agent(params, total_timesteps=50000)
-

+ 0 - 200
models/uf-rl/ultrafiltration_model/config.json

@@ -1,200 +0,0 @@
-{
-  "_comment_api": "API接口配置",
-  "api": {
-    "base_url": "http://120.55.44.4:8900",
-    "current_data_endpoint": "/api/v1/jinke-cloud/device/current-data",
-    "callback_endpoint": "/api/dtgateway/v1/decision/data",
-    "plc_endpoint": "/api/v1/plc/set-var-values",
-    "jwt_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJJRCI6NywiVXNlcm5hbWUiOiJhZG1pbiIsIkRlcCI6IjEzNSIsImV4cCI6MTc3NjExOTExNCwiaXNzIjoiZ2luLWJsb2cifQ.0HTtzHZjyd2mHo8VCy8icYROxmntRMuQhyoZsAYRL_M"
-  },
-
-  "_comment_database": "MySQL数据库连接配置",
-  "database": {
-    "host": "222.130.26.206",
-    "port": 4000,
-    "user": "whu",
-    "password": "09093f4e6b33ddd",
-    "database": "ws_data",
-    "table_name": "dc_item_history_data_minute"
-  },
-
-  "_comment_scada": "SCADA系统配置 - PLC通信签名验证",
-  "scada": {
-    "secret": "237c92d2-8795-1094-11ef-00e2e48fce4a",
-    "project_id": 92
-  },
-  "_comment_system": "系统运行参数配置",
-  "system": {
-    "use_model": 0,
-    "_use_model_desc": "模型开关: 1=启用模型决策, 0=禁用模型仅记录数据 (支持运行时修改)",
-    "trigger_value": 95,
-    "_trigger_value_desc": "触发监控的控制值",
-    "num_values_to_collect": 10,
-    "_num_values_to_collect_desc": "每次收集的数据点数量",
-    "poll_interval": 2,
-    "_poll_interval_desc": "轮询间隔时间(秒)",
-    "backwash_time": 100,
-    "_backwash_time_desc": "默认反洗时间(秒)",
-    "ceb_count": 45,
-    "tmp_history_count": 5,
-    "_tmp_history_count_desc": "保存最近N次TMP平均值,用于趋势判断(可调整为任意次数)"
-  },
-  "_comment_devices": "设备配置列表 - 每个设备的API调用参数",
-  "devices": [
-    {
-      "_comment": "UF1超滤设备配置",
-      "name": "UF1",
-      "press_pv_item": "C.M.UF1_DB@press_PV",
-      "_press_pv_item_desc": "用于历史数据查询的压差",
-      "control_payload": {
-        "_desc": "控制字读取配置 - 用于触发条件检测",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF1_DB@word_control",
-        "deviceName": "UF1_control_word",
-        "project_id": 92
-      },
-      "target_payload": {
-        "_desc": "跨膜压差读取配置 - 用于数据收集",
-        "deviceId": "1",
-        "deviceItems": "UF1_BW_After_TMP",
-        "deviceName": "UF1_backwash_pressure_diff",
-        "project_id": 92
-      },
-      "production_time_payload": {
-        "_desc": "产水时长读取配置 - 用于模型输入",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF1_DB@time_production",
-        "deviceName": "UF1过滤时长",
-        "project_id": 92
-      },
-      "backwashing_payload": {
-        "_desc": "反洗时长读取配置 - 用于模型输入",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF1_DB@time_BW_SP",
-        "deviceName": "UF1反洗时长",
-        "project_id": 92
-      },
-      "ceb_payload": {
-        "_desc": "CEB次数读取配置 - 用于下发",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF1_DB@cycle_sp",
-        "deviceName": "UF1CEB次数设定",
-        "project_id": 92
-      }
-    },
-    {
-      "_comment": "UF2超滤设备配置",
-      "name": "UF2",
-      "press_pv_item": "C.M.UF2_DB@press_PV",
-      "control_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF2_DB@word_control",
-        "deviceName": "UF2_control_word",
-        "project_id": 92
-      },
-      "target_payload": {
-        "deviceId": "1",
-        "deviceItems": "UF2_BW_After_TMP",
-        "deviceName": "UF2_backwash_pressure_diff",
-        "project_id": 92
-      },
-      "production_time_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF2_DB@time_production",
-        "deviceName": "UF2过滤时长",
-        "project_id": 92
-      },
-      "backwashing_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF2_DB@time_BW_SP",
-        "deviceName": "UF2反洗时长",
-        "project_id": 92
-      },
-      "ceb_payload": {
-        "_desc": "CEB次数读取配置 - 用于下发",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF2_DB@cycle_sp",
-        "deviceName": "UF2CEB次数设定",
-        "project_id": 92
-      }
-    },
-    {
-      "_comment": "UF3超滤设备配置",
-      "name": "UF3",
-      "press_pv_item": "C.M.UF3_DB@press_PV",
-      "control_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF3_DB@word_control",
-        "deviceName": "UF3_control_word",
-        "project_id": 92
-      },
-      "target_payload": {
-        "deviceId": "1",
-        "deviceItems": "UF3_BW_After_TMP",
-        "deviceName": "UF3_backwash_pressure_diff",
-        "project_id": 92
-      },
-      "production_time_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF3_DB@time_production",
-        "deviceName": "UF3过滤时长",
-        "project_id": 92
-      },
-      "backwashing_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF3_DB@time_BW_SP",
-        "deviceName": "UF3反洗时长",
-        "project_id": 92
-      },
-      "ceb_payload": {
-        "_desc": "CEB次数读取配置 - 用于下发",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF3_DB@cycle_sp",
-        "deviceName": "UF3CEB次数设定",
-        "project_id": 92
-      }
-    },
-    {
-      "_comment": "UF4超滤设备配置",
-      "name": "UF4",
-      "press_pv_item": "C.M.UF4_DB@press_PV",
-      "control_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF4_DB@word_control",
-        "deviceName": "UF4_control_word",
-        "project_id": 92
-      },
-      "target_payload": {
-        "deviceId": "1",
-        "deviceItems": "UF4_BW_After_TMP",
-        "deviceName": "UF4_backwash_pressure_diff",
-        "project_id": 92
-      },
-      "production_time_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF4_DB@time_production",
-        "deviceName": "UF4反洗时长",
-        "project_id": 92
-      },
-      "backwashing_payload": {
-        "deviceId": "1",
-        "deviceItems": "C.M.UF4_DB@time_BW_SP",
-        "deviceName": "UF4反洗时长",
-        "project_id": 92
-      },
-      "ceb_payload": {
-        "_desc": "CEB次数读取配置 - 用于下发",
-        "deviceId": "1",
-        "deviceItems": "C.M.UF4_DB@cycle_sp",
-        "deviceName": "UF4CEB次数设定",
-        "project_id": 92
-      }
-    }
-  ],
-  "_comment_usage": "配置文件使用说明",
-  "_usage_notes": {
-    "1_动态配置": "use_model支持运行时修改,无需重启程序",
-    "2_签名验证": "PLC通信使用MD5签名验证,确保scada.secret与服务器端一致",
-    "3_设备扩展": "新增设备时,复制现有设备配置并修改相应的deviceItems参数"
-  }
-}

+ 0 - 23
models/uf-rl/ultrafiltration_model/device_states.json

@@ -1,23 +0,0 @@
-{
-    "_comment": "此文件用于存储每个设备的运行时状态。时间格式为 YYYY-MM-DD HH:MM:SS",
-    "UF1": {
-        "model_prev_L_s": 4220.0,
-        "model_prev_t_bw_s": 90.0,
-        "last_cycle_end_time": "2025-10-29 09:29:48"
-    },
-    "UF2": {
-        "model_prev_L_s": 4220.0,
-        "model_prev_t_bw_s": 90.0,
-        "last_cycle_end_time": "2025-10-26 15:34:23"
-    },
-    "UF3": {
-        "model_prev_L_s": 4220.0,
-        "model_prev_t_bw_s": 90.0,
-        "last_cycle_end_time": "2025-10-26 18:17:29"
-    },
-    "UF4": {
-        "model_prev_L_s": 4220.0,
-        "model_prev_t_bw_s": 90.0,
-        "last_cycle_end_time": "2025-10-27 13:44:35"
-    }
-}

二進制
models/uf-rl/ultrafiltration_model/dqn_model.zip


+ 0 - 1
models/uf-rl/ultrafiltration_model/dqn_model/_stable_baselines3_version

@@ -1 +0,0 @@
-2.6.0

File diff suppressed because it is too large
+ 0 - 46
models/uf-rl/ultrafiltration_model/dqn_model/data


二進制
models/uf-rl/ultrafiltration_model/dqn_model/policy.optimizer.pth


二進制
models/uf-rl/ultrafiltration_model/dqn_model/policy.pth


二進制
models/uf-rl/ultrafiltration_model/dqn_model/pytorch_variables.pth


+ 0 - 9
models/uf-rl/ultrafiltration_model/dqn_model/system_info.txt

@@ -1,9 +0,0 @@
-- OS: Windows-10-10.0.26100-SP0 10.0.26100
-- Python: 3.10.9
-- Stable-Baselines3: 2.6.0
-- PyTorch: 2.8.0+cpu
-- GPU Enabled: False
-- Numpy: 1.26.4
-- Cloudpickle: 3.1.1
-- Gymnasium: 1.0.0
-- OpenAI Gym: 0.26.2

+ 0 - 771
models/uf-rl/ultrafiltration_model/loop_main.py

@@ -1,771 +0,0 @@
-# 标准库导入
-import time
-import json
-import os
-import threading
-import hashlib
-from datetime import datetime, timedelta
-import logging
-from logging.handlers import RotatingFileHandler
-
-# 第三方库导入
-import pymysql
-import requests
-
-# 自定义模块导入
-from DQN_env import UFParams
-from DQN_decide import run_uf_DQN_decide, generate_plc_instructions, calc_uf_cycle_metrics 
-
-# 日志系统配置
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-# 日志输出格式
-formatter = logging.Formatter(
-    '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
-    datefmt='%Y-%m-%d %H:%M:%S'
-)
-
-# 文件日志处理器,单个文件最大5MB,保留3个备份
-file_handler = RotatingFileHandler('monitor_service.log', maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
-file_handler.setFormatter(formatter)
-
-# 控制台日志处理器
-console_handler = logging.StreamHandler()
-console_handler.setFormatter(formatter)
-
-# 添加处理器
-logger.addHandler(file_handler)
-logger.addHandler(console_handler)
-
-
-# 配置加载函数
-def load_config(config_file='config.json'):
-    """
-    从JSON配置文件加载系统配置
-    
-    参数:
-        config_file: 配置文件路径
-        
-    返回:
-        配置字典
-        
-    异常:
-        配置文件不存在或格式错误时抛出异常
-    """
-    try:
-        with open(config_file, 'r', encoding='utf-8') as f:
-            return json.load(f)
-    except FileNotFoundError:
-        logger.critical(f"配置文件未找到 {config_file}")
-        raise
-    except json.JSONDecodeError as e:
-        logger.critical(f"配置文件格式错误 {config_file}: {e}")
-        raise
-
-
-def get_current_config():
-    """
-    获取当前配置,支持运行时配置动态变更
-    """
-    return load_config()
-
-
-# 初始化配置
-config = load_config()
-
-# 全局配置参数
-
-# API接口配置
-API_BASE_URL = config['api']['base_url']
-API_URL = API_BASE_URL + config['api']['current_data_endpoint']
-CALLBACK_URL = API_BASE_URL + config['api']['callback_endpoint']
-PLC_URL = API_BASE_URL + config['api']['plc_endpoint']
-
-# HTTP请求头
-HEADERS = {
-    "Content-Type": "application/json",
-    "JWT-TOKEN": config['api']['jwt_token']
-}
-
-# MySQL数据库配置,优先读取环境变量
-DB_USER = os.getenv('DB_USERNAME', config['database']['user'])
-DB_PASSWORD = os.getenv('DB_PASSWORD', config['database']['password'])
-DB_HOST = os.getenv('DB_HOST', config['database']['host'])
-DB_NAME = os.getenv('DB_DATABASE', config['database']['database'])
-DB_PORT = int(os.getenv('DB_PORT', str(config['database']['port'])))
-HISTORY_TABLE_NAME = config['database']['table_name']
-
-# 超滤系统参数
-uf_params = UFParams()
-PROJECT_ID_FOR_CALLBACK = config['scada']['project_id']
-SCADA_SECRET = config['scada']['secret']
-
-# 监控流程参数
-TRIGGER_VALUE = config['system']['trigger_value']
-NUM_VALUES_TO_COLLECT = config['system']['num_values_to_collect']
-POLL_INTERVAL = config['system']['poll_interval']
-
-# 设备列表
-DEVICE_SEQUENCE = config['devices']
-
-# 状态持久化配置
-STATE_FILE = 'device_states.json'
-_state_lock = threading.Lock()
-device_states = {}
-DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
-
-
-# 状态持久化函数
-def load_device_states():
-    """
-    从状态文件加载所有设备的运行状态
-    """
-    global device_states
-    with _state_lock:
-        try:
-            if os.path.exists(STATE_FILE):
-                with open(STATE_FILE, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                    if content:
-                        device_states = json.loads(content)
-                        logger.info(f"状态文件加载成功 {STATE_FILE}")
-                    else:
-                        logger.warning(f"状态文件为空 {STATE_FILE}")
-                        device_states = {}
-            else:
-                logger.info(f"状态文件不存在,首次运行 {STATE_FILE}")
-                device_states = {}
-        except (json.JSONDecodeError, IOError) as e:
-            logger.error(f"状态文件加载失败 {STATE_FILE}: {e}")
-            device_states = {}
-
-
-def save_device_state(device_name, state_data):
-    """
-    保存单个设备的运行状态到文件
-    
-    参数:
-        device_name: 设备名称
-        state_data: 设备状态数据字典
-    """
-    with _state_lock:
-        try:
-            # 读取现有状态
-            full_states = {}
-            if os.path.exists(STATE_FILE):
-                with open(STATE_FILE, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                    if content:
-                        full_states = json.loads(content)
-
-            # 更新指定设备状态
-            full_states[device_name] = state_data
-
-            # 写回文件
-            with open(STATE_FILE, 'w', encoding='utf-8') as f:
-                json.dump(full_states, f, indent=4, ensure_ascii=False)
-
-            # 更新内存缓存
-            global device_states
-            device_states[device_name] = state_data
-            logger.info(f"[{device_name}] 状态保存成功")
-        except (json.JSONDecodeError, IOError) as e:
-            logger.error(f"[{device_name}] 状态保存失败: {e}")
-
-
-# 核心业务函数
-
-def create_db_connection():
-    """
-    创建MySQL数据库连接
-    
-    返回:
-        连接对象或None
-    """
-    try:
-        connection = pymysql.connect(
-            host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME,
-            port=DB_PORT, charset='utf8mb4',
-            cursorclass=pymysql.cursors.DictCursor
-        )
-        logger.debug("数据库连接成功")
-        return connection
-    except pymysql.MySQLError as e:
-        logger.error(f"数据库连接失败: {e}")
-        return None
-
-
-def get_tmp_extremes(item_name, start_time, end_time, word_control):
-    """
-    通过API查询历史数据中指定时间范围内的跨膜压差极值
-    
-    参数:
-        item_name: 数据项名称
-        start_time: 开始时间
-        end_time: 结束时间
-        word_control: 控制字段名
-        
-    返回:
-        (最大值, 最小值) 或 (None, None)
-    """
-    # 转换时间为毫秒级时间戳
-    start_timestamp = int(start_time.timestamp() * 1000)
-    end_timestamp = int(end_time.timestamp() * 1000)
-    
-    logger.info(f"查询历史极值 {item_name} 从 {start_time.strftime(DATETIME_FORMAT)} 到 {end_time.strftime(DATETIME_FORMAT)}")
-    
-    # API基础URL
-    api_base_url = "http://120.55.44.4:8900/api/v1/jinke-cloud/db/device/history-data"
-    
-    try:
-        # 第一次调用:查询item_name的极值
-        params1 = {
-            "deviceid": "1",
-            "dataitemid": item_name,
-            "project_id": "92",
-            "stime": start_timestamp,
-            "etime": end_timestamp,
-            "size": "1",
-            "interval": "minute",
-            "aggregator": "new"
-        }
-        
-        logger.info(f"第一次API调用: {api_base_url} 参数: {params1}")
-        response1 = requests.get(api_base_url, params=params1, headers=HEADERS, timeout=30)
-        response1.raise_for_status()
-        data1 = response1.json()
-        logger.debug(f"第一次API响应: {data1}")
-        
-        # 第二次调用:查询word_control的极值
-        params2 = {
-            "deviceid": "1", 
-            "dataitemid": word_control,
-            "project_id": "92",
-            "stime": start_timestamp,
-            "etime": end_timestamp,
-            "size": "1",
-            "interval": "minute",
-            "aggregator": "new"
-        }
-        
-        logger.info(f"第二次API调用: {api_base_url} 参数: {params2}")
-        response2 = requests.get(api_base_url, params=params2, headers=HEADERS, timeout=30)
-        response2.raise_for_status()
-        data2 = response2.json()
-        logger.debug(f"第二次API响应: {data2}")
-
-        # 处理两次API调用的结果
-        max_val = None
-        min_val = None
-
-        # 从第一次调用结果中提取'UF1跨膜压差'的值,并存储在字典中,以时间为键
-        uf1_diff_values = {}
-        if data1.get("code") == 200 and data1.get("data"):
-            for item in data1["data"]:
-                if item.get("name") == "UF1跨膜压差" and item.get("val") is not None:
-                    time = item.get("htime_at")
-                    uf1_diff_values[time] = float(item.get("val"))
-            if uf1_diff_values:
-                logger.info(f"第一次API查询成功,提取到跨膜压差数据数量:{len(uf1_diff_values)}")
-
-        # 从第二次调用结果中提取'UF1控制字'为26的数据点,并进行时间匹配
-        if data2.get("code") == 200 and data2.get("data"):
-            control_26_values = []
-            for item in data2["data"]:
-                if item.get("name") == "UF1控制字" and item.get("val") == '26':
-                    time = item.get("htime_at")
-                    # 如果在第一次数据中找到了对应的跨膜压差值
-                    if time in uf1_diff_values:
-                        control_26_values.append(uf1_diff_values[time])
-
-            if control_26_values:
-                logger.info(f"找到控制字为26的数据点,合并跨膜压差数据")
-                max_val = max(control_26_values)
-                min_val = min(control_26_values)
-                # 增加最小跨膜压差的下限值
-                if min_val < 0.01:
-                    min_val = 0.01
-                logger.info(f"控制字为26时的最大跨膜压差值={max_val},最小跨膜压差值={min_val}")
-
-        if max_val is not None and min_val is not None:
-            logger.info(f"API查询成功 最大跨膜压差值={max_val} 最小跨膜压差值={min_val}")
-            return max_val, min_val
-        else:
-            logger.warning("未找到有效的控制字为26时的跨膜压差数据")
-            return None, None
-            
-    except requests.exceptions.RequestException as e:
-        logger.error(f"API请求错误: {e}")
-        return None, None
-    except (json.JSONDecodeError, ValueError, KeyError) as e:
-        logger.error(f"API响应解析错误: {e}")
-        return None, None
-    except Exception as e:
-        logger.error(f"API查询未知错误: {e}")
-        return None, None
-
-
-def generate_md5_signature(record_data, secret, timestamp):
-    """
-    生成PLC请求的MD5签名
-    """
-    cal_str = f"{record_data}{secret}{timestamp}"
-    return hashlib.md5(cal_str.encode('utf-8')).hexdigest().upper()
-
-
-def send_plc_update(device_name, item, old_value, new_value, command_type):
-    """
-    向PLC发送参数更新指令
-    
-    参数:
-        device_name: 设备名称
-        item: 参数项名称
-        old_value: 旧值
-        new_value: 新值
-        command_type: 指令类型
-        
-    返回:
-        是否发送成功
-    """
-    # 构造签名和请求数据
-    timestamp = int(time.time())  # 生成时间戳
-    record_obj = {
-        "project_id": PROJECT_ID_FOR_CALLBACK,  # 项目ID
-        "item": item,  # 参数项名称
-        "old_value": old_value,  # 旧值 
-        "new_value": new_value,  # 新值
-        "command_type": command_type  # 指令类型
-    }
-    record_data = json.dumps([record_obj])  # 生成签名数据
-    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)  # 生成签名
-    url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}"  # 生成请求URL
-    payload = [record_obj]
-
-    logger.info(f"[{device_name}] PLC指令 {item} 从 {old_value} 到 {new_value}")  
-    logger.debug(f"[{device_name}] 签名数据 {record_data}")
-    logger.debug(f"[{device_name}] 签名值 {signature}")
-
-    # 重试机制
-    max_retries, retry_interval = 3, 60  # 重试次数 重试间隔
-    for attempt in range(1, max_retries + 1):
-        try:
-            logger.info(f"[{device_name}] 发送PLC指令 尝试 {attempt}/{max_retries}")
-            response = requests.post(url, json=payload, timeout=15)  # 发送PLC指令 请求头 请求体 超时时间
-            response_json = response.json()
-            if response_json.get('code') == 200:
-                logger.info(f"[{device_name}] PLC指令发送成功 响应 {response_json}")
-                return True
-            else:
-                logger.error(f"[{device_name}] PLC指令发送失败 {response_json.get('msg', '未知错误')}")
-        except requests.exceptions.RequestException as e:
-            logger.error(f"[{device_name}] PLC指令网络错误 {e}")
-        except Exception as e:
-            logger.error(f"[{device_name}] PLC指令未知错误 {e}")
-
-        if attempt < max_retries:  # 重试次数 小于 最大重试次数
-            logger.info(f"[{device_name}] 等待{retry_interval}秒后重试")
-            time.sleep(retry_interval)
-
-    logger.error(f"[{device_name}] PLC指令发送失败,已达最大重试次数")
-    return False
-
-
-def send_decision_to_callback(type_name, **kwargs):
-    """
-    发送决策结果到回调接口
-    
-    参数:
-        type_name: 设备类型名称
-        **kwargs: 决策结果数据
-        
-    返回:
-        use_model状态值: 1表示开启模型,0表示关闭模型,None表示发送失败
-    """
-    payload = {"list": [{"type": type_name, "project_id": PROJECT_ID_FOR_CALLBACK, **kwargs}]}  # 请求负载 设备类型 项目ID 决策结果数据
-
-    logger.info(f"[{type_name}] 发送决策数据\n{json.dumps(payload, indent=2, ensure_ascii=False)}")
-
-    max_retries, retry_interval = 3, 60  # 重试次数 重试间隔
-    for attempt in range(1, max_retries + 1):
-        try:
-            logger.info(f"[{type_name}] 发送回调 尝试 {attempt}/{max_retries}")
-            response = requests.post(CALLBACK_URL, headers=HEADERS, json=payload, timeout=15)  # 发送回调 请求头 请求体 超时时间
-            response.raise_for_status()
-            response_json = response.json()
-            logger.info(f"[{type_name}] 回调发送成功 响应 {response.text}")
-            
-            # 提取返回的 data 字段,表示 use_model 状态(1=开启,0=关闭)
-            use_model_status = response_json.get('data')
-            logger.info(f"[{type_name}] 服务器返回 use_model 状态: {use_model_status}")
-            return use_model_status
-        except requests.exceptions.RequestException as e:
-            logger.error(f"[{type_name}] 回调发送失败 {e}")
-        except (json.JSONDecodeError, ValueError) as e:
-            logger.error(f"[{type_name}] 响应解析失败 {e}")
-
-        if attempt < max_retries:  # 重试次数 小于 最大重试次数
-            logger.info(f"[{type_name}] 等待{retry_interval}秒后重试")
-            time.sleep(retry_interval)
-
-    logger.error(f"[{type_name}] 回调发送失败,已达最大重试次数")
-    return None
-
-
-def get_device_value(payload, device_name):
-    """
-    从API获取设备数据项的当前值
-    
-    参数:
-        payload: 请求负载
-        device_name: 设备名称
-        
-    返回:
-        数据值或None
-    """
-    try:
-        response = requests.post(API_URL, headers=HEADERS, json=[payload], timeout=10)  # 发送请求 请求头 请求体 超时时间
-        response.raise_for_status()
-        api_response = response.json()  # 解析响应
-        if api_response.get("code") == 200 and api_response.get("data"):
-            val_str = api_response["data"][0].get("val")  # 获取数据值
-            if val_str is not None:
-                return float(val_str)
-        else:
-            logger.error(f"[{device_name}] 获取数据失败 {payload['deviceItems']} {api_response.get('msg', '未知错误')}")  # 日志 设备名称 请求负载 响应
-    except requests.exceptions.RequestException as e:
-        logger.error(f"[{device_name}] API网络错误 {payload['deviceItems']} {e}")  # 日志 设备名称 请求负载 错误
-    except (json.JSONDecodeError, ValueError, IndexError) as e:
-        logger.error(f"[{device_name}] API数据解析错误 {payload['deviceItems']} {e}")  # 日志 设备名称 请求负载 错误
-    return None
-
-
-# 设备监控主循环
-
-def monitor_device(device):
-    """
-    单个设备的监控循环
-    
-    完整流程:
-    1. 等待触发条件
-    2. 收集稳定数据
-    3. 执行决策计算
-    4. 发送控制指令
-    5. 等待重置信号
-    
-    参数:
-        device: 设备配置字典
-    """
-    name = device["name"]
-    threading.current_thread().name = name
-    logger.info("监控线程启动")
-
-    # 加载设备历史状态
-    device_state = device_states.get(name, {})  # 设备状态 
-    model_prev_L_s = device_state.get('model_prev_L_s')  # 过滤时间 上一轮
-    model_prev_t_bw_s = device_state.get('model_prev_t_bw_s')  # 反洗时间 上一轮
-    last_cycle_end_time_str = device_state.get('last_cycle_end_time')  # 上次运行结束时间
-
-    # 解析上次运行结束时间
-    last_cycle_end_time = None  # 上次运行结束时间
-    if last_cycle_end_time_str:
-        try:
-            last_cycle_end_time = datetime.strptime(last_cycle_end_time_str, DATETIME_FORMAT)  # 上次运行结束时间
-            logger.info(f"历史状态加载成功,上次运行时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
-        except ValueError:
-            logger.warning(f"时间戳解析失败 {last_cycle_end_time_str}")
-    else:
-        logger.info("首次运行,无历史状态")
-
-    # 主循环
-    while True:
-        try:
-            # 阶段1: 等待触发条件 (控制字=95)
-            logger.info(f"等待触发 控制字需等于 {TRIGGER_VALUE}")
-            while True:
-                control_value = get_device_value(device["control_payload"], name)  # 控制字
-                if control_value is not None and int(control_value) == TRIGGER_VALUE:  # 控制字 等于 触发值 95
-                    logger.info("触发条件满足,开始等待控制字变为26")
-                    break
-                time.sleep(POLL_INTERVAL)
-
-            # 阶段1.5: 等待控制字变为26
-            logger.info("等待控制字变为26")
-            while True:
-                control_value = get_device_value(device["control_payload"], name)  # 控制字
-                if control_value is not None and int(control_value) == 26:  # 控制字 等于 26
-                    logger.info("控制字变为26,开始收集10分钟数据")
-                    break
-                time.sleep(POLL_INTERVAL)
-
-            # 阶段2: 收集10分钟数据并计算平均值
-            logger.info("开始收集10分钟TMP数据")
-            collected_values = []
-            start_collection_time = datetime.now()
-            collection_duration = timedelta(minutes=10)  # 10分钟
-            
-            # 日志计数器,每收集60个点打印一次,避免日志过多
-            log_interval = 60
-            
-            while datetime.now() - start_collection_time < collection_duration:
-                current_value = get_device_value(device["target_payload"], name)  # 当前值
-                control_value = get_device_value(device["control_payload"], name)  # 检查控制字
-                
-                # 检查控制字是否保持26
-                if control_value is not None and int(control_value) != 26:
-                    logger.warning(f"数据收集期间控制字发生变化: {control_value},停止收集")
-                    # 如果控制字变为95,说明系统重置了,需要重新开始
-                    if int(control_value) == TRIGGER_VALUE:
-                        logger.info("控制字变为95,系统重置,重新开始监控")
-                        break
-                    else:
-                        logger.info("控制字变为其他值,等待重置")
-                        break
-                    
-                if current_value is not None:
-                    collected_values.append(current_value)
-                    # 每收集60个点或第一个点时打印日志,减少日志数量
-                    if len(collected_values) == 1 or len(collected_values) % log_interval == 0:
-                        logger.info(f"收集TMP值 {current_value:.4f} 已收集 {len(collected_values)} 个数据点")
-                time.sleep(POLL_INTERVAL)
-            
-            if not collected_values:
-                logger.warning("10分钟内未收集到有效数据,跳过本轮")
-                # 检查控制字状态,如果已经是95则直接开始新一轮
-                control_value = get_device_value(device["control_payload"], name)
-                if control_value is not None and int(control_value) == TRIGGER_VALUE:
-                    logger.info("控制字已经是95,直接开始新一轮")
-                    continue
-                else:
-                    # 等待控制字重置后再继续
-                    logger.info("等待控制字重置...")
-                    time.sleep(10)  # 等待10秒
-                    continue
-
-            # 阶段3: 决策计算
-            logger.info(f"数据收集完成,共收集 {len(collected_values)} 个数据点,开始决策计算")
-            if collected_values:
-                # 计算平均值作为代表值
-                average_value = sum(collected_values) / len(collected_values)
-                logger.info(f"TMP平均值 {average_value:.4f}")
-
-                # 确定历史数据查询时间范围
-                current_decision_time = datetime.now()
-                start_query_time = last_cycle_end_time if last_cycle_end_time else current_decision_time - timedelta(hours=48)
-                _word_controldevice = device["control_payload"]["deviceItems"]
-
-                # 查询历史极值
-                max_tmp, min_tmp = get_tmp_extremes(device["press_pv_item"], start_query_time, current_decision_time, _word_controldevice)
-
-                # 调用DQN模型获取决策建议
-                logger.info("调用DQN决策模型")
-                uf_bw_dict = run_uf_DQN_decide(uf_params, average_value)
-                logger.info(f"模型决策结果 {uf_bw_dict}")
-
-                # 获取当前PLC参数
-                prod_time = get_device_value(device["production_time_payload"], name) or 3800  # 产水时间 默认3800
-                bw_time = get_device_value(device["backwashing_payload"], name) or 100  # 反洗时间 默认100
-                bw_per_ceb = get_device_value(device["ceb_payload"], name) or 40  # CEB 次数时间 默认40
-
-                # 生成PLC指令
-                L_s, t_bw_s = generate_plc_instructions(
-                    prod_time, bw_time,  # 产水时间 反洗时间
-                    model_prev_L_s, model_prev_t_bw_s,  # 过滤时间 反洗时间 上一轮
-                    uf_bw_dict["L_s"], uf_bw_dict["t_bw_s"]  # 过滤时间 反洗时间 决策建议
-                )
-                
-                # 计算运行指标
-                logger.info(f"计算运行指标 TMP={average_value} L_s={L_s} t_bw_s={t_bw_s}")
-                metrics = calc_uf_cycle_metrics(uf_params, average_value, max_tmp, min_tmp, L_s, t_bw_s)  # 计算运行指标
-                ceb_backwash_frequency = int(metrics["k_bw_per_ceb"])
-                
-                # 发送决策结果,并获取服务器返回的 use_model 状态
-                use_model_status = send_decision_to_callback(
-                    type_name=name,  # 设备名称
-                    water_production_time=int(L_s),  # 过滤时间
-                    physical_backwash=int(t_bw_s),  # 反洗时间
-                    ceb_backwash_frequency=ceb_backwash_frequency,  # 化学反洗频率
-                    duration_system=int(prod_time),  # 系统运行时间
-                    tmp_action=average_value,  # TMP动作
-                    recovery_rate=metrics["recovery"],  # 回收率
-                    ton_water_energy_kWh=metrics['ton_water_energy_kWh_per_m3'],  # 吨水电耗
-                    max_permeability=metrics['max_permeability'],  # 最高渗透率
-                    daily_prod_time_h=metrics['daily_prod_time_h'],  # 日均产水时间
-                    ctime=current_decision_time.strftime(DATETIME_FORMAT)  # 时间
-                )
-
-                # 判断是否下发PLC指令,根据服务器返回的 use_model 状态
-                if use_model_status == 1:
-                    logger.info("模型开关已开启,检查PLC指令")
-                    
-                    # 记录当前PLC值和模型决策值
-                    current_plc_values = {
-                        'prod_time': int(prod_time),
-                        'bw_time': int(bw_time),
-                        'bw_per_ceb': int(bw_per_ceb)
-                    }
-                    model_decision_values = {
-                        'L_s': int(L_s),
-                        't_bw_s': int(t_bw_s),
-                        'ceb_frequency': int(ceb_backwash_frequency)
-                    }
-                    
-                    logger.info(f"当前PLC值: 产水时间={current_plc_values['prod_time']}, 反洗时间={current_plc_values['bw_time']}, CEB次数={current_plc_values['bw_per_ceb']}")
-                    logger.info(f"模型决策值: L_s={model_decision_values['L_s']}, t_bw_s={model_decision_values['t_bw_s']}, ceb_frequency={model_decision_values['ceb_frequency']}")
-                    
-                    # 检查每个参数是否需要下发指令
-                    
-                    # 检查产水时间是否需要更新
-                    if current_plc_values['prod_time'] != model_decision_values['L_s']:
-                        logger.info(f"产水时间需要更新: {current_plc_values['prod_time']} -> {model_decision_values['L_s']}")
-                        send_plc_update(name, device["production_time_payload"]["deviceItems"], str(prod_time), str(model_decision_values['L_s']), 1)
-                    else:
-                        logger.info(f"产水时间无需更新: {current_plc_values['prod_time']}")
-                    
-                    # 检查反洗时间是否需要更新
-                    if current_plc_values['bw_time'] != model_decision_values['t_bw_s']:
-                        logger.info(f"反洗时间需要更新: {current_plc_values['bw_time']} -> {model_decision_values['t_bw_s']}")
-                        send_plc_update(name, device["backwashing_payload"]["deviceItems"], str(bw_time), str(model_decision_values['t_bw_s']), 4)
-                    else:
-                        logger.info(f"反洗时间无需更新: {current_plc_values['bw_time']}")
-                    
-                    # 检查CEB次数是否需要更新
-                    if current_plc_values['bw_per_ceb'] != model_decision_values['ceb_frequency']:
-                        logger.info(f"CEB次数需要更新: {current_plc_values['bw_per_ceb']} -> {model_decision_values['ceb_frequency']}")
-                        send_plc_update(name, device["ceb_payload"]["deviceItems"], str(bw_per_ceb), str(model_decision_values['ceb_frequency']), 2)
-                    else:
-                        logger.info(f"CEB次数无需更新: {current_plc_values['bw_per_ceb']}")
-
-                elif use_model_status == 0:
-                    logger.info("服务器返回 use_model=0,模型开关已关闭,跳过PLC指令")
-                else:
-                    logger.warning("回调发送失败,无法获取 use_model 状态,跳过PLC指令")
-
-                # 保存运行状态
-                model_prev_L_s = L_s  # 过滤时间 上一轮
-                model_prev_t_bw_s = t_bw_s  # 反洗时间 上一轮
-                last_cycle_end_time = current_decision_time  # 上次运行结束时间 
-                
-                # 获取配置的TMP历史记录数量
-                current_config = get_current_config()
-                tmp_history_count = current_config.get('system', {}).get('tmp_history_count', 5)
-                
-                # 从最新的内存缓存中读取当前设备状态(确保获取最新的历史记录)
-                current_device_state = device_states.get(name, {})
-                recent_tmp_values = current_device_state.get('recent_tmp_values', [])
-                recent_tmp_values.append(round(average_value, 4))
-                # 只保留最近N次
-                recent_tmp_values = recent_tmp_values[-tmp_history_count:]
-
-                state_to_save = {
-                    'model_prev_L_s': model_prev_L_s,  # 过滤时间 上一轮
-                    'model_prev_t_bw_s': model_prev_t_bw_s,  # 反洗时间 上一轮
-                    'last_cycle_end_time': last_cycle_end_time.strftime(DATETIME_FORMAT),  # 上次运行结束时间
-                    'recent_tmp_values': recent_tmp_values  # 最近N次TMP平均值(新增)
-                }
-                save_device_state(name, state_to_save)  # 保存设备状态 
-                logger.info(f"状态保存完成 下次查询起始时间 {last_cycle_end_time.strftime(DATETIME_FORMAT)}")
-                logger.info(f"最近{tmp_history_count}次TMP记录: {recent_tmp_values}") 
-
-            # 阶段4: 等待重置
-            logger.info(f"等待重置 控制字需重新等于 {TRIGGER_VALUE}")
-            # 等待一段时间,确保不是立即开始新一轮
-            time.sleep(5)  # 等待5秒
-            while True:
-                control_value = get_device_value(device["control_payload"], name)  # 控制字 
-                if control_value is not None and int(control_value) == TRIGGER_VALUE:  # 控制字 等于 触发值
-                    logger.info("完整周期结束,开始新一轮")
-                    break
-                time.sleep(POLL_INTERVAL)
-
-            logger.info(f"{name} 本轮完成\n")
-
-        except Exception as e:
-            logger.critical(f"监控循环异常 {e}", exc_info=True)
-            logger.info("等待60秒后重试")
-            time.sleep(60)
-
-
-# 程序主入口
-
-def main():
-    """
-    主函数
-    
-    功能:
-    1. 加载设备历史状态
-    2. 为每个设备启动独立监控线程
-    3. 保持主线程运行
-    """
-    logger.info("========================================")
-    logger.info("超滤并行监控服务启动")
-    logger.info("========================================")
-
-    # 加载设备历史状态
-    load_device_states()
-
-    # 为每个设备创建监控线程
-    threads = []
-    for device_config in DEVICE_SEQUENCE:
-        thread = threading.Thread(target=monitor_device, args=(device_config,), daemon=True)
-        threads.append(thread)
-        thread.start()
-        logger.info(f"设备 {device_config['name']} 监控线程已启动")
-
-    # 保持主线程运行
-    try:
-        while any(t.is_alive() for t in threads):
-            time.sleep(1)
-    except KeyboardInterrupt:
-        logger.info("检测到中断信号,程序退出")
-
-
-def test_get_tmp_extremes():
-    """
-    测试get_tmp_extremes函数的API调用
-    """
-    print("=" * 50)
-    print("测试get_tmp_extremes API调用")
-    print("=" * 50)
-    
-    # 设置测试参数
-    test_item_name = "C.M.UF1_DB@press_PV"  # 测试数据项
-    test_word_control = "C.M.UF1_DB@word_control"  # 测试控制字段
-    
-    # 设置测试时间范围(最近24小时)
-    end_time = datetime.now()
-    start_time = end_time - timedelta(hours=24)
-    
-    print(f"测试参数:")
-    print(f"  数据项: {test_item_name}")
-    print(f"  控制字段: {test_word_control}")
-    print(f"  开始时间: {start_time.strftime(DATETIME_FORMAT)}")
-    print(f"  结束时间: {end_time.strftime(DATETIME_FORMAT)}")
-    print()
-    
-    try:
-        # 调用函数
-        max_val, min_val = get_tmp_extremes(test_item_name, start_time, end_time, test_word_control)
-        
-        print("测试结果:")
-        if max_val is not None and min_val is not None:
-            print(f"  API调用成功")
-            print(f"  最大值: {max_val}")
-            print(f"  最小值: {min_val}")
-        else:
-            print(f"  API调用失败或未返回有效数据")
-            print(f"  最大值: {max_val}")
-            print(f"  最小值: {min_val}")
-            
-    except Exception as e:
-        print(f" 测试过程中发生异常: {e}")
-    
-    print("=" * 50)
-
-
-if __name__ == "__main__":
-    # 运行测试用例
-    # test_get_tmp_extremes()
-    
-    # 运行主程序
-    main()

+ 0 - 0
models/uf-rl/ultrafiltration_model/monitor_service.log


+ 0 - 120
models/uf-rl/ultrafiltration_model/plc_test_dry_run.py

@@ -1,120 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-PLC指令模拟测试工具
-仅显示请求详情,不实际发送,用于调试和验证
-"""
-
-import json
-import hashlib
-import time
-
-
-def load_config(config_file='config.json'):
-    """加载配置文件"""
-    with open(config_file, 'r', encoding='utf-8') as f:
-        return json.load(f)
-
-
-def generate_md5_signature(record_data, secret, timestamp):
-    """生成MD5签名"""
-    cal_str = f"{record_data}{secret}{timestamp}"
-    cal_md5 = hashlib.md5(cal_str.encode('utf-8')).hexdigest()
-    return cal_md5.upper()
-
-
-def prepare_plc_request(device_name, item, old_value, new_value, command_type):
-    """
-    准备PLC请求参数
-    
-    参数:
-        device_name: 设备名称
-        item: 参数项名称
-        old_value: 当前值
-        new_value: 目标值
-        command_type: 命令类型
-        
-    返回:
-        请求信息字典
-    """
-    config = load_config()
-    
-    PLC_URL = config['api']['base_url'] + config['api']['plc_endpoint']
-    PROJECT_ID = config['scada']['project_id']
-    SCADA_SECRET = config['scada']['secret']
-    
-    timestamp = int(time.time())
-    
-    record_dict = {
-        "project_id": PROJECT_ID,
-        "item": item,
-        "old_value": old_value,
-        "new_value": new_value,
-        "command_type": command_type
-    }
-    record_data = json.dumps(record_dict, separators=(',', ':'))
-    
-    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
-    full_url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}"
-    payload = [record_dict]
-    
-    return {
-        'url': full_url,
-        'payload': payload,
-        'signature_data': record_data,
-        'signature': signature,
-        'timestamp': timestamp,
-        'secret': SCADA_SECRET
-    }
-
-if __name__ == "__main__":
-    print("=== PLC指令测试 - 模拟运行 ===")
-    print()
-    
-    # 测试参数
-    device_name = "UF2"
-    item = "C.M.UF2_DB@time_production"
-    old_value = "3800"
-    new_value = "3801"
-    command_type = 1
-    
-    # 准备请求
-    request_info = prepare_plc_request(device_name, item, old_value, new_value, command_type)
-    
-    print(f"📋 测试场景:")
-    print(f"   设备: {device_name}")
-    print(f"   参数项: {item}")
-    print(f"   当前值: {old_value}")
-    print(f"   目标值: {new_value}")
-    print(f"   命令类型: {command_type}")
-    print()
-    
-    print(f"🔧 请求详情:")
-    print(f"   完整URL: {request_info['url']}")
-    print()
-    
-    print(f"📝 请求头:")
-    print(f"   Content-Type: application/json")
-    print()
-    
-    print(f"📦 请求体:")
-    print(json.dumps(request_info['payload'], indent=4, ensure_ascii=False))
-    print()
-    
-    print(f"🔐 签名计算:")
-    print(f"   SCADA密钥: {request_info['secret']}")
-    print(f"   时间戳: {request_info['timestamp']}")
-    print(f"   签名原数据: {request_info['signature_data']}")
-    print(f"   计算字符串: {request_info['signature_data']}{request_info['secret']}{request_info['timestamp']}")
-    print(f"   MD5签名: {request_info['signature']}")
-    print()
-    
-    print(f"✨ curl命令:")
-    curl_cmd = f"""curl -X POST '{request_info['url']}' \\
-  -H 'Content-Type: application/json' \\
-  -d '{json.dumps(request_info['payload'], separators=(',', ':'), ensure_ascii=False)}'"""
-    print(curl_cmd)
-    print()
-    
-    print("🚀 这就是将要发送给PLC系统的完整请求!")
-    print("   如果看起来正确,您可以运行 test_plc_update.py 来实际发送。")

+ 0 - 18
models/uf-rl/ultrafiltration_model/requirements.txt

@@ -1,18 +0,0 @@
-# 超滤系统强化学习决策系统 - 依赖包
-
-# 科学计算
-numpy>=1.23.0
-
-# 深度学习框架
-torch>=2.0.0
-
-# 强化学习框架
-gymnasium>=1.2.0
-stable-baselines3>=2.6.0
-
-# 数据库连接
-pymysql>=1.0.0
-
-# HTTP请求
-requests>=2.28.0
-

+ 0 - 73
models/uf-rl/ultrafiltration_model/save_uf_models.py

@@ -1,73 +0,0 @@
-import torch
-import numpy as np
-
-
-class TMPIncreaseModel(torch.nn.Module):
-    """
-    跨膜压差上升模型
-    
-    计算过滤阶段的TMP增长量
-    """
-    def __init__(self):
-        super().__init__()
-    
-    def forward(self, p, L_h):
-        """
-        计算TMP增长量
-        
-        参数:
-            p: 系统参数对象
-            L_h: 过滤时长(小时)
-            
-        返回:
-            TMP增长量
-        """
-        return float(p.alpha * (p.q_UF ** p.belta) * L_h)
-
-
-class TMPDecreaseModel(torch.nn.Module):
-    """
-    跨膜压差恢复模型
-    
-    计算反洗阶段的TMP恢复比例
-    """
-    def __init__(self):
-        super().__init__()
-    
-    def forward(self, p, L_s, t_bw_s):
-        """
-        计算反洗恢复比例
-        
-        参数:
-            p: 系统参数对象
-            L_s: 过滤时长(秒)
-            t_bw_s: 反洗时长(秒)
-            
-        返回:
-            TMP恢复比例(0到1之间)
-        """
-        L = max(float(L_s), 1.0)
-        t = max(float(t_bw_s), 1e-6)
-        
-        # 计算恢复比例上限(随过滤时长增加而降低)
-        upper_L = p.phi_bw_min + (p.phi_bw_max - p.phi_bw_min) * np.exp(-L / p.L_ref_s)
-        
-        # 计算时间增益因子(反洗时间越长,恢复越好)
-        time_gain = 1.0 - np.exp(-(t / p.tau_bw_s) ** p.gamma_t)
-        
-        # 综合计算恢复比例
-        phi = upper_L * time_gain
-        
-        return float(np.clip(phi, 0.0, 0.999))
-
-
-if __name__ == "__main__":
-    # 创建模型实例
-    model_fp = TMPIncreaseModel()
-    model_bw = TMPDecreaseModel()
-
-    # 保存模型参数
-    torch.save(model_fp.state_dict(), "uf_fp.pth")
-    torch.save(model_bw.state_dict(), "uf_bw.pth")
-
-    print("模型已保存 uf_fp.pth uf_bw.pth")

+ 0 - 393
models/uf-rl/ultrafiltration_model/test_callback.py

@@ -1,393 +0,0 @@
-"""
-send_decision_to_callback 函数测试脚本
-
-测试功能:
-1. 测试正常的决策数据发送
-2. 测试不同参数组合
-3. 模拟实际使用场景
-4. 测试PLC指令下发(当use_model_status=1时)
-"""
-
-import json
-from datetime import datetime
-from loop_main import send_decision_to_callback, send_plc_update, DATETIME_FORMAT, load_config, get_device_value
-
-def test_basic_callback():
-    """
-    基础测试:发送一组标准的决策数据,如果use_model_status=1则测试PLC指令下发
-    """
-    print("=" * 60)
-    print("测试1: 基础回调测试")
-    print("=" * 60)
-    
-    # 模拟决策结果数据
-    device_name = "UF1"  # 设备名称
-    test_data = {
-        "type_name": device_name,
-        "water_production_time": 4201,  # 产水时间(秒)
-        "physical_backwash": 120,  # 物理反洗时间(秒)
-        "ceb_backwash_frequency": 35,  # CEB反洗频率
-        "duration_system": 3800,  # 系统运行时间(秒)
-        "tmp_action": 0.045,  # TMP动作值
-        "recovery_rate": 0.95,  # 回收率
-        "ton_water_energy_kWh": 0.28,  # 吨水电耗
-        "max_permeability": 850.5,  # 最高渗透率
-        "daily_prod_time_h": 22.5,  # 日均产水时间(小时)
-        "ctime": datetime.now().strftime(DATETIME_FORMAT)  # 当前时间
-    }
-    
-    print("发送数据:")
-    print(json.dumps(test_data, indent=2, ensure_ascii=False))
-    print()
-    
-    # 调用函数
-    try:
-        use_model_status = send_decision_to_callback(**test_data)
-        print(f"返回的 use_model 状态: {use_model_status}")
-        
-        if use_model_status == 1:
-            print("测试结果: 成功 - 模型开关已开启")
-            print()
-            print("-" * 60)
-            print("开始测试PLC指令下发")
-            print("-" * 60)
-            
-            # 加载配置获取设备信息
-            config = load_config()
-            device_config = None
-            for dev in config['devices']:
-                if dev['name'] == device_name:
-                    device_config = dev
-                    break
-            
-            if device_config:
-                # 先读取当前PLC的产水时间值
-                print("正在读取PLC当前产水时间...")
-                current_prod_time = get_device_value(device_config["production_time_payload"], device_name)
-                
-                if current_prod_time is not None:
-                    old_prod_time = str(int(current_prod_time))
-                    new_prod_time = str(int(current_prod_time) + 1)  # 当前值+1
-                    prod_time_item = device_config["production_time_payload"]["deviceItems"]
-                    
-                    print(f"✓ 读取成功: 当前产水时间 = {old_prod_time}")
-                    print()
-                    print(f"测试参数:")
-                    print(f"  设备名称: {device_name}")
-                    print(f"  参数项: {prod_time_item}")
-                    print(f"  旧值: {old_prod_time} (从PLC读取)")
-                    print(f"  新值: {new_prod_time} (旧值+1)")
-                    print(f"  指令类型: 1 (产水时间)")
-                    print()
-                    
-                    # 发送PLC指令
-                    print("正在发送PLC指令...")
-                    plc_result = send_plc_update(
-                        device_name=device_name,
-                        item=prod_time_item,
-                        old_value=old_prod_time,
-                        new_value=new_prod_time,
-                        command_type=1  # 产水时间的指令类型
-                    )
-                    
-                    if plc_result:
-                        print("✓ PLC指令发送成功")
-                    else:
-                        print("✗ PLC指令发送失败")
-                else:
-                    print("✗ 错误:无法读取当前产水时间,跳过PLC指令测试")
-            else:
-                print(f"⚠ 警告: 未找到设备 {device_name} 的配置")
-            
-            print("-" * 60)
-            
-        elif use_model_status == 0:
-            print("测试结果: 成功 - 模型开关已关闭")
-            print("跳过PLC指令测试")
-        else:
-            print("测试结果: 失败 - 未能获取 use_model 状态")
-    except Exception as e:
-        print(f"测试异常: {e}")
-        import traceback
-        traceback.print_exc()
-    
-    print("=" * 60)
-    print()
-
-
-def test_minimal_callback():
-    """
-    最小参数测试:只传递必要的参数
-    """
-    print("=" * 60)
-    print("测试2: 最小参数测试")
-    print("=" * 60)
-    
-    # 最小数据集
-    test_data = {
-        "type_name": "UF2",
-        "water_production_time": 3500,
-        "physical_backwash": 100,
-        "ctime": datetime.now().strftime(DATETIME_FORMAT)
-    }
-    
-    print("发送数据:")
-    print(json.dumps(test_data, indent=2, ensure_ascii=False))
-    print()
-    
-    try:
-        use_model_status = send_decision_to_callback(**test_data)
-        print(f"返回的 use_model 状态: {use_model_status}")
-        if use_model_status == 1:
-            print("测试结果: 成功 - 模型开关已开启")
-        elif use_model_status == 0:
-            print("测试结果: 成功 - 模型开关已关闭")
-        else:
-            print("测试结果: 失败 - 未能获取 use_model 状态")
-    except Exception as e:
-        print(f"测试异常: {e}")
-    
-    print("=" * 60)
-    print()
-
-
-def test_multiple_devices():
-    """
-    多设备测试:模拟多个设备的决策数据发送
-    """
-    print("=" * 60)
-    print("测试3: 多设备测试")
-    print("=" * 60)
-    
-    devices = ["UF1", "UF2", "UF3"]
-    
-    for device_name in devices:
-        print(f"\n发送设备 {device_name} 的决策数据...")
-        
-        test_data = {
-            "type_name": device_name,
-            "water_production_time": 3600 + (devices.index(device_name) * 100),
-            "physical_backwash": 100 + (devices.index(device_name) * 10),
-            "ceb_backwash_frequency": 40 - devices.index(device_name),
-            "duration_system": 3800,
-            "tmp_action": 0.040 + (devices.index(device_name) * 0.005),
-            "recovery_rate": 0.95,
-            "ton_water_energy_kWh": 0.25 + (devices.index(device_name) * 0.02),
-            "max_permeability": 850.0,
-            "daily_prod_time_h": 22.0,
-            "ctime": datetime.now().strftime(DATETIME_FORMAT)
-        }
-        
-        try:
-            use_model_status = send_decision_to_callback(**test_data)
-            print(f"{device_name} 返回的 use_model 状态: {use_model_status}")
-            if use_model_status == 1:
-                print(f"{device_name} 测试结果: 成功 - 模型开关已开启")
-            elif use_model_status == 0:
-                print(f"{device_name} 测试结果: 成功 - 模型开关已关闭")
-            else:
-                print(f"{device_name} 测试结果: 失败 - 未能获取 use_model 状态")
-        except Exception as e:
-            print(f"{device_name} 测试异常: {e}")
-    
-    print("\n" + "=" * 60)
-    print()
-
-
-def test_custom_scenario():
-    """
-    自定义场景测试:可以根据需要修改参数
-    """
-    print("=" * 60)
-    print("测试4: 自定义场景测试")
-    print("=" * 60)
-    
-    # 这里可以自定义测试参数
-    test_data = {
-        "type_name": "UF1",  # 修改设备名称
-        "water_production_time": 4000,  # 修改产水时间
-        "physical_backwash": 150,  # 修改反洗时间
-        "ceb_backwash_frequency": 30,  # 修改CEB频率
-        "duration_system": 4200,
-        "tmp_action": 0.055,
-        "recovery_rate": 0.92,
-        "ton_water_energy_kWh": 0.30,
-        "max_permeability": 800.0,
-        "daily_prod_time_h": 21.5,
-        "ctime": datetime.now().strftime(DATETIME_FORMAT)
-    }
-    
-    print("自定义测试数据:")
-    print(json.dumps(test_data, indent=2, ensure_ascii=False))
-    print()
-    
-    try:
-        use_model_status = send_decision_to_callback(**test_data)
-        print(f"返回的 use_model 状态: {use_model_status}")
-        if use_model_status == 1:
-            print("测试结果: 成功 - 模型开关已开启")
-        elif use_model_status == 0:
-            print("测试结果: 成功 - 模型开关已关闭")
-        else:
-            print("测试结果: 失败 - 未能获取 use_model 状态")
-    except Exception as e:
-        print(f"测试异常: {e}")
-    
-    print("=" * 60)
-    print()
-
-
-def test_plc_update_with_callback():
-    """
-    测试5: 回调+PLC指令测试(完整流程)
-    """
-    print("=" * 60)
-    print("测试5: 回调+PLC指令完整流程测试")
-    print("=" * 60)
-    
-    device_name = "UF1"
-    
-    # 第一步:发送回调数据
-    test_data = {
-        "type_name": device_name,
-        "water_production_time": 4201,  # 决策建议的产水时间
-        "physical_backwash": 120,
-        "ceb_backwash_frequency": 35,
-        "duration_system": 3800,
-        "tmp_action": 0.045,
-        "recovery_rate": 0.95,
-        "ton_water_energy_kWh": 0.28,
-        "max_permeability": 850.5,
-        "daily_prod_time_h": 22.5,
-        "ctime": datetime.now().strftime(DATETIME_FORMAT)
-    }
-    
-    print("步骤1: 发送决策数据到回调接口")
-    print("=" * 60)
-    print(json.dumps(test_data, indent=2, ensure_ascii=False))
-    print()
-    
-    try:
-        # use_model_status = 1
-        use_model_status = send_decision_to_callback(**test_data)
-        print(f"✓ 回调响应: use_model_status = {use_model_status}")
-        print()
-        
-        # 第二步:根据返回状态决定是否发送PLC指令
-        print("步骤2: 根据use_model_status决定是否下发PLC指令")
-        print("=" * 60)
-        
-        if use_model_status == 1:
-            print("✓ 模型开关已开启,准备下发PLC指令")
-            print()
-            
-            # 加载配置
-            config = load_config()
-            device_config = None
-            for dev in config['devices']:
-                if dev['name'] == device_name:
-                    device_config = dev
-                    break
-            
-            if device_config:
-                print("步骤3: 读取PLC当前产水时间")
-                print("=" * 60)
-                
-                # 先读取当前PLC的产水时间值
-                current_prod_time = get_device_value(device_config["production_time_payload"], device_name)
-                
-                if current_prod_time is not None:
-                    old_prod_time = str(int(current_prod_time))
-                    new_prod_time = str(int(current_prod_time) - 1)  # 当前值+1
-                    prod_time_item = device_config["production_time_payload"]["deviceItems"]
-                    
-                    print(f"✓ 读取成功: 当前产水时间 = {old_prod_time} 秒")
-                    print()
-                    
-                    print("步骤4: 下发PLC指令(修改产水时间)")
-                    print("=" * 60)
-                    print(f"设备: {device_name}")
-                    print(f"参数: {prod_time_item}")
-                    print(f"变更: {old_prod_time} -> {new_prod_time} (当前值+1)")
-                    print(f"类型: command_type=1 (产水时间)")
-                    print()
-                    
-                    # 发送PLC指令
-                    plc_result = send_plc_update(
-                        device_name=device_name,
-                        item=prod_time_item,
-                        old_value=old_prod_time,
-                        new_value=new_prod_time,
-                        command_type=1
-                    )
-                    
-                    print()
-                    if plc_result:
-                        print("✓✓✓ 测试成功:完整流程执行完毕")
-                        print("  1. 回调接口调用成功")
-                        print("  2. use_model_status=1")
-                        print("  3. 从PLC读取当前值成功")
-                        print("  4. PLC指令发送成功")
-                    else:
-                        print("✗ 测试失败:PLC指令发送失败")
-                else:
-                    print("✗ 错误:无法读取当前产水时间")
-                    print("测试失败:无法获取PLC当前值")
-            else:
-                print(f"✗ 错误:未找到设备 {device_name} 的配置")
-        
-        elif use_model_status == 0:
-            print("⚠ 模型开关已关闭,跳过PLC指令下发")
-            print("  这是正常行为(use_model_status=0)")
-        
-        else:
-            print("✗ 测试失败:未能获取有效的 use_model_status")
-            
-    except Exception as e:
-        print(f"✗ 测试异常: {e}")
-        import traceback
-        traceback.print_exc()
-    
-    print()
-    print("=" * 60)
-    print()
-
-
-def main():
-    """
-    主测试函数
-    """
-    print("\n")
-    print("*" * 60)
-    print("send_decision_to_callback + PLC指令 测试")
-    print("*" * 60)
-    print()
-    
-    # 运行各个测试
-    # 默认运行基础回调测试(包含PLC指令测试)
-    # test_basic_callback()
-    
-    # 取消注释下面的行来运行其他测试
-    # test_minimal_callback()
-    # test_multiple_devices()
-    # test_custom_scenario()
-    
-    # 完整流程测试(推荐使用)
-    test_plc_update_with_callback()
-    
-    print("\n")
-    print("*" * 60)
-    print("测试完成")
-    print("*" * 60)
-    print()
-    print("提示:")
-    print("  - 如果 use_model_status=1,会自动发送PLC指令")
-    print("  - 测试会先从PLC读取当前产水时间,然后修改为当前值+1")
-    print("  - 这样确保old_value与PLC实际值匹配,避免'设置scada变量失败'错误")
-    print("  - 可以运行 test_plc_update_with_callback() 查看完整流程")
-    print()
-
-
-if __name__ == "__main__":
-    main()
-

+ 0 - 120
models/uf-rl/ultrafiltration_model/test_plc_update.py

@@ -1,120 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-PLC指令测试工具
-用于测试PLC参数更新请求的实际发送
-"""
-
-import requests
-import json
-import hashlib
-import time
-
-
-def load_config(config_file='config.json'):
-    """加载配置文件"""
-    with open(config_file, 'r', encoding='utf-8') as f:
-        return json.load(f)
-
-
-def generate_md5_signature(record_data, secret, timestamp):
-    """生成MD5签名"""
-    cal_str = f"{record_data}{secret}{timestamp}"
-    cal_md5 = hashlib.md5(cal_str.encode('utf-8')).hexdigest()
-    return cal_md5.upper()
-
-
-def send_plc_update_test(device_name, item, old_value, new_value, command_type):
-    """
-    发送PLC参数更新测试
-    
-    参数:
-        device_name: 设备名称
-        item: 参数项名称
-        old_value: 当前值
-        new_value: 目标值
-        command_type: 命令类型
-        
-    返回:
-        是否发送成功
-    """
-    config = load_config()
-    
-    PLC_URL = config['api']['base_url'] + config['api']['plc_endpoint']
-    PROJECT_ID = config['scada']['project_id']
-    SCADA_SECRET = config['scada']['secret']
-    
-    timestamp = int(time.time())
-    
-    record_data = json.dumps({
-        "project_id": PROJECT_ID,
-        "item": item,
-        "old_value": old_value,
-        "new_value": new_value,
-        "command_type": command_type
-    }, separators=(',', ':'))
-    
-    signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
-    url = f"{PLC_URL}?sign={signature}&timestamp={timestamp}"
-    
-    payload = [{
-        "project_id": PROJECT_ID,
-        "item": item,
-        "old_value": old_value,
-        "new_value": new_value,
-        "command_type": command_type
-    }]
-    
-    print(f"PLC测试")
-    print(f"设备 {device_name}")
-    print(f"参数 {item}")
-    print(f"旧值 {old_value}")
-    print(f"新值 {new_value}")
-    print(f"类型 {command_type}")
-    print(f"时间戳 {timestamp}")
-    print(f"URL {url}")
-    print(f"请求体 {json.dumps(payload, indent=2, ensure_ascii=False)}")
-    print(f"签名数据 {record_data}")
-    print(f"签名 {signature}")
-    print("-" * 50)
-    
-    try:
-        headers = {"Content-Type": "application/json"}
-        response = requests.post(url, headers=headers, json=payload, timeout=15)
-        
-        print(f"响应状态码 {response.status_code}")
-        print(f"响应头 {dict(response.headers)}")
-        
-        try:
-            response_json = response.json()
-            print(f"响应JSON {json.dumps(response_json, indent=2, ensure_ascii=False)}")
-        except:
-            print(f"响应文本 {response.text}")
-            
-        response.raise_for_status()
-        print("请求发送成功")
-        return True
-        
-    except requests.exceptions.RequestException as e:
-        print(f"请求失败 {e}")
-        return False
-    except Exception as e:
-        print(f"未知错误 {e}")
-        return False
-
-
-if __name__ == "__main__":
-    # 测试配置
-    device_name = "UF2"
-    item = "C.M.UF2_DB@time_production"
-    old_value = "3800"
-    new_value = "3801"
-    command_type = 1
-    
-    print("开始PLC指令测试")
-    success = send_plc_update_test(device_name, item, old_value, new_value, command_type)
-    
-    if success:
-        print("\n测试完成 请检查PLC系统")
-    else:
-        print("\n测试失败 请检查网络和配置")

二進制
models/uf-rl/ultrafiltration_model/uf_bw.pth


二進制
models/uf-rl/ultrafiltration_model/uf_dqn_tensorboard/DQN_lr0.0001_buf2000_bs16_gamma0.95_exp0.6_default_20251017-114220/DQN_1/events.out.tfevents.1760672541.MacBook-Pro-2.local.85900.0


二進制
models/uf-rl/ultrafiltration_model/uf_fp.pth


Some files were not shown because too many files changed in this diff