123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package handler
- import (
- "GtDataStore/app/cmd/events/internal/svc"
- "GtDataStore/app/model"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
- "strconv"
- "time"
- )
// Messaging identifiers used by the event handler.
const (
	// UF_BACKWASH is the event key passed to the consumer in
	// EventHandler.Start; messages delivered for it are processed by
	// adjustFilterCycle.
	UF_BACKWASH = "EVENT:NOFITY:92:UF_BACKWASH"

	// RABBIT_EXCHANGE_NAME is the RabbitMQ exchange name that
	// NewEventHandler binds its consumer to.
	RABBIT_EXCHANGE_NAME = "gt.dc.event"
)
- type (
- EventHandler struct {
- funcs map[string]func(message []byte) error
- consumer *rabbitMQ.Consumer
- svcCtx *svc.ServiceContext
- }
- )
- func NewEventHandler(ctx *svc.ServiceContext) (*EventHandler, error) {
- consumer, err := rabbitMQ.NewConsumer(rabbitMQ.Exchange{
- Name: RABBIT_EXCHANGE_NAME,
- Type: "fanout",
- Dns: ctx.Config.RabbitMQ.Url,
- })
- if err != nil {
- return nil, err
- }
- eh := &EventHandler{
- funcs: make(map[string]func(message []byte) error),
- consumer: consumer,
- svcCtx: ctx,
- }
- return eh, nil
- }
- func (h *EventHandler) Start() {
- go h.consumer.Handler(UF_BACKWASH, h.adjustFilterCycle)
- fmt.Printf("listen event by: %s to uf.FilterEarly\n", UF_BACKWASH)
- select {}
- }
- func (h *EventHandler) adjustFilterCycle(message []byte) error {
- var entInfo multiEventInfo
- if err := json.Unmarshal(message, &entInfo); err != nil {
- return err
- }
- // 获得一个时间窗口中的许可, 如果获得失败, 则说明事件重复, 不处理 600的是因为 超滤膜反洗时间不会超过600秒
- if h.canExec(entInfo.ProjectId, entInfo.DeviceCode, "adjustFilterCycle", "", 600*time.Second) == false {
- //fmt.Printf("adjustFilterCycle 不可重复执行\n")
- return nil
- }
- deviceInfo, err := h.getDeviceInfo(entInfo.ProjectId, entInfo.DeviceCode)
- if err != nil {
- return err
- }
- filterCycleItem, ok := deviceInfo.Items["filter_cycle"]
- if ok == false {
- return errors.New("filter_cycle not config")
- }
- stepItem, ok := deviceInfo.Items["step"]
- if ok == false {
- return errors.New("step not config")
- }
- filterCycleItemValue, ok := entInfo.NewValues[filterCycleItem.Item]
- if ok == false {
- return errors.New("filter_cycle value is empty")
- }
- stepItemValue, ok := entInfo.NewValues[stepItem.Item]
- filterCycle, err := strconv.ParseInt(filterCycleItemValue, 10, 64)
- step, err := strconv.ParseInt(stepItemValue, 10, 64)
- // 每进行一次反洗, 如果数据表中已经存在本周期的数据记录, 则adjust数值 +1
- // adjust意味着 自控系统与实际过滤周期数的差值
- if r, err := h.svcCtx.DcWorkingUf.FindLastByCycleAndStep(context.Background(), entInfo.ProjectId, entInfo.DeviceCode, filterCycle, step); err != nil || r == nil {
- // 没有查询到, 说明自控中该值已自增
- return nil
- } else {
- // 查询到了, 但可能是上一个CEB周期的数据, 也跳过
- // 判断是否为上一个周期的数据: 大于r.id的值, 并且存在着过滤数据
- if ls, _ := h.svcCtx.DcWorkingUf.FindForCycleAndStep(context.Background(), r.Id, entInfo.ProjectId, entInfo.DeviceCode, 0, step, 0, 1000, 1); len(ls) > 0 {
- return nil
- }
- }
- // 通过上面的判断, 得知自控系统的filterCycle值阻塞了, 需要调整
- adjustValue, err := filterCycleItem.IncreAdjust(86400 * time.Second)
- fmt.Printf("projectId: %d, item: %s, adjustValue: %d\n", entInfo.ProjectId, filterCycleItem.Item, adjustValue)
- // 判断是否清除adjust的值
- // 如果当前值(不包含adjust) == 0时, 清除
- if filterCycle-adjustValue <= 0 {
- filterCycleItem.ClearAdjust()
- fmt.Printf("projectId: %d, item: %s, adjustValue clear\n", entInfo.ProjectId, filterCycleItem.Item)
- }
- //key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", entInfo.ProjectId, entInfo.DeviceCode, "filter_cycle", "")
- //h.svcCtx.Cache.Incr(context.Background(), key)
- //h.svcCtx.Cache.Expire(context.Background(), key, 86400*time.Second)
- return nil
- }
- func (h *EventHandler) canExec(projectId int64, deviceCode, opName, salt string, expire time.Duration) bool {
- key := fmt.Sprintf("events:handler:%d:%s:%s:%s", projectId, deviceCode, opName, salt)
- if intCmd := h.svcCtx.Cache.Incr(context.Background(), key); intCmd.Val() == 1 {
- _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
- return true
- } else {
- // 缓存续期
- _ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
- }
- return false
- }
- func (h *EventHandler) getDeviceInfo(projectId int64, deviceCode string) (*model.DcDeviceBind, error) {
- return h.svcCtx.DcDeviceBind.FindOneByProjectIdDeviceCode(context.Background(), projectId, deviceCode)
- }
|