single.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/events/internal/logic/job"
  4. "GtDataStore/app/cmd/events/internal/svc"
  5. "GtDataStore/app/model"
  6. "context"
  7. "errors"
  8. "fmt"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "metawant.greentech.com.cn/gaoyagang/gt-common/envitem"
  11. "metawant.greentech.com.cn/gaoyagang/gt-common/identify"
  12. "sync"
  13. "time"
  14. )
  15. func EventSingle(ctx context.Context, task *job.Task, technologyName string) error {
  16. // 1. 查询所有的设备
  17. binds, err := findSingleRecord(task.Job.SvcCtx, int64(task.Id))
  18. if err != nil {
  19. return err
  20. }
  21. // 2. 从记录中提取点位,并合并成MultiEventItems
  22. MultiEvents := mergeToMultiEventItems(binds)
  23. if len(MultiEvents) == 0 {
  24. return errors.New("mergeToMultiEventItems not found envitems")
  25. }
  26. // 2. 获得点位信息
  27. ovMap := MultiEvents.FindPrevString() // 旧点位值map
  28. err = MultiEvents.FillCurrentValue() // 该方法执行行, 会将当前的值更新到旧点位值
  29. if err != nil {
  30. return err
  31. }
  32. // 3. 识别事件
  33. var wg sync.WaitGroup
  34. wg.Add(len(binds))
  35. for _, bind := range binds {
  36. bind := bind
  37. go func() {
  38. defer wg.Done()
  39. item := MultiEvents[bind.Item]
  40. ov, _ := ovMap[bind.Item]
  41. nv := item.GetItemStringVal()
  42. checker := identify.NewSingleCheck(map[string]identify.Rule{bind.Name: bind.Config.Single})
  43. name := checker.Check(ov, nv)
  44. //println(fmt.Sprintf("ov: %s nv: %s event: %s name", ov, nv, name))
  45. if name != "" {
  46. ht := item.GetItemHtime()
  47. if ht == nil {
  48. println(fmt.Sprintf("htime is nil ov: %s nv: %s event: %s name", ov, nv, name))
  49. return
  50. }
  51. eventChan <- eventInfo{
  52. ProjectId: bind.ProjectId,
  53. DeviceCode: bind.DeviceCode,
  54. Item: bind.Item,
  55. EventId: bind.Id,
  56. EventName: name,
  57. OldValue: ov,
  58. NewValue: nv,
  59. Time: *item.GetItemHtime(),
  60. }
  61. }
  62. }()
  63. }
  64. wg.Wait()
  65. return nil
  66. }
  67. func findSingleRecord(svcCtx *svc.ServiceContext, projectId int64) ([]model.DcEventBind, error) {
  68. if data := eventBindCacheTable.GetCache(EVENT_SINGLE_ITEM); data != nil {
  69. return data, nil
  70. }
  71. if binds, err := svcCtx.DcEventBind.FindByProjectIdRuleFlag(context.Background(), projectId, EVENT_SINGLE_ITEM); err != nil {
  72. logx.Error("findSingleRecord not found record")
  73. return nil, err
  74. } else {
  75. eventBindCacheTable.SetCache(EVENT_SINGLE_ITEM, binds, time.Now().Add(300*time.Second))
  76. return binds, nil
  77. }
  78. }
  79. func mergeToMultiEventItems(binds []model.DcEventBind) envitem.MultiEnvItem {
  80. me := make(envitem.MultiEnvItem)
  81. for _, bind := range binds {
  82. me[bind.Item] = &envitem.EnvItem{
  83. ProjectId: bind.ProjectId,
  84. Item: bind.Item,
  85. }
  86. }
  87. return me
  88. }