handler.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/events/internal/svc"
  4. "GtDataStore/app/model"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
  10. "strconv"
  11. "time"
  12. )
  13. const (
  14. UF_BACKWASH = "EVENT:NOFITY:92:UF_BACKWASH"
  15. RABBIT_EXCHANGE_NAME = "gt.dc.event"
  16. )
  17. type (
  18. EventHandler struct {
  19. funcs map[string]func(message []byte) error
  20. consumer *rabbitMQ.Consumer
  21. svcCtx *svc.ServiceContext
  22. }
  23. )
  24. func NewEventHandler(ctx *svc.ServiceContext) (*EventHandler, error) {
  25. consumer, err := rabbitMQ.NewConsumer(rabbitMQ.Exchange{
  26. Name: RABBIT_EXCHANGE_NAME,
  27. Type: "fanout",
  28. Dns: ctx.Config.RabbitMQ.Url,
  29. })
  30. if err != nil {
  31. return nil, err
  32. }
  33. eh := &EventHandler{
  34. funcs: make(map[string]func(message []byte) error),
  35. consumer: consumer,
  36. svcCtx: ctx,
  37. }
  38. return eh, nil
  39. }
  40. func (h *EventHandler) Start() {
  41. go h.consumer.Handler(UF_BACKWASH, h.adjustFilterCycle)
  42. fmt.Printf("listen event by: %s to uf.FilterEarly\n", UF_BACKWASH)
  43. select {}
  44. }
  45. func (h *EventHandler) adjustFilterCycle(message []byte) error {
  46. var entInfo multiEventInfo
  47. if err := json.Unmarshal(message, &entInfo); err != nil {
  48. return err
  49. }
  50. // 获得一个时间窗口中的许可, 如果获得失败, 则说明事件重复, 不处理 600的是因为 超滤膜反洗时间不会超过600秒
  51. if h.canExec(entInfo.ProjectId, entInfo.DeviceCode, "adjustFilterCycle", "", 600*time.Second) == false {
  52. //fmt.Printf("adjustFilterCycle 不可重复执行\n")
  53. return nil
  54. }
  55. deviceInfo, err := h.getDeviceInfo(entInfo.ProjectId, entInfo.DeviceCode)
  56. if err != nil {
  57. return err
  58. }
  59. filterCycleItem, ok := deviceInfo.Items["filter_cycle"]
  60. if ok == false {
  61. return errors.New("filter_cycle not config")
  62. }
  63. stepItem, ok := deviceInfo.Items["step"]
  64. if ok == false {
  65. return errors.New("step not config")
  66. }
  67. filterCycleItemValue, ok := entInfo.NewValues[filterCycleItem.Item]
  68. if ok == false {
  69. return errors.New("filter_cycle value is empty")
  70. }
  71. stepItemValue, ok := entInfo.NewValues[stepItem.Item]
  72. filterCycle, err := strconv.ParseInt(filterCycleItemValue, 10, 64)
  73. step, err := strconv.ParseInt(stepItemValue, 10, 64)
  74. // 每进行一次反洗, 如果数据表中已经存在本周期的数据记录, 则adjust数值 +1
  75. // adjust意味着 自控系统与实际过滤周期数的差值
  76. if r, err := h.svcCtx.DcWorkingUf.FindLastByCycleAndStep(context.Background(), entInfo.ProjectId, entInfo.DeviceCode, filterCycle, step); err != nil || r == nil {
  77. // 没有查询到, 说明自控中该值已自增
  78. return nil
  79. } else {
  80. // 查询到了, 但可能是上一个CEB周期的数据, 也跳过
  81. // 判断是否为上一个周期的数据: 大于r.id的值, 并且存在着过滤数据
  82. if ls, _ := h.svcCtx.DcWorkingUf.FindForCycleAndStep(context.Background(), r.Id, entInfo.ProjectId, entInfo.DeviceCode, 0, step, 0, 1000, 1); len(ls) > 0 {
  83. return nil
  84. }
  85. }
  86. // 通过上面的判断, 得知自控系统的filterCycle值阻塞了, 需要调整
  87. adjustValue, err := filterCycleItem.IncreAdjust(86400 * time.Second)
  88. fmt.Printf("projectId: %d, item: %s, adjustValue: %d\n", entInfo.ProjectId, filterCycleItem.Item, adjustValue)
  89. // 判断是否清除adjust的值
  90. // 如果当前值(不包含adjust) == 0时, 清除
  91. if filterCycle-adjustValue <= 0 {
  92. filterCycleItem.ClearAdjust()
  93. fmt.Printf("projectId: %d, item: %s, adjustValue clear\n", entInfo.ProjectId, filterCycleItem.Item)
  94. }
  95. //key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", entInfo.ProjectId, entInfo.DeviceCode, "filter_cycle", "")
  96. //h.svcCtx.Cache.Incr(context.Background(), key)
  97. //h.svcCtx.Cache.Expire(context.Background(), key, 86400*time.Second)
  98. return nil
  99. }
  100. func (h *EventHandler) canExec(projectId int64, deviceCode, opName, salt string, expire time.Duration) bool {
  101. key := fmt.Sprintf("events:handler:%d:%s:%s:%s", projectId, deviceCode, opName, salt)
  102. if intCmd := h.svcCtx.Cache.Incr(context.Background(), key); intCmd.Val() == 1 {
  103. _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
  104. return true
  105. } else {
  106. // 缓存续期
  107. _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
  108. }
  109. return false
  110. }
  111. func (h *EventHandler) getDeviceInfo(projectId int64, deviceCode string) (*model.DcDeviceBind, error) {
  112. return h.svcCtx.DcDeviceBind.FindOneByProjectIdDeviceCode(context.Background(), projectId, deviceCode)
  113. }