// Package handler consumes events from RabbitMQ and reacts to them using the
// models and cache exposed by the service context.
package handler

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"time"

	"GtDataStore/app/cmd/events/internal/svc"
	"GtDataStore/app/model"
	"metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
)

const (
	// UF_BACKWASH is the event key handed to the consumer for ultrafiltration
	// backwash events.
	// NOTE(review): "NOFITY" looks like a misspelling of "NOTIFY"; it is a
	// runtime string and presumably has to match what publishers emit, so it
	// is left untouched — confirm before changing.
	UF_BACKWASH = "EVENT:NOFITY:92:UF_BACKWASH"
	// RABBIT_EXCHANGE_NAME is the name of the RabbitMQ exchange the consumer
	// is created against.
	RABBIT_EXCHANGE_NAME = "gt.dc.event"
)

type (
	// EventHandler owns the RabbitMQ consumer and the service context used by
	// the individual event callbacks.
	EventHandler struct {
		// funcs is initialised by NewEventHandler; it is not read anywhere in
		// the code shown here.
		funcs    map[string]func(message []byte) error
		consumer *rabbitMQ.Consumer
		svcCtx   *svc.ServiceContext
	}
)

// NewEventHandler creates a consumer on the RABBIT_EXCHANGE_NAME fanout
// exchange, using the RabbitMQ URL from the service configuration, and wraps
// it in an EventHandler. It returns the consumer construction error unchanged.
func NewEventHandler(ctx *svc.ServiceContext) (*EventHandler, error) {
	consumer, err := rabbitMQ.NewConsumer(rabbitMQ.Exchange{
		Name: RABBIT_EXCHANGE_NAME,
		Type: "fanout",
		Dns:  ctx.Config.RabbitMQ.Url,
	})
	if err != nil {
		return nil, err
	}

	eh := &EventHandler{
		funcs:    make(map[string]func(message []byte) error),
		consumer: consumer,
		svcCtx:   ctx,
	}
	return eh, nil
}

// Start registers adjustFilterCycle for UF_BACKWASH events on a separate
// goroutine and then blocks the calling goroutine forever.
func (h *EventHandler) Start() {
	go h.consumer.Handler(UF_BACKWASH, h.adjustFilterCycle)
	fmt.Printf("listen event by: %s to uf.FilterEarly\n", UF_BACKWASH)

	// Block forever; this method never returns.
	select {}
}

// adjustFilterCycle handles one backwash event.
//
// The message is decoded as a multiEventInfo. If the automatic control
// system's filter_cycle value appears to be stuck (a record for the reported
// cycle/step already exists and no newer filtering record follows it), the
// item's "adjust" counter is incremented. adjust represents the difference
// between the control system's filter cycle count and the actual one.
//
// It returns an error when the message cannot be decoded, the device binding
// cannot be loaded, the filter_cycle/step items are not configured or are
// absent from the event, their values are not base-10 integers, or the adjust
// counter cannot be incremented. Duplicate events and "nothing to adjust"
// outcomes return nil.
func (h *EventHandler) adjustFilterCycle(message []byte) error {
	var entInfo multiEventInfo
	if err := json.Unmarshal(message, &entInfo); err != nil {
		return err
	}

	// Acquire the permit for this time window. Failing to acquire it means the
	// event is a duplicate and is not processed. The window is 600 seconds
	// because an ultrafiltration membrane backwash never lasts longer than that.
	if !h.canExec(entInfo.ProjectId, entInfo.DeviceCode, "adjustFilterCycle", "", 600*time.Second) {
		return nil
	}

	deviceInfo, err := h.getDeviceInfo(entInfo.ProjectId, entInfo.DeviceCode)
	if err != nil {
		return err
	}

	filterCycleItem, ok := deviceInfo.Items["filter_cycle"]
	if !ok {
		return errors.New("filter_cycle not config")
	}
	stepItem, ok := deviceInfo.Items["step"]
	if !ok {
		return errors.New("step not config")
	}

	filterCycleItemValue, ok := entInfo.NewValues[filterCycleItem.Item]
	if !ok {
		return errors.New("filter_cycle value is empty")
	}
	// A missing step value used to fall through and be treated as step 0.
	stepItemValue, ok := entInfo.NewValues[stepItem.Item]
	if !ok {
		return errors.New("step value is empty")
	}

	// Malformed numbers used to be silently read as 0; reject them instead so
	// the lookups below never run against a value the event did not carry.
	filterCycle, err := strconv.ParseInt(filterCycleItemValue, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing filter_cycle value %q: %w", filterCycleItemValue, err)
	}
	step, err := strconv.ParseInt(stepItemValue, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing step value %q: %w", stepItemValue, err)
	}

	// On every backwash, if the data table already contains a record for this
	// cycle, adjust is incremented by one.
	last, findErr := h.svcCtx.DcWorkingUf.FindLastByCycleAndStep(context.Background(), entInfo.ProjectId, entInfo.DeviceCode, filterCycle, step)
	if findErr != nil || last == nil {
		// Nothing found: the control system has already incremented the value.
		// NOTE(review): every lookup error is treated as "not found" here, as
		// in the original code — confirm that is intended for real DB failures.
		return nil
	}

	// A record was found, but it may belong to the previous CEB cycle; skip
	// that case as well. It is considered previous-cycle data when filtering
	// records with an id greater than last.Id exist.
	// NOTE(review): the lookup error is ignored, so a failed query is handled
	// like an empty result and the adjustment proceeds — confirm.
	if ls, _ := h.svcCtx.DcWorkingUf.FindForCycleAndStep(context.Background(), last.Id, entInfo.ProjectId, entInfo.DeviceCode, 0, step, 0, 1000, 1); len(ls) > 0 {
		return nil
	}

	// The checks above show that the control system's filterCycle value is
	// stuck and has to be adjusted.
	adjustValue, err := filterCycleItem.IncreAdjust(86400 * time.Second)
	if err != nil {
		return fmt.Errorf("incrementing filter_cycle adjust: %w", err)
	}
	fmt.Printf("projectId: %d, item: %s, adjustValue: %d\n", entInfo.ProjectId, filterCycleItem.Item, adjustValue)

	// Decide whether to clear adjust: clear it once the current value without
	// the adjust part has dropped to zero (or below).
	if filterCycle-adjustValue <= 0 {
		filterCycleItem.ClearAdjust()
		fmt.Printf("projectId: %d, item: %s, adjustValue clear\n", entInfo.ProjectId, filterCycleItem.Item)
	}

	//key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", entInfo.ProjectId, entInfo.DeviceCode, "filter_cycle", "")
	//h.svcCtx.Cache.Incr(context.Background(), key)
	//h.svcCtx.Cache.Expire(context.Background(), key, 86400*time.Second)
	return nil
}

// canExec reports whether the caller obtained the execution permit for the
// given project/device/operation/salt combination.
//
// It increments a cache counter under "events:handler:<projectId>:<deviceCode>:<opName>:<salt>"
// and returns true only when the counter value after the increment is 1. The
// key's expiry is (re)set to expire on every call, so repeated calls keep
// extending the window.
func (h *EventHandler) canExec(projectId int64, deviceCode, opName, salt string, expire time.Duration) bool {
	ctx := context.Background()
	key := fmt.Sprintf("events:handler:%d:%s:%s:%s", projectId, deviceCode, opName, salt)

	count := h.svcCtx.Cache.Incr(ctx, key).Val()

	// Set the TTL for the first caller and renew it for everyone else; the
	// result is intentionally discarded, as in the original code.
	_ = h.svcCtx.Cache.Expire(ctx, key, expire)

	return count == 1
}

// getDeviceInfo looks up the device binding for the given project id and
// device code through the DcDeviceBind model.
func (h *EventHandler) getDeviceInfo(projectId int64, deviceCode string) (*model.DcDeviceBind, error) {
	return h.svcCtx.DcDeviceBind.FindOneByProjectIdDeviceCode(context.Background(), projectId, deviceCode)
}