| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- PLC指令模拟测试工具
- 仅显示请求详情,不实际发送,用于调试和验证
- """
- import json
- import hashlib
- import time
- def load_config(config_file='config.json'):
- """加载配置文件"""
- with open(config_file, 'r', encoding='utf-8') as f:
- return json.load(f)
- def generate_md5_signature(record_data, secret, timestamp):
- """生成MD5签名"""
- cal_str = f"{record_data}{secret}{timestamp}"
- cal_md5 = hashlib.md5(cal_str.encode('utf-8')).hexdigest()
- return cal_md5.upper()
- def prepare_plc_request(device_name, item, old_value, new_value, command_type):
- """
- 准备PLC请求参数
-
- 参数:
- device_name: 设备名称
- item: 参数项名称
- old_value: 当前值
- new_value: 目标值
- command_type: 命令类型
-
- 返回:
- 请求信息字典
- """
- config = load_config()
-
- PLC_URL = config['api']['base_url'] + config['api']['plc_endpoint']
- PROJECT_ID = config['scada']['project_id']
- SCADA_SECRET = config['scada']['secret']
-
- timestamp = int(time.time())
-
- record_dict = {
- "project_id": PROJECT_ID,
- "item": item,
- "old_value": old_value,
- "new_value": new_value,
- "command_type": command_type
- }
- record_data = json.dumps(record_dict, separators=(',', ':'))
-
- signature = generate_md5_signature(record_data, SCADA_SECRET, timestamp)
- full_url = f"{PLC_URL}?sign={signature}×tamp={timestamp}"
- payload = [record_dict]
-
- return {
- 'url': full_url,
- 'payload': payload,
- 'signature_data': record_data,
- 'signature': signature,
- 'timestamp': timestamp,
- 'secret': SCADA_SECRET
- }
- if __name__ == "__main__":
- print("=== PLC指令测试 - 模拟运行 ===")
- print()
-
- # 测试参数
- device_name = "UF2"
- item = "C.M.UF2_DB@time_production"
- old_value = "3800"
- new_value = "3801"
- command_type = 1
-
- # 准备请求
- request_info = prepare_plc_request(device_name, item, old_value, new_value, command_type)
-
- print(f"📋 测试场景:")
- print(f" 设备: {device_name}")
- print(f" 参数项: {item}")
- print(f" 当前值: {old_value}")
- print(f" 目标值: {new_value}")
- print(f" 命令类型: {command_type}")
- print()
-
- print(f"🔧 请求详情:")
- print(f" 完整URL: {request_info['url']}")
- print()
-
- print(f"📝 请求头:")
- print(f" Content-Type: application/json")
- print()
-
- print(f"📦 请求体:")
- print(json.dumps(request_info['payload'], indent=4, ensure_ascii=False))
- print()
-
- print(f"🔐 签名计算:")
- print(f" SCADA密钥: {request_info['secret']}")
- print(f" 时间戳: {request_info['timestamp']}")
- print(f" 签名原数据: {request_info['signature_data']}")
- print(f" 计算字符串: {request_info['signature_data']}{request_info['secret']}{request_info['timestamp']}")
- print(f" MD5签名: {request_info['signature']}")
- print()
-
- print(f"✨ curl命令:")
- curl_cmd = f"""curl -X POST '{request_info['url']}' \\
- -H 'Content-Type: application/json' \\
- -d '{json.dumps(request_info['payload'], separators=(',', ':'), ensure_ascii=False)}'"""
- print(curl_cmd)
- print()
-
- print("🚀 这就是将要发送给PLC系统的完整请求!")
- print(" 如果看起来正确,您可以运行 test_plc_update.py 来实际发送。")
|