package job

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/zeromicro/go-zero/core/logx"

	"GtDataStore/app/cmd/events/internal/svc"
	"GtDataStore/app/model"
)

// This file is responsible for producing simulated data. The work runs
// asynchronously:
//
//  1. Read the available projects; each project generates a new Job -> J1.
//  2. J1 checks the devices or technology items that belong to the project.
//     a. If none are found, it exits.
//     b. Items that are found are registered in the global record G1.
//  3. Project information and items are read from G1; every item carries an
//     execution interval.
//  4. Project information P and item information D are injected into a task,
//     producing a new task that executes each time the interval elapses.
//     a. Tasks are cleaned up periodically.

// Partition describes which slice of the project list a Job is responsible for.
type Partition struct {
	N     int64 // number of jobs sharing the data
	Pn    int64 // partition number; decides which page of data is selected
	Total int64 // total amount of data
	limit int64 // page size = Total / N
}

// Job periodically loads the project configurations of its partition, keeps
// one Task per enabled project and coordinates the partitioning with other
// nodes through Redis.
type Job struct {
	Id            uint32 `json:"id"`
	Name          string `json:"name"`
	Ctx           context.Context
	SvcCtx        *svc.ServiceContext
	CheckInterval time.Duration
	StartInterval time.Duration
	Partition     Partition // guarded by ptsLock once Run has been called

	syncClient *redis.Client

	// ptsLock guards both pts and Partition, so that a partition change and
	// the task cleanup that follows it are observed atomically by checkProject.
	ptsLock sync.RWMutex
	pts     map[uint32]*Task

	handlerTable         map[string]func(ctx context.Context, task *Task, technologyName string) error
	handlerIntervalTable map[string]time.Duration

	jobProjectTotalKey   string
	jobNodesCountKey     string
	jobNodesHeartbeatKey string
}

// NewJob builds a Job with a background context, a 120s check interval, a
// 127s start interval and a dedicated Redis client configured from
// svc.Config.Redis. The Redis keys used for synchronisation are derived from
// name.
func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
	return &Job{
		Id:                   id,
		Name:                 name,
		Ctx:                  context.Background(),
		SvcCtx:               svc,
		CheckInterval:        120 * time.Second,
		StartInterval:        127 * time.Second,
		syncClient:           redis.NewClient(&redis.Options{Addr: svc.Config.Redis.Host, Password: svc.Config.Redis.Pass, MaxRetries: 5}),
		pts:                  make(map[uint32]*Task),
		handlerTable:         handlerTable,
		handlerIntervalTable: handlerIntervalTable,
		jobProjectTotalKey:   fmt.Sprintf("job:project:total:%s", name),
		jobNodesCountKey:     fmt.Sprintf("job:nodes:count:%s", name),
		jobNodesHeartbeatKey: fmt.Sprintf("job:nodes:heartbeat:%s", name),
	}
}

// Run starts the background loops and blocks until j.Ctx is cancelled, then
// returns the context's error. With the default background context it blocks
// forever.
//
// The deferred recover only covers panics raised in Run's own goroutine; it
// cannot catch panics in the goroutines started here.
func (j *Job) Run() error {
	defer func() {
		if r := recover(); r != nil {
			logx.Errorf("job quit by panic: %v", r)
		}
	}()

	// Detect new tasks, update task state and drop tasks no longer needed.
	go j.checkProjectTask()
	// Start tasks that are waiting to be executed.
	go j.startProjectTask()
	// Start partition synchronisation.
	go j.syncPartition()

	<-j.Ctx.Done()
	return j.Ctx.Err()
}

// startProjectTask starts every registered task that is not running, once
// per StartInterval, until j.Ctx is cancelled.
func (j *Job) startProjectTask() {
	tk := time.NewTicker(j.StartInterval)
	defer tk.Stop()

	for {
		select {
		case <-j.Ctx.Done():
			return
		case <-tk.C:
			j.startPendingTasks()
		}
	}
}

// startPendingTasks tests and starts all registered tasks that are not
// running.
//
// The task set is snapshotted under the read lock because checkProject and
// ClearTask mutate pts concurrently; ranging over the live map would be a
// data race. The lock is released before calling into Task so that a slow
// Test/Start cannot block writers.
// NOTE(review): a task removed between the snapshot and Start may still be
// started once; confirm that Task tolerates this.
func (j *Job) startPendingTasks() {
	j.ptsLock.RLock()
	pending := make([]*Task, 0, len(j.pts))
	for _, task := range j.pts {
		pending = append(pending, task)
	}
	j.ptsLock.RUnlock()

	for _, task := range pending {
		if task.IsRun() {
			continue
		}
		if err := task.Test(); err != nil {
			logx.Errorf("start project task error: test fail, %s", err.Error())
			continue
		}
		task.Start("")
	}
}

// checkProjectTask calls checkProject once per CheckInterval until j.Ctx is
// cancelled.
func (j *Job) checkProjectTask() {
	tk := time.NewTicker(j.CheckInterval)
	defer tk.Stop()

	for {
		select {
		case <-j.Ctx.Done():
			return
		case <-tk.C:
			j.checkProject()
		}
	}
}

// checkProject loads the page of project configurations that belongs to this
// job's partition, registers a task for every enabled project that has none
// and quits and removes the tasks of projects that are no longer enabled.
func (j *Job) checkProject() {
	// Lock pts so that task scheduling stays consistent while new tasks are
	// registered or task state changes.
	j.ptsLock.Lock()
	defer j.ptsLock.Unlock()

	// No partition assigned yet: the offset below would be negative.
	if j.Partition.Pn < 1 {
		return
	}

	limit := j.Partition.partitionLimit()
	ps, err := j.SvcCtx.ProjectConfig.FindAllByPage(j.Ctx, (j.Partition.Pn-1)*limit, limit)
	if err != nil {
		logx.Errorf("not found project config: %s", err.Error())
		return
	}

	for _, p := range ps {
		jid := uint32(p.ProjectId)
		task, registered := j.pts[jid]
		enabled := p.IsEnabled()

		switch {
		case !registered && enabled:
			npt, err := j.createProjectTask(p)
			if err != nil {
				logx.Errorf("create new project task error: %s", err.Error())
				continue
			}
			j.pts[jid] = npt
		case registered && !enabled:
			task.Quit()
			delete(j.pts, jid)
		}
	}
	// todo: pts may still contain projects that were deleted from the table;
	// those need periodic cleanup as well.
}

// createProjectTask builds the Task for one project configuration. It
// currently always returns a nil error.
func (j *Job) createProjectTask(cfg model.DcProjectConfig) (*Task, error) {
	fmt.Printf("check new project task: %d\n", cfg.ProjectId)
	return NewProjectTask(uint32(cfg.ProjectId), fmt.Sprintf("project-%d", cfg.ProjectId), j, cfg), nil
}

// ClearTask quits every running task and removes all tasks from the job.
func (j *Job) ClearTask() {
	j.ptsLock.Lock()
	defer j.ptsLock.Unlock()
	j.clearTasksLocked()
}

// clearTasksLocked is ClearTask without locking; the caller must hold
// ptsLock for writing.
//
// Tasks that are not running are removed as well: after a repartition they
// may belong to another node's page and must not be started later.
func (j *Job) clearTasksLocked() {
	for id, task := range j.pts {
		if task.IsRun() {
			task.Quit()
		}
		delete(j.pts, id)
	}
}

// partitionLimit returns the page size for this partition: Total divided by
// N, rounded up so that N pages always cover all Total rows. It returns 1
// when N is not positive or when there are fewer rows than jobs.
func (p *Partition) partitionLimit() int64 {
	if p.N <= 0 || p.Total < p.N {
		return 1
	}
	return (p.Total + p.N - 1) / p.N
}

// syncPartition keeps the partition in sync with the other nodes. It starts
// the subscriber that reacts to repartition messages and then, every 30
// seconds, writes a heartbeat and publishes a repartition message when
// needed. It returns when j.Ctx is cancelled.
func (j *Job) syncPartition() {
	go j.listenPartition()

	tk := time.NewTicker(30 * time.Second)
	defer tk.Stop()

	for {
		select {
		case <-j.Ctx.Done():
			return
		case <-tk.C:
			j.heartbeat()
		}
	}
}

// listenPartition subscribes once to the project-total channel and
// repartitions on every message whose payload parses as an integer. Receive
// errors are retried after 5 seconds on the same subscription.
func (j *Job) listenPartition() {
	sub := j.syncClient.Subscribe(j.Ctx, j.jobProjectTotalKey)
	// The subscription is being torn down; a close error is not actionable.
	defer func() { _ = sub.Close() }()

	for {
		msg, err := sub.ReceiveMessage(j.Ctx)
		if err != nil {
			if !j.sleep(5 * time.Second) {
				return
			}
			continue
		}

		total, err := strconv.ParseInt(msg.Payload, 10, 64)
		if err != nil {
			logx.Errorf("sync partition: invalid project total %q", msg.Payload)
			continue
		}
		if !j.repartition(total) {
			return
		}
	}
}

// repartition takes a partition number by incrementing the shared node
// counter, waits 3 seconds for the other nodes to do the same, reads the
// resulting node count and then clears all tasks so that they are rebuilt
// for the new partition. It reports false when j.Ctx was cancelled while
// waiting.
func (j *Job) repartition(total int64) bool {
	pn, err := j.syncClient.Incr(j.Ctx, j.jobNodesCountKey).Result()
	if err != nil {
		// Without a partition number the page offset would be invalid. Total
		// is left unchanged, so the next heartbeat publishes again.
		logx.Errorf("sync partition: incr nodes count: %s", err.Error())
		return true
	}

	j.ptsLock.Lock()
	j.Partition.Total = total
	j.Partition.Pn = pn
	j.Partition.N = 1 // fallback until the real node count is known
	j.ptsLock.Unlock()

	if !j.sleep(3 * time.Second) {
		return false
	}

	nodes, err := j.syncClient.Get(j.Ctx, j.jobNodesCountKey).Int64()
	if err != nil {
		logx.Errorf("sync partition: read nodes count: %s", err.Error())
	}
	// Let the counter expire so that the next round starts counting from 1.
	if expErr := j.syncClient.Expire(j.Ctx, j.jobNodesCountKey, 10*time.Second).Err(); expErr != nil {
		logx.Errorf("sync partition: expire nodes count: %s", expErr.Error())
	}

	j.ptsLock.Lock()
	defer j.ptsLock.Unlock()
	if err == nil {
		j.Partition.N = nodes
	}
	j.clearTasksLocked()
	return true
}

// heartbeat records this node's heartbeat for one ticker period and
// publishes the current project count when a stale node is detected or the
// count differs from the known total.
func (j *Job) heartbeat() {
	j.ptsLock.RLock()
	pn, total := j.Partition.Pn, j.Partition.Total
	j.ptsLock.RUnlock()

	nowUnix := time.Now().Unix()
	if err := j.syncClient.HSet(j.Ctx, j.jobNodesHeartbeatKey, pn, nowUnix).Err(); err != nil {
		logx.Errorf("sync partition: write heartbeat: %s", err.Error())
	}

	// Load the projects before touching the heartbeat table: on failure
	// nothing may be published (an empty result would look like "0 projects"
	// and make every node drop its tasks) and stale entries must survive
	// until the next tick.
	ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
	if err != nil {
		logx.Errorf("not found project config: %s", err.Error())
		return
	}

	if j.dropStaleHeartbeats(nowUnix) || int64(len(ps)) != total {
		// Publish a message so that all nodes repartition.
		if err := j.syncClient.Publish(j.Ctx, j.jobProjectTotalKey, len(ps)).Err(); err != nil {
			logx.Errorf("sync partition: publish project total: %s", err.Error())
		}
	}
}

// dropStaleHeartbeats reports whether any node's heartbeat is older than one
// minute. If so, the whole heartbeat table is deleted so that it is rebuilt
// by the nodes that are still alive.
func (j *Job) dropStaleHeartbeats(nowUnix int64) bool {
	beats, err := j.syncClient.HGetAll(j.Ctx, j.jobNodesHeartbeatKey).Result()
	if err != nil {
		logx.Errorf("sync partition: read heartbeats: %s", err.Error())
		return false
	}

	for _, raw := range beats {
		last, err := strconv.ParseInt(raw, 10, 64)
		if err != nil || nowUnix-last <= 60 {
			continue
		}
		if err := j.syncClient.Del(j.Ctx, j.jobNodesHeartbeatKey).Err(); err != nil {
			logx.Errorf("sync partition: reset heartbeats: %s", err.Error())
		}
		return true
	}
	return false
}

// sleep waits for d and reports true, or reports false as soon as j.Ctx is
// cancelled.
func (j *Job) sleep(d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()

	select {
	case <-j.Ctx.Done():
		return false
	case <-t.C:
		return true
	}
}