event.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/events/internal/svc"
  4. "GtDataStore/app/model"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/go-redis/redis/v8"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
  11. "time"
  12. )
  13. func HandlerEventNotify(svcCtx *svc.ServiceContext) error {
  14. var err error
  15. producter, err = rabbitMQ.NewProducter(rabbitMQ.Exchange{
  16. Name: "gt.dc.event",
  17. Type: "fanout",
  18. QueueName: "",
  19. Key: "",
  20. Dns: svcCtx.Config.RabbitMQ.Url,
  21. })
  22. if err != nil {
  23. return err
  24. }
  25. defer producter.Close()
  26. defer close(eventChan)
  27. fmt.Print("start HandlerEventNotify.....\n")
  28. queue := svcCtx.Cache
  29. go func() {
  30. for info := range eventChan {
  31. info.MsgTime = time.Now()
  32. // 数据通知到redis
  33. toMQ(queue, info)
  34. // 事件记录到mysql
  35. toDB(svcCtx, info)
  36. }
  37. }()
  38. for info := range multiEventChan {
  39. info.MsgTime = time.Now()
  40. // 数据通知到redis
  41. toMQForMulti(queue, info)
  42. }
  43. return nil
  44. }
  45. func toMQ(queue *redis.Client, info eventInfo) {
  46. channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
  47. if bs, err := json.Marshal(info); err == nil {
  48. if err := producter.Publisher(channel, bs); err == nil {
  49. logx.Errorf("handlerEvent queue.Publish toMQ ok: %s %+v", channel, info)
  50. }
  51. } else {
  52. logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
  53. }
  54. }
  55. func toMQForMulti(queue *redis.Client, info multiEventInfo) {
  56. channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
  57. if bs, err := json.Marshal(info); err == nil {
  58. if err := producter.Publisher(channel, bs); err == nil {
  59. logx.Errorf("handlerEvent queue.Publish toMQForMulti ok: %s %+v", channel, info)
  60. }
  61. } else {
  62. logx.Errorf("handlerEvent queue.Publish toMQForMulti error: %s %+v", channel, info)
  63. }
  64. }
  65. func toDB(svcCtx *svc.ServiceContext, info eventInfo) {
  66. if _, err := svcCtx.DcEventList.Insert(context.Background(), &model.DcEventList{
  67. ProjectId: info.ProjectId,
  68. DeviceCode: info.DeviceCode,
  69. Name: info.EventName,
  70. Item: info.Item,
  71. Val: info.NewValue,
  72. OldVal: info.OldValue,
  73. Time: info.Time,
  74. EventId: info.EventId,
  75. CTime: info.MsgTime,
  76. }); err != nil {
  77. logx.Errorf("handlerEvent queue.Publish toDB error:%+v", info)
  78. }
  79. }