uf.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/organization/internal/logic/job"
  4. "GtDataStore/app/cmd/organization/internal/svc"
  5. "GtDataStore/app/model"
  6. "context"
  7. "fmt"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "strconv"
  10. "time"
  11. )
  12. func DeviceUf(ctx context.Context, task *job.Task, technologyName string) error {
  13. // 1. 查询所有的设备
  14. devices, err := findDeviceUf(task.Job.SvcCtx, int64(task.Id))
  15. if err != nil {
  16. return err
  17. }
  18. // 2. 获得点位信息
  19. //var wg sync.WaitGroup
  20. //wg.Add(len(devices))
  21. //for _, device := range devices {
  22. // device := device
  23. // go func() {
  24. // defer wg.Done()
  25. // if err := device.Items.FillCurrentValue(); err != nil {
  26. // logx.Errorf("DeviceUf device.Items.FillCurrentValue error: %s", err.Error())
  27. // }
  28. // }()
  29. //}
  30. //
  31. //wg.Wait()
  32. // 3. 转换为存储对象
  33. workings, err := transDeviceUfData(task.Job.SvcCtx, devices)
  34. if err != nil {
  35. return err
  36. }
  37. // 4. 批量入库
  38. if _, err := task.Job.SvcCtx.WorkingUf.MultiInsert(context.Background(), workings); err != nil {
  39. logx.Errorf("DeviceUf task.Job.SvcCtx.WorkingUf.MultiInsert error: %s", err.Error())
  40. }
  41. return nil
  42. }
  43. func findDeviceUf(svcCtx *svc.ServiceContext, projectId int64) ([]model.DcDeviceBind, error) {
  44. if data := deviceBindCacheTable.GetCache(DEVICE_UF); data != nil {
  45. return data, nil
  46. }
  47. if devices, err := svcCtx.DeviceBind.FindByProjectIdDeviceType(context.Background(), projectId, DEVICE_UF); err != nil {
  48. logx.Infof("findDeviceUf not found devices")
  49. return nil, err
  50. } else {
  51. deviceBindCacheTable.SetCache(DEVICE_UF, devices, time.Now().Add(300*time.Second))
  52. return devices, nil
  53. }
  54. }
  55. func transDeviceUfData(svcCtx *svc.ServiceContext, datas []model.DcDeviceBind) ([]model.DcWorkingUf, error) {
  56. ts := make([]model.DcWorkingUf, len(datas))
  57. for i, data := range datas {
  58. var filterCycleAdjust int64 = 0
  59. key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", data.ProjectId, data.DeviceCode, "filter_cycle", "")
  60. if adjustString, err := svcCtx.Cache.Get(key); err == nil && adjustString != "" {
  61. filterCycleAdjust, _ = strconv.ParseInt(adjustString, 10, 64)
  62. }
  63. ts[i] = model.DcWorkingUf{
  64. ProjectId: data.ProjectId,
  65. DeviceCode: data.DeviceCode,
  66. WaterTemperature: data.Items.GetItemFloat64Value("water_temperature"),
  67. FeedFlow: data.Items.GetItemFloat64Value("feed_flow"),
  68. ConFlow: data.Items.GetItemFloat64Value("con_flow"),
  69. ProductFlow: data.Items.GetItemFloat64Value("product_flow"),
  70. FeedPressure: data.Items.GetItemFloat64Value("feed_pressure"),
  71. ConPressure: data.Items.GetItemFloat64Value("con_pressure"),
  72. ProductPressure: data.Items.GetItemFloat64Value("product_pressure"),
  73. Tmp: data.Items.GetItemFloat64Value("tmp"),
  74. Flux: data.Items.GetItemFloat64Value("flux"),
  75. Permeability: data.Items.GetItemFloat64Value("permeability"),
  76. FeedWqTurbidity: data.Items.GetItemFloat64Value("feed_wq_turbidity"),
  77. FeedWqPh: data.Items.GetItemInt64Value("feed_wq_ph"),
  78. ProductWqPh: data.Items.GetItemInt64Value("product_wq_ph"),
  79. FeedWqAl: data.Items.GetItemFloat64Value("feed_wq_al"),
  80. ProductWqAl: data.Items.GetItemFloat64Value("product_wq_al"),
  81. FeedWqFe: data.Items.GetItemFloat64Value("feed_wq_fe"),
  82. ProductWqFe: data.Items.GetItemFloat64Value("product_wq_fe"),
  83. FeedWqMn: data.Items.GetItemFloat64Value("feed_wq_mn"),
  84. ProductWqMn: data.Items.GetItemFloat64Value("product_wq_mn"),
  85. FeedWqSio2: data.Items.GetItemFloat64Value("feed_wq_sio2"),
  86. ProductWqSio2: data.Items.GetItemFloat64Value("product_wq_sio2"),
  87. FeedWqCod: data.Items.GetItemFloat64Value("feed_wq_cod"),
  88. ProductWqCod: data.Items.GetItemFloat64Value("product_wq_cod"),
  89. FeedWqP: data.Items.GetItemFloat64Value("feed_wq_p"),
  90. ProductWqP: data.Items.GetItemFloat64Value("product_wq_p"),
  91. Step: data.Items.GetItemInt64Value("step"),
  92. FilterTime: data.Items.GetItemFloat64Value("filter_time"),
  93. FilterCycle: data.Items.GetItemInt64Value("filter_cycle") + filterCycleAdjust,
  94. CTime: time.Now(),
  95. }
  96. }
  97. return ts, nil
  98. }