Sfoglia il codice sorgente

fit:
1. 无锡自控系统存在filter_cycle阻塞的情况
2. 在事件服务中, 定义反洗事件, 并监听该事件
3. 处理filter_cycle阻塞的问题, 在工况记录中额外记录filter_cycle

gaoyagang 1 anno fa
parent
commit
6a87e5a9a7

+ 12 - 6
app/cmd/events/events.go

@@ -1,21 +1,20 @@
 package main
 
 import (
+	"GtDataStore/app/cmd/events/internal/config"
 	"GtDataStore/app/cmd/events/internal/logic/handler"
 	"GtDataStore/app/cmd/events/internal/logic/job"
 	"GtDataStore/app/cmd/events/internal/server"
+	"GtDataStore/app/cmd/events/internal/svc"
 	"GtDataStore/app/cmd/events/pb"
 	"flag"
 	"fmt"
+	"github.com/zeromicro/go-zero/core/conf"
 	"github.com/zeromicro/go-zero/core/service"
 	"github.com/zeromicro/go-zero/zrpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/reflection"
 	"metawant.greentech.com.cn/gaoyagang/gt-common/envitem"
-
-	"GtDataStore/app/cmd/events/internal/config"
-	"GtDataStore/app/cmd/events/internal/svc"
-	"github.com/zeromicro/go-zero/core/conf"
 )
 
 var configFile = flag.String("f", "etc/events.yaml", "the config file")
@@ -37,9 +36,9 @@ func main() {
 	defer s.Stop()
 	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
 
-	if c.Mode != "dev" {
-		envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp, Cache: ctx.Cache})
+	envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp, Cache: ctx.Cache})
 
+	if c.Mode != "dev" {
 		go func() {
 			defer func() {
 				fmt.Print("async events job stop.....\n")
@@ -54,5 +53,12 @@ func main() {
 	// 接收识别到的事件, 处理, 目前只有写入队列(暂时只有redis), 写入数据库
 	go handler.HandlerEventNotify(ctx)
 
+	// 处理事件
+	go func() {
+		// 启动事件监听
+		eh, _ := handler.NewEventHandler(ctx)
+		eh.Start()
+	}()
+
 	s.Start()
 }

+ 128 - 0
app/cmd/events/internal/logic/handler/handler.go

@@ -0,0 +1,128 @@
+package handler
+
+import (
+	"GtDataStore/app/cmd/events/internal/svc"
+	"GtDataStore/app/model"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/rabbitMQ"
+	"strconv"
+	"time"
+)
+
+const (
+	UF_BACKWASH = "EVENT:NOFITY:92:UF_BACKWASH"
+
+	RABBIT_EXCHANGE_NAME = "gt.dc.event"
+)
+
+type (
+	EventHandler struct {
+		funcs    map[string]func(message []byte) error
+		consumer *rabbitMQ.Consumer
+		svcCtx   *svc.ServiceContext
+	}
+)
+
+func NewEventHandler(ctx *svc.ServiceContext) (*EventHandler, error) {
+	consumer, err := rabbitMQ.NewConsumer(rabbitMQ.Exchange{
+		Name: RABBIT_EXCHANGE_NAME,
+		Type: "fanout",
+		Dns:  ctx.Config.RabbitMQ.Url,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	eh := &EventHandler{
+		funcs:    make(map[string]func(message []byte) error),
+		consumer: consumer,
+		svcCtx:   ctx,
+	}
+
+	return eh, nil
+}
+
+func (h *EventHandler) Start() {
+	go h.consumer.Handler(UF_BACKWASH, h.adjustFilterCycle)
+	fmt.Printf("listen event by: %s to uf.FilterEarly\n", UF_BACKWASH)
+
+	select {}
+}
+
+func (h *EventHandler) adjustFilterCycle(message []byte) error {
+	var entInfo multiEventInfo
+	if err := json.Unmarshal(message, &entInfo); err != nil {
+		return err
+	}
+	fmt.Printf("adjustFilterCycle Start")
+
+	// 获得一个时间窗口中的许可, 如果获得失败, 则说明事件重复, 不处理  600的是因为 超滤膜反洗时间不会超过600秒
+	if h.canExec(entInfo.ProjectId, entInfo.DeviceCode, "adjustFilterCycle", "", 600*time.Second) == false {
+		fmt.Printf("adjustFilterCycle 不可重复执行\n")
+		return nil
+	}
+
+	deviceInfo, err := h.getDeviceInfo(entInfo.ProjectId, entInfo.DeviceCode)
+	if err != nil {
+		return err
+	}
+
+	filterCycleItem, ok := deviceInfo.Items["filter_cycle"]
+	if ok == false {
+		return errors.New("filter_cycle not config")
+	}
+	stepItem, ok := deviceInfo.Items["step"]
+	if ok == false {
+		return errors.New("step not config")
+	}
+
+	filterCycleItemValue, ok := entInfo.NewValues[filterCycleItem.Item]
+	if ok == false {
+		return errors.New("filter_cycle value is empty")
+	}
+
+	stepItemValue, ok := entInfo.NewValues[stepItem.Item]
+
+	filterCycle, err := strconv.ParseInt(filterCycleItemValue, 10, 64)
+	step, err := strconv.ParseInt(stepItemValue, 10, 64)
+	// 每进行一次反洗, 如果数据表中已经存在本周期的数据记录, 则adjust数值 +1
+	// adjust意味着 自控系统与实际过滤周期数的差值
+	if r, err := h.svcCtx.DcWorkingUf.FindLastByCycleAndStep(context.Background(), entInfo.ProjectId, entInfo.DeviceCode, filterCycle, step); err != nil || r == nil {
+		// 没有查询到, 说明自控中该值已自增
+		return nil
+	} else {
+		// 查询到了, 但可能是上一个CEB周期的数据, 也跳过
+		// 判断是否为上一个周期的数据: 大于r.id的值, 并且存在着过滤数据
+		if ls, _ := h.svcCtx.DcWorkingUf.FindForCycleAndStep(context.Background(), r.Id, entInfo.ProjectId, entInfo.DeviceCode, 0, step, 0, 1000, 1); len(ls) > 0 {
+			return nil
+		}
+	}
+
+	// 通过上面的判断, 得知自控系统的filterCycle值阻塞了, 需要调整
+	key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", entInfo.ProjectId, entInfo.DeviceCode, "filter_cycle", "")
+	h.svcCtx.Cache.Incr(context.Background(), key)
+	h.svcCtx.Cache.Expire(context.Background(), key, 86400*time.Second)
+
+	return nil
+}
+
+func (h *EventHandler) canExec(projectId int64, deviceCode, opName, salt string, expire time.Duration) bool {
+	key := fmt.Sprintf("events:handler:%d:%s:%s:%s", projectId, deviceCode, opName, salt)
+	if intCmd := h.svcCtx.Cache.Incr(context.Background(), key); intCmd.Val() == 1 {
+		_ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
+		return true
+	} else {
+		// 缓存续期
+		_ = h.svcCtx.Cache.Expire(context.Background(), key, expire)
+	}
+
+	return false
+}
+
+func (h *EventHandler) getDeviceInfo(projectId int64, deviceCode string) (*model.DcDeviceBind, error) {
+	return h.svcCtx.DcDeviceBind.FindOneByProjectIdDeviceCode(context.Background(), projectId, deviceCode)
+}

+ 22 - 0
app/cmd/events/internal/logic/handler/handler_test.go

@@ -0,0 +1,22 @@
+package handler
+
+import (
+	"GtDataStore/app/cmd/events/internal/config"
+	"GtDataStore/app/cmd/events/internal/svc"
+	"flag"
+	"github.com/zeromicro/go-zero/core/conf"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/envitem"
+	"testing"
+)
+
+func TestEventHandler_Start(t *testing.T) {
+	var c config.Config
+	var configFile = flag.String("f", "../../../etc/events.test.yaml", "the config file")
+	conf.MustLoad(*configFile, &c)
+	svcCtx := svc.NewServiceContext(c)
+
+	envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp})
+
+	eh, _ := NewEventHandler(svcCtx)
+	eh.Start()
+}

+ 4 - 0
app/cmd/events/internal/svc/vars.go

@@ -11,6 +11,8 @@ type modelList struct {
 	DcEventRule   model.DcEventRuleModel
 	DcEventList   model.DcEventListModel
 	DcItemData    model.DcItemDataModel
+	DcWorkingUf   model.DcWorkingUfModel
+	DcDeviceBind  model.DcDeviceBindModel
 }
 
 func initModel(svc *ServiceContext) {
@@ -19,5 +21,7 @@ func initModel(svc *ServiceContext) {
 	svc.DcEventList = model.NewDcEventListModel(mysql)
 	svc.DcEventBind = model.NewDcEventBindModel(mysql)
 	svc.DcItemData = model.NewDcItemDataModel(mysql)
+	svc.DcWorkingUf = model.NewDcWorkingUfModel(mysql)
+	svc.DcDeviceBind = model.NewDcDeviceBindModel(mysql)
 	svc.ProjectConfig = model.NewDcProjectConfigModel(mysql)
 }

+ 12 - 3
app/cmd/organization/internal/logic/handler/uf.go

@@ -5,7 +5,9 @@ import (
 	"GtDataStore/app/cmd/organization/internal/svc"
 	"GtDataStore/app/model"
 	"context"
+	"fmt"
 	"github.com/zeromicro/go-zero/core/logx"
+	"strconv"
 	"time"
 )
 
@@ -31,7 +33,7 @@ func DeviceUf(ctx context.Context, task *job.Task, technologyName string) error
 	//wg.Wait()
 
 	// 3. 转换为存储对象
-	workings, err := transDeviceUfData(devices)
+	workings, err := transDeviceUfData(task.Job.SvcCtx, devices)
 	if err != nil {
 		return err
 	}
@@ -56,10 +58,17 @@ func findDeviceUf(svcCtx *svc.ServiceContext, projectId int64) ([]model.DcDevice
 	}
 }
 
-func transDeviceUfData(datas []model.DcDeviceBind) ([]model.DcWorkingUf, error) {
+func transDeviceUfData(svcCtx *svc.ServiceContext, datas []model.DcDeviceBind) ([]model.DcWorkingUf, error) {
 	ts := make([]model.DcWorkingUf, len(datas))
 
 	for i, data := range datas {
+		var filterCycleAdjust int64 = 0
+
+		key := fmt.Sprintf("events:adjust:%d:%s:%s:%s", data.ProjectId, data.DeviceCode, "filter_cycle", "")
+		if adjustString, err := svcCtx.Cache.Get(key); err == nil && adjustString != "" {
+			filterCycleAdjust, _ = strconv.ParseInt(adjustString, 10, 64)
+		}
+
 		ts[i] = model.DcWorkingUf{
 			ProjectId:        data.ProjectId,
 			DeviceCode:       data.DeviceCode,
@@ -90,7 +99,7 @@ func transDeviceUfData(datas []model.DcDeviceBind) ([]model.DcWorkingUf, error)
 			ProductWqP:       data.Items.GetItemFloat64Value("product_wq_p"),
 			Step:             data.Items.GetItemInt64Value("step"),
 			FilterTime:       data.Items.GetItemFloat64Value("filter_time"),
-			FilterCycle:      data.Items.GetItemInt64Value("filter_cycle"),
+			FilterCycle:      data.Items.GetItemInt64Value("filter_cycle") + filterCycleAdjust,
 			CTime:            time.Now(),
 		}
 	}