Quellcode durchsuchen

fit: events支持多点位事件

gaoyagang vor 1 Jahr
Ursprung
Commit
80f518079d

+ 22 - 4
app/cmd/events/internal/logic/handler/event.go

@@ -32,12 +32,20 @@ func HandlerEventNotify(svcCtx *svc.ServiceContext) error {
 	fmt.Print("start HandlerEventNotify.....\n")
 	queue := svcCtx.Cache
 
-	for info := range eventChan {
+	go func() {
+		for info := range eventChan {
+			info.MsgTime = time.Now()
+			// 数据通知到redis
+			toMQ(queue, info)
+			// 事件记录到mysql
+			toDB(svcCtx, info)
+		}
+	}()
+
+	for info := range multiEventChan {
 		info.MsgTime = time.Now()
 		// 数据通知到redis
-		toMQ(queue, info)
-		// 事件记录到mysql
-		toDB(svcCtx, info)
+		toMQForMulti(queue, info)
 	}
 
 	return nil
@@ -52,7 +60,17 @@ func toMQ(queue *redis.Client, info eventInfo) {
 	} else {
 		logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
 	}
+}
 
+func toMQForMulti(queue *redis.Client, info multiEventInfo) {
+	channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
+	if bs, err := json.Marshal(info); err == nil {
+		if err := producter.Publisher(channel, bs); err == nil {
+			logx.Errorf("handlerEvent queue.Publish toMQForMulti ok: %s %+v", channel, info)
+		}
+	} else {
+		logx.Errorf("handlerEvent queue.Publish toMQForMulti error: %s %+v", channel, info)
+	}
 }
 
 func toDB(svcCtx *svc.ServiceContext, info eventInfo) {

+ 77 - 4
app/cmd/events/internal/logic/handler/multi.go

@@ -6,23 +6,96 @@ import (
 	"GtDataStore/app/model"
 	"context"
 	"github.com/zeromicro/go-zero/core/logx"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/envitem"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/identify"
+	"sync"
+	"time"
 )
 
 func EventMulti(ctx context.Context, task *job.Task, technologyName string) error {
 	// 1. 查询所有的设备
-	_, err := findMultiRecord(task.Job.SvcCtx, int64(task.Id))
+	binds, err := findMultiRecord(task.Job.SvcCtx, int64(task.Id))
 	if err != nil {
 		return err
 	}
 
+	// 2. 识别事件
+	var wg sync.WaitGroup
+	wg.Add(len(binds))
+	for _, bind := range binds {
+		bind := bind
+		go func() {
+			defer wg.Done()
+
+			multiEnvItem := extractEventItems(bind)
+			//fmt.Printf("extractEventItems: %+v\n", multiEnvItem)
+			err := multiEnvItem.FillCurrentValue()
+			if err != nil {
+				return
+			}
+
+			ovs := multiEnvItem.FindPrevString()
+			nvs := multiEnvItem.FindString()
+
+			rules := make(map[string]identify.MultiRule)
+			rules[bind.Name] = bind.Config.Multi
+			checker := identify.NewMultiCheck(rules)
+			name := checker.Check(ovs, nvs)
+			//fmt.Printf("ovs: %+v\n", ovs)
+			//fmt.Printf("nvs: %+v\n", nvs)
+			//fmt.Printf("rules: %+v\n", rules)
+			//fmt.Printf("name: %s\n", name)
+			if name != "" {
+				items := make([]string, 0)
+				times := make(map[string]time.Time)
+
+				for itemName, _ := range nvs {
+					items = append(items, itemName)
+					times[itemName] = *multiEnvItem.GetItemHtime(itemName)
+				}
+
+				multiEventChan <- multiEventInfo{
+					ProjectId:  bind.ProjectId,
+					DeviceCode: bind.DeviceCode,
+					Items:      items,
+					EventId:    bind.Id,
+					EventName:  name,
+					OldValues:  ovs,
+					NewValues:  nvs,
+					Times:      times,
+				}
+			}
+		}()
+	}
+
+	wg.Wait()
+
 	return nil
 }
 
 func findMultiRecord(svcCtx *svc.ServiceContext, projectId int64) ([]model.DcEventBind, error) {
-	if records, err := svcCtx.DcEventBind.FindByProjectIdRuleFlag(context.Background(), projectId, EVENT_MULTI_ITEM); err != nil {
-		logx.Infof("findMultiRecord not found record")
+	if data := eventBindCacheTable.GetCache(EVENT_MULTI_ITEM); data != nil {
+		return data, nil
+	}
+
+	if binds, err := svcCtx.DcEventBind.FindByProjectIdRuleFlag(context.Background(), projectId, EVENT_MULTI_ITEM); err != nil {
+		logx.Error("findSingleRecord not found record")
 		return nil, err
 	} else {
-		return records, nil
+		eventBindCacheTable.SetCache(EVENT_MULTI_ITEM, binds, time.Now().Add(300*time.Second))
+		return binds, nil
 	}
 }
+
+func extractEventItems(bind model.DcEventBind) envitem.MultiEnvItem {
+	me := make(envitem.MultiEnvItem)
+	for itemName, rule := range bind.Config.Multi {
+		_ = rule
+		me[itemName] = &envitem.EnvItem{
+			ProjectId: bind.ProjectId,
+			Item:      itemName,
+		}
+	}
+
+	return me
+}

+ 1 - 1
app/cmd/events/internal/logic/handler/single.go

@@ -45,7 +45,7 @@ func EventSingle(ctx context.Context, task *job.Task, technologyName string) err
 			ov, _ := ovMap[bind.Item]
 			nv := item.GetItemStringVal()
 
-			checker := identify.NewSingleCheck(map[string]identify.Rule{bind.Name: bind.Config})
+			checker := identify.NewSingleCheck(map[string]identify.Rule{bind.Name: bind.Config.Single})
 			name := checker.Check(ov, nv)
 			//println(fmt.Sprintf("ov: %s nv: %s event: %s name", ov, nv, name))
 

+ 19 - 4
app/cmd/events/internal/logic/handler/vars.go

@@ -17,7 +17,7 @@ const (
 )
 
 type (
-	// 事件信息
+	// 事件信息, 单点位
 	eventInfo struct {
 		ProjectId  int64     // 项目ID
 		DeviceCode string    // 设备位号
@@ -30,6 +30,19 @@ type (
 		MsgTime    time.Time // 消息时间
 	}
 
+	// 事件信息, 多点位
+	multiEventInfo struct {
+		ProjectId  int64                // 项目ID
+		DeviceCode string               // 设备位号
+		Items      []string             // 点位名称
+		EventId    int64                // 事件ID
+		EventName  string               // 事件名称
+		OldValues  map[string]string    // 点位旧值
+		NewValues  map[string]string    // 新值
+		Times      map[string]time.Time // 新值时间
+		MsgTime    time.Time            // 消息时间
+	}
+
 	EventBindCache struct {
 		Expire time.Time
 		Data   []model.DcEventBind
@@ -39,12 +52,13 @@ type (
 )
 
 var (
-	eventChan chan eventInfo
-	producter *rabbitMQ.Producter
+	eventChan      chan eventInfo
+	multiEventChan chan multiEventInfo
+	producter      *rabbitMQ.Producter
 
 	EventIntervalTable = map[string]time.Duration{
 		EVENT_SINGLE_ITEM: 2 * time.Second,
-		EVENT_MULTI_ITEM:  60 * time.Second,
+		EVENT_MULTI_ITEM:  2 * time.Second,
 	}
 
 	EventHandlerTable = map[string]func(ctx context.Context, task *job.Task, technologyName string) error{
@@ -79,4 +93,5 @@ func (t EventBindCacheMap) SetCache(technologyName string, data []model.DcEventB
 
 func init() {
 	eventChan = make(chan eventInfo, 50)
+	multiEventChan = make(chan multiEventInfo, 50)
 }

+ 9 - 10
app/model/dcEventBindModel_gen.go

@@ -6,7 +6,6 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
-	"metawant.greentech.com.cn/gaoyagang/gt-common/identify"
 	"strings"
 	"time"
 
@@ -40,15 +39,15 @@ type (
 	}
 
 	DcEventBind struct {
-		Id         int64         `db:"id"`
-		ProjectId  int64         `db:"project_id"`  // 项目 ID
-		DeviceCode string        `db:"device_code"` // 设备位号
-		Name       string        `db:"name"`        // 事件名称
-		Item       string        `db:"item"`        // 点位名称
-		RuleFlag   string        `db:"rule_flag"`   // 绑定的事件引擎
-		Config     identify.Rule `db:"config"`      // 事件配置 一个 json 配置,用于
-		Interval   int64         `db:"interval"`    // 检测时间间隔
-		CTime      time.Time     `db:"c_time"`
+		Id         int64     `db:"id"`
+		ProjectId  int64     `db:"project_id"`  // 项目 ID
+		DeviceCode string    `db:"device_code"` // 设备位号
+		Name       string    `db:"name"`        // 事件名称
+		Item       string    `db:"item"`        // 点位名称
+		RuleFlag   string    `db:"rule_flag"`   // 绑定的事件引擎
+		Config     EventRule `db:"config"`      // 事件配置 一个 json 配置,用于
+		Interval   int64     `db:"interval"`    // 检测时间间隔
+		CTime      time.Time `db:"c_time"`
 	}
 )
 

+ 23 - 0
app/model/vars.go

@@ -4,6 +4,7 @@ import (
 	"database/sql/driver"
 	"encoding/json"
 	"github.com/zeromicro/go-zero/core/stores/sqlx"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/identify"
 )
 
 var ErrNotFound = sqlx.ErrNotFound
@@ -19,3 +20,25 @@ func (d *Technologys) Scan(input interface{}) error {
 	_ = json.Unmarshal(input.([]byte), &d)
 	return nil
 }
+
+type EventRule struct {
+	Single identify.Rule
+	Multi  identify.MultiRule
+}
+
+func (d EventRule) Value() (driver.Value, error) {
+	return json.Marshal(d)
+}
+
+// Scan 实现方法
+func (d *EventRule) Scan(input interface{}) error {
+	if err := json.Unmarshal(input.([]byte), &d.Multi); err == nil {
+		return nil
+	}
+
+	if err := json.Unmarshal(input.([]byte), &d.Single); err == nil {
+		return nil
+	}
+
+	return nil
+}

+ 19 - 0
app/model/vars_test.go

@@ -0,0 +1,19 @@
+package model
+
+import (
+	"encoding/json"
+	"metawant.greentech.com.cn/gaoyagang/gt-common/identify"
+	"testing"
+)
+
+func TestEvent_SetT(t *testing.T) {
+	_ = `{"C.M.UF1_DB@word_control":{"NvIn":["26"]},"C.M.UF1_DB@time_production":{"NvIn":["360...660"]}}`
+	jso1 := `{"NvIn":["26"]}`
+
+	x := identify.Rule{}
+
+	err := json.Unmarshal([]byte(jso1), &x)
+
+	t.Log(err)
+	t.Logf("%+v", x)
+}

+ 1 - 1
go.mod

@@ -9,7 +9,7 @@ require (
 	github.com/zeromicro/go-zero v1.6.0
 	google.golang.org/grpc v1.59.0
 	google.golang.org/protobuf v1.31.0
-	metawant.greentech.com.cn/gaoyagang/gt-common v1.2.2
+	metawant.greentech.com.cn/gaoyagang/gt-common v1.2.3
 )
 
 require (

+ 2 - 2
go.sum

@@ -279,8 +279,8 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/A
 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
 k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
 k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
-metawant.greentech.com.cn/gaoyagang/gt-common v1.2.2 h1:piC5uoDCIklKaCityR5lAD0u2KQInWhY65nCr2gqNAU=
-metawant.greentech.com.cn/gaoyagang/gt-common v1.2.2/go.mod h1:TiS/E+b6WFW/0Ei4H/djoguRTnfeBsiUnt5odj1C9n4=
+metawant.greentech.com.cn/gaoyagang/gt-common v1.2.3 h1:pruZlr729MOcYvCObRXmtOPVyg+bcpKyxOhyw1M0uMI=
+metawant.greentech.com.cn/gaoyagang/gt-common v1.2.3/go.mod h1:TiS/E+b6WFW/0Ei4H/djoguRTnfeBsiUnt5odj1C9n4=
 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
 sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
 sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE=