job.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package job
  2. import (
  3. "GtDataStore/app/cmd/events/internal/svc"
  4. "GtDataStore/app/model"
  5. "context"
  6. "fmt"
  7. "github.com/go-redis/redis/v8"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "strconv"
  10. "sync"
  11. "time"
  12. )
  13. // 负责仿真模拟的数据生产工作 异步执行
  14. // 1. 读取可用项目,每个项目生成一个新的 Job -> J1
  15. // 2. J1中检查项目对应的设备或工艺项目
  16. // a. 没有查询到则退出
  17. // b. 检查到的项注册到全局记录 G1 中
  18. // 3. 从 G1 中读取到项目信息及项,项中包含执行时间间隔 interval
  19. // 4. 将项目信息 P 及项信息 D,注入 task,生成一个新的任务。时间随 interval 到来执行
  20. // a. 间隔清理task
  21. type Partition struct {
  22. N int64 // 有几个Job
  23. Pn int64 // 区号, 决定选取哪页数据段
  24. Total int64 // 数据总量
  25. limit int64 // 数据页大小 = Total / N
  26. }
  27. type Job struct {
  28. Id uint32 `json:"id"`
  29. Name string `json:"name"`
  30. Ctx context.Context
  31. SvcCtx *svc.ServiceContext
  32. CheckInterval time.Duration
  33. StartInterval time.Duration
  34. Partition Partition
  35. syncClient *redis.Client
  36. pts map[uint32]*Task
  37. ptsLock sync.RWMutex
  38. handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error
  39. handlerIntervalTable map[string]time.Duration
  40. }
  41. func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
  42. return &Job{
  43. Id: id,
  44. Name: name,
  45. Ctx: context.Background(),
  46. SvcCtx: svc,
  47. CheckInterval: 120 * time.Second,
  48. StartInterval: 127 * time.Second,
  49. syncClient: redis.NewClient(&redis.Options{Addr: svc.Config.Redis.Host, Password: svc.Config.Redis.Pass, MaxRetries: 5}),
  50. pts: make(map[uint32]*Task),
  51. handlerTable: handlerTable,
  52. handlerIntervalTable: handlerIntervalTable,
  53. }
  54. }
  55. func (j *Job) Run() error {
  56. defer func() {
  57. if err := recover(); err != nil {
  58. fmt.Printf("Job quit by error")
  59. }
  60. }()
  61. // 检测新任务及更新任务状态,或退出不需要的任务
  62. go j.checkProjectTask()
  63. // 启动待执行的任务
  64. go j.startProjectTask()
  65. // 启动同步
  66. go j.syncPartition()
  67. select {}
  68. }
  69. func (j *Job) startProjectTask() {
  70. tk := time.NewTicker(j.StartInterval)
  71. for {
  72. select {
  73. case <-tk.C:
  74. for _, task := range j.pts {
  75. if task.IsRun() == false {
  76. if te := task.Test(); te != nil {
  77. logx.Errorf("start project task error: test fail, %s", te.Error())
  78. continue
  79. }
  80. task.Start("")
  81. }
  82. }
  83. }
  84. }
  85. }
  86. func (j *Job) checkProjectTask() {
  87. tk := time.NewTicker(j.CheckInterval)
  88. for {
  89. select {
  90. case <-tk.C:
  91. j.checkProject()
  92. }
  93. }
  94. }
  95. func (j *Job) checkProject() {
  96. // 对 pts加锁,防止在注册新任务或任务状态改变时,出现任务调度不一致的情况
  97. j.ptsLock.Lock()
  98. defer func() {
  99. j.ptsLock.Unlock()
  100. }()
  101. limit := j.Partition.partitionLimit()
  102. ps, err := j.SvcCtx.ProjectConfig.FindAllByPage(j.Ctx, (j.Partition.Pn-1)*limit, limit)
  103. if err != nil {
  104. logx.Errorf("not found project config")
  105. }
  106. for _, p := range ps {
  107. jid := uint32(p.ProjectId)
  108. if _, ok := j.pts[jid]; !ok {
  109. if p.IsEnabled() {
  110. if npt, err := j.createProjectTask(p); err == nil {
  111. j.pts[jid] = npt
  112. } else {
  113. logx.Errorf("create new project task error: %s", err.Error())
  114. }
  115. }
  116. } else {
  117. if p.IsEnabled() == false {
  118. if t, ok := j.pts[jid]; ok {
  119. t.Quit()
  120. delete(j.pts, jid)
  121. }
  122. }
  123. }
  124. }
  125. // todo: pts 中会存在数据表中已经删除的项目,也要定时清理
  126. }
  127. func (j *Job) createProjectTask(model model.DcProjectConfig) (*Task, error) {
  128. fmt.Printf("check new project task: %d\n", model.ProjectId)
  129. return NewProjectTask(uint32(model.ProjectId), fmt.Sprintf("project-%d", model.ProjectId), j, model), nil
  130. }
  131. func (j *Job) ClearTask() {
  132. j.ptsLock.Lock()
  133. defer j.ptsLock.Unlock()
  134. for _, task := range j.pts {
  135. if task.IsRun() == true {
  136. task.Quit()
  137. delete(j.pts, task.Id)
  138. }
  139. }
  140. }
  141. func (j *Job) setPartition(partition Partition) {
  142. j.Partition = partition
  143. }
  144. func (p *Partition) partitionLimit() int64 {
  145. if p.N == 0 {
  146. return 1
  147. }
  148. total := p.Total
  149. if p.Total%2 == 1 {
  150. total++
  151. }
  152. return total / p.N
  153. }
  154. func (j *Job) syncPartition() {
  155. tk := time.NewTicker(60 * time.Second)
  156. go func() {
  157. for {
  158. message, err := j.syncClient.Subscribe(j.Ctx, "simulation-job-"+j.Name).ReceiveMessage(j.Ctx)
  159. if err != nil {
  160. time.Sleep(5 * time.Second)
  161. continue
  162. }
  163. if total, err := strconv.ParseInt(message.Payload, 10, 64); err == nil {
  164. ic := j.syncClient.Incr(j.Ctx, "simulation-job-n-"+j.Name)
  165. j.Partition.Total = total
  166. j.Partition.Pn = ic.Val()
  167. j.Partition.N = 1
  168. time.Sleep(3 * time.Second)
  169. n := j.syncClient.Get(j.Ctx, "simulation-job-n-"+j.Name)
  170. j.syncClient.Expire(j.Ctx, "simulation-job-n-"+j.Name, 10*time.Second)
  171. if n, err := n.Int64(); err == nil {
  172. j.Partition.N = n
  173. }
  174. j.ClearTask()
  175. }
  176. }
  177. }()
  178. for {
  179. select {
  180. case <-tk.C:
  181. ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
  182. if err != nil {
  183. logx.Errorf("not found project config")
  184. }
  185. if len(ps) != int(j.Partition.Total) {
  186. // 发布一条消息
  187. j.syncClient.Publish(j.Ctx, "simulation-job-"+j.Name, len(ps))
  188. }
  189. }
  190. }
  191. }