job.go
package job

import (
    "context"
    "fmt"
    "strconv"
    "sync"
    "time"

    "GtDataStore/app/cmd/events/internal/svc"
    "GtDataStore/app/model"

    "github.com/go-redis/redis/v8"
    "github.com/zeromicro/go-zero/core/logx"
)

// Produces the simulated data for the mock pipeline; runs asynchronously.
// 1. Read the enabled projects; each project gets its own Job -> J1.
// 2. J1 checks the device or technology items bound to the project:
//    a. if none are found, exit;
//    b. otherwise register the found items in the global registry G1.
// 3. Read the project and its items from G1; each item carries an execution interval.
// 4. Inject the project info P and item info D into a task, creating a new task that
//    runs whenever the interval elapses.
//    a. Tasks are cleaned up periodically.

type Partition struct {
    N     int64 // number of Job nodes
    Pn    int64 // partition number: which page of the data this node takes
    Total int64 // total number of records
    limit int64 // page size = Total / N
}

type Job struct {
    Id                   uint32 `json:"id"`
    Name                 string `json:"name"`
    Ctx                  context.Context
    SvcCtx               *svc.ServiceContext
    CheckInterval        time.Duration
    StartInterval        time.Duration
    Partition            Partition
    syncClient           *redis.Client
    pts                  map[uint32]*Task
    ptsLock              sync.RWMutex
    handlerTable         map[string]func(ctx context.Context, task *Task, technologyName string) error
    handlerIntervalTable map[string]time.Duration
    jobProjectTotalKey   string
    jobNodesCountKey     string
    jobNodesHeartbeatKey string
}

func NewJob(id uint32, name string, svc *svc.ServiceContext, handlerTable map[string]func(ctx context.Context, task *Task, technologyName string) error, handlerIntervalTable map[string]time.Duration) *Job {
    return &Job{
        Id:                   id,
        Name:                 name,
        Ctx:                  context.Background(),
        SvcCtx:               svc,
        CheckInterval:        120 * time.Second,
        StartInterval:        127 * time.Second,
        syncClient:           redis.NewClient(&redis.Options{Addr: svc.Config.Redis.Host, Password: svc.Config.Redis.Pass, MaxRetries: 5}),
        pts:                  make(map[uint32]*Task),
        handlerTable:         handlerTable,
        handlerIntervalTable: handlerIntervalTable,
        jobProjectTotalKey:   fmt.Sprintf("job:project:total:%s", name),
        jobNodesCountKey:     fmt.Sprintf("job:nodes:count:%s", name),
        jobNodesHeartbeatKey: fmt.Sprintf("job:nodes:heartbeat:%s", name),
    }
}
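
// Illustrative usage sketch, not part of the original file: it shows how a caller
// could wire up and run a Job. The handler name "simulate" and its body are
// hypothetical placeholders; the real handler tables are supplied by the events service.
func runJobExample(svcCtx *svc.ServiceContext) error {
    handlers := map[string]func(ctx context.Context, task *Task, technologyName string) error{
        "simulate": func(ctx context.Context, task *Task, technologyName string) error {
            // produce one batch of simulated data for this technology
            return nil
        },
    }
    intervals := map[string]time.Duration{
        "simulate": 10 * time.Second, // how often the task should invoke this handler
    }
    j := NewJob(1, "simulate", svcCtx, handlers, intervals)
    return j.Run() // blocks forever; the job's goroutines do the scheduling
}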

func (j *Job) Run() error {
    defer func() {
        if err := recover(); err != nil {
            logx.Errorf("job quit by panic: %v", err)
        }
    }()
    // Detect new tasks and refresh task state, or quit tasks that are no longer needed.
    go j.checkProjectTask()
    // Start tasks that are waiting to run.
    go j.startProjectTask()
    // Keep this node's partition in sync with the other nodes.
    go j.syncPartition()
    select {}
}

func (j *Job) startProjectTask() {
    tk := time.NewTicker(j.StartInterval)
    for range tk.C {
        // Hold a read lock so checkProject cannot mutate the task map mid-iteration.
        j.ptsLock.RLock()
        for _, task := range j.pts {
            if !task.IsRun() {
                if te := task.Test(); te != nil {
                    logx.Errorf("start project task error: test fail, %s", te.Error())
                    continue
                }
                task.Start("")
            }
        }
        j.ptsLock.RUnlock()
    }
}

func (j *Job) checkProjectTask() {
    tk := time.NewTicker(j.CheckInterval)
    for range tk.C {
        j.checkProject()
    }
}

func (j *Job) checkProject() {
    // Lock pts so that registering new tasks or changing task state cannot race with scheduling.
    j.ptsLock.Lock()
    defer j.ptsLock.Unlock()
    limit := j.Partition.partitionLimit()
    ps, err := j.SvcCtx.ProjectConfig.FindAllByPage(j.Ctx, (j.Partition.Pn-1)*limit, limit)
    if err != nil {
        logx.Errorf("find project config error: %s", err.Error())
        return
    }
    for _, p := range ps {
        jid := uint32(p.ProjectId)
        if _, ok := j.pts[jid]; !ok {
            // Unknown project: register a task for it if the project is enabled.
            if p.IsEnabled() {
                if npt, err := j.createProjectTask(p); err == nil {
                    j.pts[jid] = npt
                } else {
                    logx.Errorf("create new project task error: %s", err.Error())
                }
            }
        } else if !p.IsEnabled() {
            // Known project that has been disabled: stop and drop its task.
            if t, ok := j.pts[jid]; ok {
                t.Quit()
                delete(j.pts, jid)
            }
        }
    }
    // TODO: pts may still hold projects already deleted from the table; they also need periodic cleanup.
}

func (j *Job) createProjectTask(cfg model.DcProjectConfig) (*Task, error) {
    fmt.Printf("check new project task: %d\n", cfg.ProjectId)
    return NewProjectTask(uint32(cfg.ProjectId), fmt.Sprintf("project-%d", cfg.ProjectId), j, cfg), nil
}

func (j *Job) ClearTask() {
    j.ptsLock.Lock()
    defer j.ptsLock.Unlock()
    for _, task := range j.pts {
        if task.IsRun() {
            task.Quit()
            delete(j.pts, task.Id)
        }
    }
}

func (p *Partition) partitionLimit() int64 {
    if p.N == 0 || p.Total < p.N {
        return 1
    }
    // Round up so that N pages of size limit always cover all Total records.
    return (p.Total + p.N - 1) / p.N
}
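
// Worked example (illustrative): with Total = 10 records and N = 3 nodes,
// partitionLimit returns (10+3-1)/3 = 4, so in checkProject the node with
// Pn = 1 loads FindAllByPage(offset 0, limit 4), Pn = 2 loads offsets 4..7,
// and Pn = 3 loads offsets 8..9; every record belongs to exactly one node.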

func (j *Job) syncPartition() {
    tk := time.NewTicker(30 * time.Second)
    go func() {
        // Subscribe once; every published message carries the latest project total
        // and triggers a repartition on this node.
        sub := j.syncClient.Subscribe(j.Ctx, j.jobProjectTotalKey)
        defer sub.Close()
        for {
            message, err := sub.ReceiveMessage(j.Ctx)
            if err != nil {
                time.Sleep(5 * time.Second)
                continue
            }
            if total, err := strconv.ParseInt(message.Payload, 10, 64); err == nil {
                // Claim a partition number by incrementing the shared node counter,
                // wait for the other nodes to do the same, then read back the final count.
                ic := j.syncClient.Incr(j.Ctx, j.jobNodesCountKey)
                j.Partition.Total = total
                j.Partition.Pn = ic.Val()
                j.Partition.N = 1
                time.Sleep(3 * time.Second)
                n := j.syncClient.Get(j.Ctx, j.jobNodesCountKey)
                j.syncClient.Expire(j.Ctx, j.jobNodesCountKey, 10*time.Second)
                if nodes, err := n.Int64(); err == nil {
                    j.Partition.N = nodes
                }
                j.ClearTask()
            }
        }
    }()
    for range tk.C {
        //fmt.Printf("total: %d nodes: %d pos: %d\n", j.Partition.Total, j.Partition.N, j.Partition.Pn)
        // Refresh this node's heartbeat once per ticker period.
        ml := false
        nowUnix := time.Now().Unix()
        j.syncClient.HSet(context.Background(), j.jobNodesHeartbeatKey, j.Partition.Pn, nowUnix)
        // Detect nodes whose heartbeat is older than one minute; if any exist,
        // drop the heartbeat hash and force a repartition.
        if hm := j.syncClient.HGetAll(context.Background(), j.jobNodesHeartbeatKey); hm.Err() == nil {
            for _, lastSyncUnixs := range hm.Val() {
                if lastSyncUnix, err := strconv.ParseInt(lastSyncUnixs, 10, 64); err == nil && nowUnix-lastSyncUnix > 60 {
                    j.syncClient.Del(context.Background(), j.jobNodesHeartbeatKey)
                    ml = true
                    break
                }
            }
        }
        ps, err := j.SvcCtx.ProjectConfig.FindAll(j.Ctx)
        if err != nil {
            logx.Errorf("find project config error: %s", err.Error())
            continue
        }
        if ml || len(ps) != int(j.Partition.Total) {
            // Publish the new total so every node (including this one) repartitions.
            j.syncClient.Publish(j.Ctx, j.jobProjectTotalKey, len(ps))
        }
    }
}
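
// Repartition walkthrough (illustrative, based on the code above): suppose two
// nodes run this job and a third project row is enabled. On the next tick one
// node sees len(ps) == 3 != Partition.Total and publishes "3" on
// jobProjectTotalKey. Every subscriber (including the publisher) increments
// jobNodesCountKey to claim a partition number (Pn = 1 and Pn = 2), waits 3s,
// reads the counter back as N = 2, clears its running tasks, and the next
// checkProject pass reloads that node's page of projects with the new limit.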