handler.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/events/internal/svc"
  4. "GtDataStore/app/model"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
  10. "strconv"
  11. "time"
  12. )
  // Messaging identifiers used by the event handler.
  const (
  	// RABBIT_EXCHANGE_NAME is the RabbitMQ exchange the consumer is created on.
  	RABBIT_EXCHANGE_NAME = "gt.dc.event"

  	// UF_BACKWASH is the event key Start registers adjustFilterCycle for.
  	// NOTE(review): the "NOFITY" spelling is kept verbatim because the value
  	// is matched at runtime; presumably the publisher uses the same
  	// spelling - confirm before changing it.
  	UF_BACKWASH = "EVENT:NOFITY:92:UF_BACKWASH"
  )
  17. type (
  18. EventHandler struct {
  19. funcs map[string]func(message []byte) error
  20. consumer *rabbitMQ.Consumer
  21. svcCtx *svc.ServiceContext
  22. }
  23. )
  24. func NewEventHandler(ctx *svc.ServiceContext) (*EventHandler, error) {
  25. consumer, err := rabbitMQ.NewConsumer(rabbitMQ.Exchange{
  26. Name: RABBIT_EXCHANGE_NAME,
  27. Type: "fanout",
  28. Dns: ctx.Config.RabbitMQ.Url,
  29. })
  30. if err != nil {
  31. return nil, err
  32. }
  33. eh := &EventHandler{
  34. funcs: make(map[string]func(message []byte) error),
  35. consumer: consumer,
  36. svcCtx: ctx,
  37. }
  38. return eh, nil
  39. }
  40. func (h *EventHandler) Start() {
  41. go h.consumer.Handler(UF_BACKWASH, h.adjustFilterCycle)
  42. fmt.Printf("listen event by: %s to uf.FilterEarly\n", UF_BACKWASH)
  43. select {}
  44. }
  45. func (h *EventHandler) adjustFilterCycle(message []byte) error {
  46. var entInfo multiEventInfo
  47. if err := json.Unmarshal(message, &entInfo); err != nil {
  48. return err
  49. }
  50. fmt.Printf("adjustFilterCycle Start")
  51. // 获得一个时间窗口中的许可, 如果获得失败, 则说明事件重复, 不处理 600的是因为 超滤膜反洗时间不会超过600秒
  52. if h.canExec(entInfo.ProjectId, entInfo.DeviceCode, "adjustFilterCycle", "", 600*time.Second) == false {
  53. fmt.Printf("adjustFilterCycle 不可重复执行\n")
  54. return nil
  55. }
  56. deviceInfo, err := h.getDeviceInfo(entInfo.ProjectId, entInfo.DeviceCode)
  57. if err != nil {
  58. return err
  59. }
  60. filterCycleItem, ok := deviceInfo.Items["filter_cycle"]
  61. if ok == false {
  62. return errors.New("filter_cycle not config")
  63. }
  64. stepItem, ok := deviceInfo.Items["step"]
  65. if ok == false {
  66. return errors.New("step not config")
  67. }
  68. filterCycleItemValue, ok := entInfo.NewValues[filterCycleItem.Item]
  69. if ok == false {
  70. return errors.New("filter_cycle value is empty")
  71. }
  72. stepItemValue, ok := entInfo.NewValues[stepItem.Item]
  73. filterCycle, err := strconv.ParseInt(filterCycleItemValue, 10, 64)
  74. step, err := strconv.ParseInt(stepItemValue, 10, 64)
  75. // 每进行一次反洗, 如果数据表中已经存在本周期的数据记录, 则adjust数值 +1
  76. // adjust意味着 自控系统与实际过滤周期数的差值
  77. if r, err := h.svcCtx.DcWorkingUf.FindLastByCycleAndStep(context.Background(), entInfo.ProjectId, entInfo.DeviceCode, filterCycle, step); err != nil || r == nil {
  78. // 没有查询到, 说明自控中该值已自增
  79. return nil
  80. } else {
  81. // 查询到了, 但可能是上一个CEB周期的数据, 也跳过
  82. // 判断是否为上一个周期的数据: 大于r.id的值, 并且存在着过滤数据
  83. if ls, _ := h.svcCtx.DcWorkingUf.FindForCycleAndStep(context.Background(), r.Id, entInfo.ProjectId, entInfo.DeviceCode, 0, step, 0, 1000, 1); len(ls) > 0 {
  84. return nil
  85. }
  86. }
  87. // 通过上面的判断, 得知自控系统的filterCycle值阻塞了, 需要调整
  88. key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", entInfo.ProjectId, entInfo.DeviceCode, "filter_cycle", "")
  89. h.svcCtx.Cache.Incr(context.Background(), key)
  90. h.svcCtx.Cache.Expire(context.Background(), key, 86400*time.Second)
  91. return nil
  92. }
  93. func (h *EventHandler) canExec(projectId int64, deviceCode, opName, salt string, expire time.Duration) bool {
  94. key := fmt.Sprintf("events:handler:%d:%s:%s:%s", projectId, deviceCode, opName, salt)
  95. if intCmd := h.svcCtx.Cache.Incr(context.Background(), key); intCmd.Val() == 1 {
  96. _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
  97. return true
  98. } else {
  99. // 缓存续期
  100. _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
  101. }
  102. return false
  103. }
  104. func (h *EventHandler) getDeviceInfo(projectId int64, deviceCode string) (*model.DcDeviceBind, error) {
  105. return h.svcCtx.DcDeviceBind.FindOneByProjectIdDeviceCode(context.Background(), projectId, deviceCode)
  106. }