123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package job
- import (
- "GtDataStore/app/cmd/events/internal/svc"
- "GtDataStore/app/model"
- "context"
- "fmt"
- "github.com/go-redis/redis/v8"
- "github.com/zeromicro/go-zero/core/logx"
- "strconv"
- "sync"
- "time"
- )
- // 负责仿真模拟的数据生产工作 异步执行
- // 1. 读取可用项目,每个项目生成一个新的 Job -> J1
- // 2. J1中检查项目对应的设备或工艺项目
- // a. 没有查询到则退出
- // b. 检查到的项注册到全局记录 G1 中
- // 3. 从 G1 中读取到项目信息及项,项中包含执行时间间隔 interval
- // 4. 将项目信息 P 及项信息 D,注入 task,生成一个新的任务。时间随 interval 到来执行
- // a. 间隔清理task
// Partition describes which slice ("page") of the project list a job is
// responsible for when several job nodes share the work.
type Partition struct {
	// N is the number of jobs (nodes) sharing the data.
	N int64
	// Pn is the partition number; it selects which page of the data this
	// job reads.
	Pn int64
	// Total is the total number of data rows to split.
	Total int64
	// limit is the page size (Total / N). It is not assigned by the code
	// visible in this file; partitionLimit computes the value on demand.
	limit int64
}
- type Job struct {
- Id uint32 `json:"id"`
- Name string `json:"name"`
- Ctx context.Context
- SvcCtx *svc.ServiceContext
- CheckInterval time.Duration
- StartInterval time.Duration
- Partition Partition
- syncClient *redis.Client
- pts map[uint32]*Task
- ptsLock sync.RWMutex
- handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error
- handlerIntervalTable map[string]time.Duration
- jobProjectTotalKey string
- jobNodesCountKey string
- jobNodesHeartbeatKey string
- }
- func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
- return &Job{
- Id: id,
- Name: name,
- Ctx: context.Background(),
- SvcCtx: svc,
- CheckInterval: 120 * time.Second,
- StartInterval: 127 * time.Second,
- syncClient: redis.NewClient(&redis.Options{Addr: svc.Config.Redis.Host, Password: svc.Config.Redis.Pass, MaxRetries: 5}),
- pts: make(map[uint32]*Task),
- handlerTable: handlerTable,
- handlerIntervalTable: handlerIntervalTable,
- jobProjectTotalKey: fmt.Sprintf("job:project:total:%s", name),
- jobNodesCountKey: fmt.Sprintf("job:nodes:count:%s", name),
- jobNodesHeartbeatKey: fmt.Sprintf("job:nodes:heartbeat:%s", name),
- }
- }
- func (j *Job) Run() error {
- defer func() {
- if err := recover(); err != nil {
- fmt.Printf("Job quit by error")
- }
- }()
- // 检测新任务及更新任务状态,或退出不需要的任务
- go j.checkProjectTask()
- // 启动待执行的任务
- go j.startProjectTask()
- // 启动同步
- go j.syncPartition()
- select {}
- }
- func (j *Job) startProjectTask() {
- tk := time.NewTicker(j.StartInterval)
- for {
- select {
- case <-tk.C:
- for _, task := range j.pts {
- if task.IsRun() == false {
- if te := task.Test(); te != nil {
- logx.Errorf("start project task error: test fail, %s", te.Error())
- continue
- }
- task.Start("")
- }
- }
- }
- }
- }
- func (j *Job) checkProjectTask() {
- tk := time.NewTicker(j.CheckInterval)
- for {
- select {
- case <-tk.C:
- j.checkProject()
- }
- }
- }
- func (j *Job) checkProject() {
- // 对 pts加锁,防止在注册新任务或任务状态改变时,出现任务调度不一致的情况
- j.ptsLock.Lock()
- defer func() {
- j.ptsLock.Unlock()
- }()
- limit := j.Partition.partitionLimit()
- ps, err := j.SvcCtx.ProjectConfig.FindAllByPage(j.Ctx, (j.Partition.Pn-1)*limit, limit)
- if err != nil {
- logx.Errorf("not found project config")
- }
- for _, p := range ps {
- jid := uint32(p.ProjectId)
- if _, ok := j.pts[jid]; !ok {
- if p.IsEnabled() {
- if npt, err := j.createProjectTask(p); err == nil {
- j.pts[jid] = npt
- } else {
- logx.Errorf("create new project task error: %s", err.Error())
- }
- }
- } else {
- if p.IsEnabled() == false {
- if t, ok := j.pts[jid]; ok {
- t.Quit()
- delete(j.pts, jid)
- }
- }
- }
- }
- // todo: pts 中会存在数据表中已经删除的项目,也要定时清理
- }
- func (j *Job) createProjectTask(model model.DcProjectConfig) (*Task, error) {
- fmt.Printf("check new project task: %d\n", model.ProjectId)
- return NewProjectTask(uint32(model.ProjectId), fmt.Sprintf("project-%d", model.ProjectId), j, model), nil
- }
- func (j *Job) ClearTask() {
- j.ptsLock.Lock()
- defer j.ptsLock.Unlock()
- for _, task := range j.pts {
- if task.IsRun() == true {
- task.Quit()
- delete(j.pts, task.Id)
- }
- }
- }
- func (p *Partition) partitionLimit() int64 {
- if p.N == 0 || p.Total < p.N {
- return 1
- }
- total := p.Total
- if p.Total%2 == 1 {
- total++
- }
- return total / p.N
- }
- func (j *Job) syncPartition() {
- tk := time.NewTicker(30 * time.Second)
- go func() {
- for {
- message, err := j.syncClient.Subscribe(j.Ctx, j.jobProjectTotalKey).ReceiveMessage(j.Ctx)
- if err != nil {
- time.Sleep(5 * time.Second)
- continue
- }
- if total, err := strconv.ParseInt(message.Payload, 10, 64); err == nil {
- ic := j.syncClient.Incr(j.Ctx, j.jobNodesCountKey)
- j.Partition.Total = total
- j.Partition.Pn = ic.Val()
- j.Partition.N = 1
- time.Sleep(3 * time.Second)
- n := j.syncClient.Get(j.Ctx, j.jobNodesCountKey)
- j.syncClient.Expire(j.Ctx, j.jobNodesCountKey, 10*time.Second)
- if n, err := n.Int64(); err == nil {
- j.Partition.N = n
- }
- j.ClearTask()
- }
- }
- }()
- for {
- select {
- case <-tk.C:
- //fmt.Printf("total: %d nodes: %d pos: %d\n", j.Partition.Total, j.Partition.N, j.Partition.Pn)
- // 维持一个tk周期的心跳
- ml := false
- nowUnix := time.Now().Unix()
- j.syncClient.HSet(context.Background(), j.jobNodesHeartbeatKey, j.Partition.Pn, nowUnix)
- // 检测心跳超过1分钟的连接, 如果存在, 收重新分配
- if hm := j.syncClient.HGetAll(context.Background(), j.jobNodesHeartbeatKey); hm != nil {
- for _, lastSyncUnixs := range hm.Val() {
- if lastSyncUnix, err := strconv.ParseInt(lastSyncUnixs, 10, 64); err == nil && nowUnix-lastSyncUnix > 60 {
- j.syncClient.Del(context.Background(), j.jobNodesHeartbeatKey)
- ml = true
- break
- }
- }
- }
- ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
- if err != nil {
- logx.Errorf("not found project config")
- }
- if ml == true || len(ps) != int(j.Partition.Total) {
- // 发布一条消息
- j.syncClient.Publish(j.Ctx, j.jobProjectTotalKey, len(ps))
- }
- }
- }
- }
|