123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- package job
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"GtDataStore/app/model"

	"github.com/zeromicro/go-zero/core/logx"
)
- // Task 提供项目的状态、信息,为子任务提供共享空间
- type Task struct {
- Id uint32 `json:"id"`
- Name string `json:"name"`
- Model *model.DcProjectConfig `json:"model"`
- handler func(ctx context.Context, task *Task) error
- Job *Job
- sigStart chan string
- status uint8 // 运行状态 0:未运行 1:运行中 2:待退出
- ctx context.Context
- cancelFunc context.CancelFunc
- }
- func NewProjectTask(id uint32, name string, job *Job, model model.DcProjectConfig) *Task {
- ctx, cancelFunc := context.WithCancel(context.Background())
- np := &Task{
- Id: id,
- Name: name,
- Model: &model,
- Job: job,
- handler: Handler,
- status: 0,
- sigStart: make(chan string),
- ctx: ctx,
- cancelFunc: cancelFunc,
- }
- // 等待开始信号
- go np.run()
- return np
- }
- func (p *Task) run() error {
- select {
- case <-p.sigStart:
- p.status = 1
- }
- err := p.handler(p.ctx, p)
- if err != nil {
- logx.Infof("project task simulations %d, %s, %s", p.Id, p.Name, err.Error())
- }
- return err
- }
- func (p *Task) Start(msg string) {
- p.sigStart <- msg
- }
- func (p *Task) Quit() {
- p.status = 2
- p.cancelFunc()
- }
- func (p *Task) IsRun() bool {
- return p.status == 1
- }
- func (p *Task) Test() error {
- if p.Job == nil {
- return errors.New("not found Job")
- }
- if p.Model == nil {
- return errors.New("not found project model")
- }
- if p.handler == nil {
- return errors.New("not set simulations")
- }
- if len(p.Model.Technologys) == 0 {
- return errors.New("not set technologys list, modify technologys field for mysql cd_project_config")
- }
- for _, technology := range p.Model.Technologys {
- if _, ok := p.Job.handlerTable[technology]; !ok {
- fmt.Printf("project: %d, %s no support", p.Id, technology)
- continue
- //return errors.New(fmt.Sprintf("project: %d, %s no support", p.Id, technology))
- }
- }
- return nil
- }
- func Handler(ctx context.Context, task *Task) error {
- for {
- select {
- case <-ctx.Done():
- return nil
- default:
- return distribute(ctx, task)
- }
- }
- }
- // 将一个项目需要处理的所有工艺进行分发
- func distribute(ctx context.Context, task *Task) error {
- fmt.Printf("distribute task for: %+v \n", task.Model.Technologys)
- for _, technology := range task.Model.Technologys {
- interval, ok := task.Job.handlerIntervalTable[technology]
- if !ok {
- fmt.Printf("project: %d, %s no support", task.Id, technology)
- continue
- //return errors.New(fmt.Sprintf("project: %d, %s no support", task.Id, technology))
- }
- f, ok := task.Job.handlerTable[technology]
- if !ok {
- return errors.New(fmt.Sprintf("project: %d, %s no bind simulations", task.Id, technology))
- }
- technologyName := technology
- go func(interval time.Duration, call func(ctx context.Context, task *Task, technologyName string) error) {
- tk := time.NewTicker(interval)
- for {
- select {
- case <-ctx.Done():
- return
- case <-tk.C:
- // 这里是同步执行的,避免并发问题
- err := call(ctx, task, technologyName)
- if err != nil {
- fmt.Printf("distribute.call error: %s\n", err.Error())
- }
- //} else {
- // fmt.Print("distribute.call finish\n")
- //}
- }
- }
- }(interval, f)
- }
- return nil
- }
|