event.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package handler
  2. import (
  3. "GtDataStore/app/cmd/events/internal/svc"
  4. "GtDataStore/app/model"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/go-redis/redis/v8"
  9. "github.com/zeromicro/go-zero/core/logx"
  10. "time"
  11. )
  12. func HandlerEventNotify(svcCtx *svc.ServiceContext) error {
  13. defer close(eventChan)
  14. fmt.Print("start HandlerEventNotify.....\n")
  15. queue := svcCtx.Cache
  16. for info := range eventChan {
  17. info.MsgTime = time.Now()
  18. // 数据通知到redis
  19. toMQ(queue, info)
  20. // 事件记录到mysql
  21. toDB(svcCtx, info)
  22. }
  23. return nil
  24. }
  25. func toMQ(queue *redis.Client, info eventInfo) {
  26. channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
  27. if bs, err := json.Marshal(info); err == nil {
  28. //queue.XAdd(context.Background(), &redis.XAddArgs{
  29. // Stream: channel,
  30. // MaxLen: 5000,
  31. // Values: ``,
  32. //})
  33. if intCmd := queue.Publish(context.Background(), channel, string(bs)); intCmd == nil {
  34. logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
  35. }
  36. } else {
  37. logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
  38. }
  39. }
  40. func toDB(svcCtx *svc.ServiceContext, info eventInfo) {
  41. if _, err := svcCtx.DcEventList.Insert(context.Background(), &model.DcEventList{
  42. ProjectId: info.ProjectId,
  43. DeviceCode: info.DeviceCode,
  44. Name: info.EventName,
  45. Item: info.Item,
  46. Val: info.NewValue,
  47. OldVal: info.OldValue,
  48. Time: info.Time,
  49. EventId: info.EventId,
  50. CTime: info.MsgTime,
  51. }); err != nil {
  52. logx.Errorf("handlerEvent queue.Publish toDB error:%+v", info)
  53. }
  54. }