123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package handler
- import (
- "GtDataStore/app/cmd/events/internal/svc"
- "GtDataStore/app/model"
- "context"
- "encoding/json"
- "fmt"
- "github.com/go-redis/redis/v8"
- "github.com/zeromicro/go-zero/core/logx"
- "metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
- "time"
- )
- func HandlerEventNotify(svcCtx *svc.ServiceContext) error {
- var err error
- producter, err = rabbitMQ.NewProducter(rabbitMQ.Exchange{
- Name: "gt.dc.event",
- Type: "fanout",
- QueueName: "",
- Key: "",
- Dns: svcCtx.Config.RabbitMQ.Url,
- })
- if err != nil {
- return err
- }
- defer producter.Close()
- defer close(eventChan)
- fmt.Print("start HandlerEventNotify.....\n")
- queue := svcCtx.Cache
- go func() {
- for info := range eventChan {
- info.MsgTime = time.Now()
- // 数据通知到redis
- toMQ(queue, info)
- // 事件记录到mysql
- toDB(svcCtx, info)
- }
- }()
- for info := range multiEventChan {
- info.MsgTime = time.Now()
- // 数据通知到redis
- toMQForMulti(queue, info)
- }
- return nil
- }
- func toMQ(queue *redis.Client, info eventInfo) {
- channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
- if bs, err := json.Marshal(info); err == nil {
- if err := producter.Publisher(channel, bs); err == nil {
- logx.Errorf("handlerEvent queue.Publish toMQ ok: %s %+v", channel, info)
- }
- } else {
- logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
- }
- }
- func toMQForMulti(queue *redis.Client, info multiEventInfo) {
- channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
- if bs, err := json.Marshal(info); err == nil {
- if err := producter.Publisher(channel, bs); err == nil {
- logx.Errorf("handlerEvent queue.Publish toMQForMulti ok: %s %+v", channel, info)
- }
- } else {
- logx.Errorf("handlerEvent queue.Publish toMQForMulti error: %s %+v", channel, info)
- }
- }
- func toDB(svcCtx *svc.ServiceContext, info eventInfo) {
- if _, err := svcCtx.DcEventList.Insert(context.Background(), &model.DcEventList{
- ProjectId: info.ProjectId,
- DeviceCode: info.DeviceCode,
- Name: info.EventName,
- Item: info.Item,
- Val: info.NewValue,
- OldVal: info.OldValue,
- Time: info.Time,
- EventId: info.EventId,
- CTime: info.MsgTime,
- }); err != nil {
- logx.Errorf("handlerEvent queue.Publish toDB error:%+v", info)
- }
- }
|