Bladeren bron

fit:
1. 修正job snyc的问题
2. 单点位判断支持改变判断

gaoyagang 1 jaar geleden
bovenliggende
commit
b1c599a1de

+ 9 - 1
.idea/GtDataStore.iml

@@ -1,9 +1,17 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <module type="WEB_MODULE" version="4">
+  <component name="FacetManager">
+    <facet type="Python" name="Python facet">
+      <configuration sdkName="Python 3.9 (GtDataStore)" />
+    </facet>
+  </component>
   <component name="Go" enabled="true" />
   <component name="NewModuleRootManager">
-    <content url="file://$MODULE_DIR$" />
+    <content url="file://$MODULE_DIR$">
+      <excludeFolder url="file://$MODULE_DIR$/venv" />
+    </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" name="Python 3.9 (GtDataStore) interpreter library" level="application" />
   </component>
 </module>

+ 1 - 1
app/cmd/events/etc/events.yaml

@@ -7,7 +7,7 @@ Mode: test
 Log:
   ServiceName: events-rpc
   Mode: console
-  Level: info
+  Level: error
 
 #双胞胎服务
 GtServerIp: 47.96.12.136:8788

+ 16 - 19
app/cmd/events/events.go

@@ -8,15 +8,8 @@ import (
 	"fmt"
 
 	"GtDataStore/app/cmd/events/internal/config"
-	"GtDataStore/app/cmd/events/internal/server"
 	"GtDataStore/app/cmd/events/internal/svc"
-	"GtDataStore/app/cmd/events/pb"
-
 	"github.com/zeromicro/go-zero/core/conf"
-	"github.com/zeromicro/go-zero/core/service"
-	"github.com/zeromicro/go-zero/zrpc"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/reflection"
 )
 
 var configFile = flag.String("f", "etc/events.yaml", "the config file")
@@ -28,27 +21,31 @@ func main() {
 	conf.MustLoad(*configFile, &c)
 	ctx := svc.NewServiceContext(c)
 
-	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
-		pb.RegisterEventsServer(grpcServer, server.NewEventsServer(ctx))
-
-		if c.Mode == service.DevMode || c.Mode == service.TestMode {
-			reflection.Register(grpcServer)
-		}
-	})
-	defer s.Stop()
+	//s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
+	//	pb.RegisterEventsServer(grpcServer, server.NewEventsServer(ctx))
+	//
+	//	if c.Mode == service.DevMode || c.Mode == service.TestMode {
+	//		reflection.Register(grpcServer)
+	//	}
+	//})
+	//defer s.Stop()
+	//fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
+	//s.Start()
 
 	if c.Mode != "dev" {
-		envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp})
+		envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp, Cache: ctx.Cache})
 
 		go func() {
 			defer func() {
 				fmt.Print("async events job stop.....\n")
 			}()
+
+			// 启动事件识别
+			fmt.Print("start new job for events.....\n")
 			j := job.NewJob(1, "events", ctx, handler.EventHandlerTable, handler.EventIntervalTable)
 			j.Run()
 		}()
 	}
-
-	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
-	s.Start()
+	// 接收识别到的事件, 处理, 目前只有写入队列(暂时只有redis), 写入数据库
+	_ = handler.HandlerEventNotify(ctx)
 }

+ 0 - 5
app/cmd/events/events/events.go

@@ -4,12 +4,7 @@
 package events
 
 import (
-	"context"
-
-	"GtDataStore/app/cmd/events/pb"
-
 	"github.com/zeromicro/go-zero/zrpc"
-	"google.golang.org/grpc"
 )
 
 type (

+ 44 - 4
app/cmd/events/internal/logic/handler/event.go

@@ -1,16 +1,56 @@
 package handler
 
 import (
-	"GtDataStore/app/cmd/events/internal/logic/job"
+	"GtDataStore/app/cmd/events/internal/svc"
+	"GtDataStore/app/model"
 	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"github.com/zeromicro/go-zero/core/logx"
+	"time"
 )
 
-func handlerEvent(ctx context.Context, task *job.Task, technologyName string) error {
-	queue := task.Job.SvcCtx.Cache
+func HandlerEventNotify(svcCtx *svc.ServiceContext) error {
+	defer close(eventChan)
+	fmt.Print("start HandlerEventNotify.....\n")
+	queue := svcCtx.Cache
 
 	for info := range eventChan {
-
+		info.MsgTime = time.Now()
+		// 数据通知到redis
+		toMQ(queue, info)
+		// 事件记录到mysql
+		toDB(svcCtx, info)
 	}
 
 	return nil
 }
+
+func toMQ(queue *redis.Client, info eventInfo) {
+	channel := fmt.Sprintf(EVTNE_NOTIFY_KEY_FORMAT, info.ProjectId, info.EventName)
+	if bs, err := json.Marshal(info); err == nil {
+		if intCmd := queue.Publish(context.Background(), channel, string(bs)); intCmd == nil {
+			logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
+		}
+	} else {
+		logx.Errorf("handlerEvent queue.Publish toMQ error: %s %+v", channel, info)
+	}
+
+}
+
+func toDB(svcCtx *svc.ServiceContext, info eventInfo) {
+	if _, err := svcCtx.DcEventList.Insert(context.Background(), &model.DcEventList{
+		ProjectId:  info.ProjectId,
+		DeviceCode: info.DeviceCode,
+		Name:       info.EventName,
+		Item:       info.Item,
+		Val:        info.NewValue,
+		OldVal:     info.OldValue,
+		Time:       info.Time,
+		EventId:    info.EventId,
+		CTime:      info.MsgTime,
+	}); err != nil {
+		logx.Errorf("handlerEvent queue.Publish toDB error:%+v", info)
+	}
+}

+ 24 - 15
app/cmd/events/internal/logic/handler/single.go

@@ -8,6 +8,7 @@ import (
 	"GtDataStore/common/identify"
 	"context"
 	"errors"
+	"fmt"
 	"github.com/zeromicro/go-zero/core/logx"
 	"sync"
 )
@@ -26,7 +27,8 @@ func EventSingle(ctx context.Context, task *job.Task, technologyName string) err
 	}
 
 	// 2. 获得点位信息
-	err = MultiEvents.FillCurrentValue()
+	ovMap := MultiEvents.FindPrevString() // 旧点位值map
+	err = MultiEvents.FillCurrentValue()  // 该方法执行行, 会将当前的值更新到旧点位值
 	if err != nil {
 		return err
 	}
@@ -35,22 +37,29 @@ func EventSingle(ctx context.Context, task *job.Task, technologyName string) err
 	var wg sync.WaitGroup
 	wg.Add(len(binds))
 	for _, bind := range binds {
-		item := MultiEvents[bind.Item]
-		ov := item.GetItemPrevStringVal()
-		nv := item.GetItemStringVal()
+		bind := bind
+		go func() {
+			defer wg.Done()
+			item := MultiEvents[bind.Item]
+			ov, _ := ovMap[bind.Item]
+			nv := item.GetItemStringVal()
 
-		checker := identify.NewSingleCheck(map[string]identify.Rule{bind.Name: bind.Config})
-		if name := checker.Check(ov, nv); name != "" {
-			eventChan <- eventInfo{
-				ProjectId:  bind.ProjectId,
-				DeviceCode: bind.DeviceCode,
-				Item:       bind.Item,
-				EventName:  name,
-				OldValue:   ov,
-				NewValue:   nv,
-				Time:       *item.GetItemHtime(),
+			checker := identify.NewSingleCheck(map[string]identify.Rule{bind.Name: bind.Config})
+			name := checker.Check(ov, nv)
+			println(fmt.Sprintf("ov: %s nv: %s event: %s name", ov, nv, name))
+			if name != "" {
+				eventChan <- eventInfo{
+					ProjectId:  bind.ProjectId,
+					DeviceCode: bind.DeviceCode,
+					Item:       bind.Item,
+					EventId:    bind.Id,
+					EventName:  name,
+					OldValue:   ov,
+					NewValue:   nv,
+					Time:       *item.GetItemHtime(),
+				}
 			}
-		}
+		}()
 	}
 
 	wg.Wait()

+ 3 - 2
app/cmd/events/internal/logic/handler/vars.go

@@ -11,6 +11,7 @@ const (
 	EVENT_SINGLE_ITEM = "SINGLE_ITEM_EVENT" // 单点位值事件
 	EVENT_MULTI_ITEM  = "MULTI_ITEM_EVENT"  // 多点位值事件
 
+	EVTNE_NOTIFY_KEY_FORMAT = "EVENT:NOFITY:%d:%s" // 事件通知通道 %d: 项目名称 %s: 事件名称
 )
 
 type (
@@ -19,10 +20,12 @@ type (
 		ProjectId  int64
 		DeviceCode string
 		Item       string
+		EventId    int64
 		EventName  string
 		OldValue   string
 		NewValue   string
 		Time       time.Time
+		MsgTime    time.Time
 	}
 )
 
@@ -32,13 +35,11 @@ var (
 	EventIntervalTable = map[string]time.Duration{
 		EVENT_SINGLE_ITEM: 60 * time.Second,
 		EVENT_MULTI_ITEM:  60 * time.Second,
-		EVENT_NOTIFY:      50 * time.Second,
 	}
 
 	EventHandlerTable = map[string]func(ctx context.Context, task *job.Task, technologyName string) error{
 		EVENT_SINGLE_ITEM: EventSingle,
 		EVENT_MULTI_ITEM:  EventMulti,
-		EVENT_NOTIFY:      handlerEvent,
 	}
 )
 

+ 31 - 12
app/cmd/events/internal/logic/job/job.go

@@ -42,6 +42,9 @@ type Job struct {
 	ptsLock              sync.RWMutex
 	handlerTable         map[string]func(ctx context.Context, task *Task, technologyName string) error
 	handlerIntervalTable map[string]time.Duration
+	jobProjectTotalKey   string
+	jobNodesCountKey     string
+	jobNodesHeartbeatKey string
 }
 
 func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
@@ -56,6 +59,9 @@ func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[st
 		pts:                  make(map[uint32]*Task),
 		handlerTable:         handlerTable,
 		handlerIntervalTable: handlerIntervalTable,
+		jobProjectTotalKey:   fmt.Sprintf("job:project:total:%s", name),
+		jobNodesCountKey:     fmt.Sprintf("job:nodes:count:%s", name),
+		jobNodesHeartbeatKey: fmt.Sprintf("job:nodes:heartbeat:%s", name),
 	}
 }
 
@@ -159,12 +165,8 @@ func (j *Job) ClearTask() {
 	}
 }
 
-func (j *Job) setPartition(partition Partition) {
-	j.Partition = partition
-}
-
 func (p *Partition) partitionLimit() int64 {
-	if p.N == 0 {
+	if p.N == 0 || p.Total < p.N {
 		return 1
 	}
 	total := p.Total
@@ -175,25 +177,25 @@ func (p *Partition) partitionLimit() int64 {
 }
 
 func (j *Job) syncPartition() {
-	tk := time.NewTicker(60 * time.Second)
+	tk := time.NewTicker(30 * time.Second)
 
 	go func() {
 		for {
-			message, err := j.syncClient.Subscribe(j.Ctx, "simulation-job-"+j.Name).ReceiveMessage(j.Ctx)
+			message, err := j.syncClient.Subscribe(j.Ctx, j.jobProjectTotalKey).ReceiveMessage(j.Ctx)
 			if err != nil {
 				time.Sleep(5 * time.Second)
 				continue
 			}
 
 			if total, err := strconv.ParseInt(message.Payload, 10, 64); err == nil {
-				ic := j.syncClient.Incr(j.Ctx, "simulation-job-n-"+j.Name)
+				ic := j.syncClient.Incr(j.Ctx, j.jobNodesCountKey)
 				j.Partition.Total = total
 				j.Partition.Pn = ic.Val()
 				j.Partition.N = 1
 
 				time.Sleep(3 * time.Second)
-				n := j.syncClient.Get(j.Ctx, "simulation-job-n-"+j.Name)
-				j.syncClient.Expire(j.Ctx, "simulation-job-n-"+j.Name, 10*time.Second)
+				n := j.syncClient.Get(j.Ctx, j.jobNodesCountKey)
+				j.syncClient.Expire(j.Ctx, j.jobNodesCountKey, 10*time.Second)
 
 				if n, err := n.Int64(); err == nil {
 					j.Partition.N = n
@@ -207,14 +209,31 @@ func (j *Job) syncPartition() {
 	for {
 		select {
 		case <-tk.C:
+
+			//fmt.Printf("total: %d nodes: %d pos: %d\n", j.Partition.Total, j.Partition.N, j.Partition.Pn)
+			// 维持一个tk周期的心跳
+			ml := false
+			nowUnix := time.Now().Unix()
+			j.syncClient.HSet(context.Background(), j.jobNodesHeartbeatKey, j.Partition.Pn, nowUnix)
+			// 检测心跳超过1分钟的连接, 如果存在, 收重新分配
+			if hm := j.syncClient.HGetAll(context.Background(), j.jobNodesHeartbeatKey); hm != nil {
+				for _, lastSyncUnixs := range hm.Val() {
+					if lastSyncUnix, err := strconv.ParseInt(lastSyncUnixs, 10, 64); err == nil && nowUnix-lastSyncUnix > 60 {
+						j.syncClient.Del(context.Background(), j.jobNodesHeartbeatKey)
+						ml = true
+						break
+					}
+				}
+			}
+
 			ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
 			if err != nil {
 				logx.Errorf("not found project config")
 			}
 
-			if len(ps) != int(j.Partition.Total) {
+			if ml == true || len(ps) != int(j.Partition.Total) {
 				// 发布一条消息
-				j.syncClient.Publish(j.Ctx, "simulation-job-"+j.Name, len(ps))
+				j.syncClient.Publish(j.Ctx, j.jobProjectTotalKey, len(ps))
 			}
 		}
 	}

+ 1 - 1
app/cmd/events/internal/logic/job/task.go

@@ -111,7 +111,7 @@ func Handler(ctx context.Context, task *Task) error {
 
 // 将一个项目需要处理的所有工艺进行分发
 func distribute(ctx context.Context, task *Task) error {
-	fmt.Printf("distribute task for: %+v", task.Model.Technologys)
+	fmt.Printf("distribute task for: %+v \n", task.Model.Technologys)
 	for _, technology := range task.Model.Technologys {
 		interval, ok := task.Job.handlerIntervalTable[technology]
 		if !ok {

+ 31 - 12
app/cmd/organization/internal/logic/job/job.go

@@ -42,6 +42,9 @@ type Job struct {
 	ptsLock              sync.RWMutex
 	handlerTable         map[string]func(ctx context.Context, task *Task, technologyName string) error
 	handlerIntervalTable map[string]time.Duration
+	jobProjectTotalKey   string
+	jobNodesCountKey     string
+	jobNodesHeartbeatKey string
 }
 
 func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
@@ -56,6 +59,9 @@ func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[st
 		pts:                  make(map[uint32]*Task),
 		handlerTable:         handlerTable,
 		handlerIntervalTable: handlerIntervalTable,
+		jobProjectTotalKey:   fmt.Sprintf("job:project:total:%s", name),
+		jobNodesCountKey:     fmt.Sprintf("job:nodes:count:%s", name),
+		jobNodesHeartbeatKey: fmt.Sprintf("job:nodes:heartbeat:%s", name),
 	}
 }
 
@@ -159,12 +165,8 @@ func (j *Job) ClearTask() {
 	}
 }
 
-func (j *Job) setPartition(partition Partition) {
-	j.Partition = partition
-}
-
 func (p *Partition) partitionLimit() int64 {
-	if p.N == 0 {
+	if p.N == 0 || p.Total < p.N {
 		return 1
 	}
 	total := p.Total
@@ -175,25 +177,25 @@ func (p *Partition) partitionLimit() int64 {
 }
 
 func (j *Job) syncPartition() {
-	tk := time.NewTicker(60 * time.Second)
+	tk := time.NewTicker(30 * time.Second)
 
 	go func() {
 		for {
-			message, err := j.syncClient.Subscribe(j.Ctx, "simulation-job-"+j.Name).ReceiveMessage(j.Ctx)
+			message, err := j.syncClient.Subscribe(j.Ctx, j.jobProjectTotalKey).ReceiveMessage(j.Ctx)
 			if err != nil {
 				time.Sleep(5 * time.Second)
 				continue
 			}
 
 			if total, err := strconv.ParseInt(message.Payload, 10, 64); err == nil {
-				ic := j.syncClient.Incr(j.Ctx, "simulation-job-n-"+j.Name)
+				ic := j.syncClient.Incr(j.Ctx, j.jobNodesCountKey)
 				j.Partition.Total = total
 				j.Partition.Pn = ic.Val()
 				j.Partition.N = 1
 
 				time.Sleep(3 * time.Second)
-				n := j.syncClient.Get(j.Ctx, "simulation-job-n-"+j.Name)
-				j.syncClient.Expire(j.Ctx, "simulation-job-n-"+j.Name, 10*time.Second)
+				n := j.syncClient.Get(j.Ctx, j.jobNodesCountKey)
+				j.syncClient.Expire(j.Ctx, j.jobNodesCountKey, 10*time.Second)
 
 				if n, err := n.Int64(); err == nil {
 					j.Partition.N = n
@@ -207,14 +209,31 @@ func (j *Job) syncPartition() {
 	for {
 		select {
 		case <-tk.C:
+
+			//fmt.Printf("total: %d nodes: %d pos: %d\n", j.Partition.Total, j.Partition.N, j.Partition.Pn)
+			// 维持一个tk周期的心跳
+			ml := false
+			nowUnix := time.Now().Unix()
+			j.syncClient.HSet(context.Background(), j.jobNodesHeartbeatKey, j.Partition.Pn, nowUnix)
+			// 检测心跳超过1分钟的连接, 如果存在, 收重新分配
+			if hm := j.syncClient.HGetAll(context.Background(), j.jobNodesHeartbeatKey); hm != nil {
+				for _, lastSyncUnixs := range hm.Val() {
+					if lastSyncUnix, err := strconv.ParseInt(lastSyncUnixs, 10, 64); err == nil && nowUnix-lastSyncUnix > 60 {
+						j.syncClient.Del(context.Background(), j.jobNodesHeartbeatKey)
+						ml = true
+						break
+					}
+				}
+			}
+
 			ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
 			if err != nil {
 				logx.Errorf("not found project config")
 			}
 
-			if len(ps) != int(j.Partition.Total) {
+			if ml == true || len(ps) != int(j.Partition.Total) {
 				// 发布一条消息
-				j.syncClient.Publish(j.Ctx, "simulation-job-"+j.Name, len(ps))
+				j.syncClient.Publish(j.Ctx, j.jobProjectTotalKey, len(ps))
 			}
 		}
 	}

+ 6 - 3
app/model/dcEventListModel_gen.go

@@ -39,8 +39,11 @@ type (
 		Id         int64     `db:"id"`
 		ProjectId  int64     `db:"project_id"`
 		DeviceCode string    `db:"device_code"`
+		Name       string    `db:"name"` // 事件名称
 		Item       string    `db:"item"`
 		Val        string    `db:"val"`      // 事件发生时,点位的值
+		OldVal     string    `db:"old_val"`  // 事件发生时, 旧的点位值
+		Time       time.Time `db:"time"`     // 事件触发时间
 		EventId    int64     `db:"event_id"` // 事件 ID dc_event_bind.id
 		Title      string    `db:"title"`    // 事件标题
 		Context    string    `db:"context"`  // 事件内容
@@ -83,14 +86,14 @@ func (m *defaultDcEventListModel) FindOne(ctx context.Context, id int64) (*DcEve
 }
 
 func (m *defaultDcEventListModel) Insert(ctx context.Context, data *DcEventList) (sql.Result, error) {
-	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?)", m.table, dcEventListRowsExpectAutoSet)
-	ret, err := m.conn.ExecCtx(ctx, query, data.ProjectId, data.DeviceCode, data.Item, data.Val, data.EventId, data.Title, data.Context, data.CTime)
+	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, dcEventListRowsExpectAutoSet)
+	ret, err := m.conn.ExecCtx(ctx, query, data.ProjectId, data.DeviceCode, data.Name, data.Item, data.Val, data.OldVal, data.Time, data.EventId, data.Title, data.Context, data.CTime)
 	return ret, err
 }
 
 func (m *defaultDcEventListModel) Update(ctx context.Context, data *DcEventList) error {
 	query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, dcEventListRowsWithPlaceHolder)
-	_, err := m.conn.ExecCtx(ctx, query, data.ProjectId, data.DeviceCode, data.Item, data.Val, data.EventId, data.Title, data.Context, data.CTime, data.Id)
+	_, err := m.conn.ExecCtx(ctx, query, data.ProjectId, data.DeviceCode, data.Name, data.Item, data.Val, data.OldVal, data.Time, data.EventId, data.Title, data.Context, data.CTime, data.Id)
 	return err
 }
 

+ 17 - 6
common/envitem/func.go

@@ -28,7 +28,7 @@ func (m MultiEnvItem) GetProjectId() int64 {
 	return 0
 }
 
-func (m MultiEnvItem) GetItemNames() []string {
+func (m MultiEnvItem) getItemNames() []string {
 	items := make([]string, 0)
 	for _, item := range m {
 		items = append(items, item.Item)
@@ -78,11 +78,22 @@ func (m MultiEnvItem) GetItemStringValue(key string) string {
 	return ""
 }
 
-func (m MultiEnvItem) GetCurrentValue(key string) (string, string, error) {
-	if envItem, ok := m[key]; ok {
-		return envItem.getCurrentValue()
+func (m MultiEnvItem) FindString() map[string]string {
+	a := make(map[string]string)
+	for _, item := range m {
+		a[item.Item] = item.Value.(string)
+	}
+
+	return a
+}
+
+func (m MultiEnvItem) FindPrevString() map[string]string {
+	a := make(map[string]string)
+	for _, item := range m {
+		a[item.Item] = item.GetItemPrevStringVal()
 	}
-	return "", "", errors.New(key + " not exists")
+
+	return a
 }
 
 func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
@@ -90,7 +101,7 @@ func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
 	data := make([]*ItemValueReq, 1)
 
 	data[0] = &ItemValueReq{
-		DeviceItems: strings.Join(m.GetItemNames(), ","),
+		DeviceItems: strings.Join(m.getItemNames(), ","),
 		ProjectId:   m.GetProjectId(),
 	}
 

+ 5 - 2
common/envitem/vars.go

@@ -1,10 +1,13 @@
 package envitem
 
-import "github.com/go-redis/redis/v8"
+import (
+	"github.com/go-redis/redis/v8"
+	"time"
+)
 
 const (
 	CACHE_PREV_VALUE_KEY        = "envitem:prev:value:%d:%s"
-	CACHE_PREV_VALUE_KEY_EXPIRE = 86400
+	CACHE_PREV_VALUE_KEY_EXPIRE = 86400 * time.Second
 )
 
 var (

+ 16 - 3
common/identify/single.go

@@ -106,17 +106,30 @@ func (s *SingleItem) nvNotIn(slice []string) bool {
 
 func (s *SingleItem) inSlice(slice []string, v string) bool {
 	for _, s2 := range slice {
-		if s2 == v {
+		s1 := s.transVar(s2)
+
+		if s1 == v {
 			return true
 		}
 		// 数字比较
-		if _, err := strconv.ParseInt(s2, 10, 64); err != nil {
-			return s.compare(s2, v)
+		if _, err := strconv.ParseInt(s1, 10, 64); err != nil {
+			return s.compare(s1, v)
 		}
 	}
 	return false
 }
 
+func (s *SingleItem) transVar(v string) string {
+	if v == VAR_NV {
+		v = s.nv
+	}
+
+	if v == VAR_OV {
+		v = s.ov
+	}
+	return v
+}
+
 func (s *SingleItem) compare(r string, v string) bool {
 	if strings.Index(r, "...") > -1 {
 		return s.compareRange(r, v)

+ 11 - 0
common/identify/single_test.go

@@ -0,0 +1,11 @@
+package identify
+
+import "testing"
+
+func TestStepTest(t *testing.T) {
+	// 只要值发生改变就触发  旧值的可选范围中, 不包含新值
+	// 适合液位变化, 开关量, 敏感数据监控
+	rule := `{"xxx":{"OvNotIn":["$nv"]}}`
+	x := StepTest("5", "6", rule)
+	t.Log(x)
+}

+ 6 - 0
common/identify/vars.go

@@ -0,0 +1,6 @@
+package identify
+
+const (
+	VAR_NV = "$nv"
+	VAR_OV = "$ov"
+)