package job import ( "GtDataStore/app/model" "context" "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" "time" ) // Task 提供项目的状态、信息,为子任务提供共享空间 type Task struct { Id uint32 `json:"id"` Name string `json:"name"` Model *model.DcProjectConfig `json:"model"` handler func(ctx context.Context, task *Task) error Job *Job sigStart chan string status uint8 // 运行状态 0:未运行 1:运行中 2:待退出 ctx context.Context cancelFunc context.CancelFunc } func NewProjectTask(id uint32, name string, job *Job, model model.DcProjectConfig) *Task { ctx, cancelFunc := context.WithCancel(context.Background()) np := &Task{ Id: id, Name: name, Model: &model, Job: job, handler: Handler, status: 0, sigStart: make(chan string), ctx: ctx, cancelFunc: cancelFunc, } // 等待开始信号 go np.run() return np } func (p *Task) run() error { select { case <-p.sigStart: p.status = 1 } err := p.handler(p.ctx, p) if err != nil { logx.Infof("project task simulations %d, %s, %s", p.Id, p.Name, err.Error()) } return err } func (p *Task) Start(msg string) { p.sigStart <- msg } func (p *Task) Quit() { p.status = 2 p.cancelFunc() } func (p *Task) IsRun() bool { return p.status == 1 } func (p *Task) Test() error { if p.Job == nil { return errors.New("not found Job") } if p.Model == nil { return errors.New("not found project model") } if p.handler == nil { return errors.New("not set simulations") } if len(p.Model.Technologys) == 0 { return errors.New("not set technologys list, modify technologys field for mysql cd_project_config") } for _, technology := range p.Model.Technologys { if _, ok := p.Job.handlerTable[technology]; !ok { fmt.Printf("project: %d, %s no support", p.Id, technology) continue //return errors.New(fmt.Sprintf("project: %d, %s no support", p.Id, technology)) } } return nil } func Handler(ctx context.Context, task *Task) error { for { select { case <-ctx.Done(): return nil default: return distribute(ctx, task) } } } // 将一个项目需要处理的所有工艺进行分发 func distribute(ctx context.Context, task *Task) error { fmt.Printf("distribute task for: %+v \n", task.Model.Technologys) for _, technology := range task.Model.Technologys { interval, ok := task.Job.handlerIntervalTable[technology] if !ok { fmt.Printf("project: %d, %s no support", task.Id, technology) continue //return errors.New(fmt.Sprintf("project: %d, %s no support", task.Id, technology)) } f, ok := task.Job.handlerTable[technology] if !ok { return errors.New(fmt.Sprintf("project: %d, %s no bind simulations", task.Id, technology)) } technologyName := technology go func(interval time.Duration, call func(ctx context.Context, task *Task, technologyName string) error) { tk := time.NewTicker(interval) for { select { case <-ctx.Done(): return case <-tk.C: // 这里是同步执行的,避免并发问题 err := call(ctx, task, technologyName) if err != nil { fmt.Printf("distribute.call error: %s\n", err.Error()) } //} else { // fmt.Print("distribute.call finish\n") //} } } }(interval, f) } return nil }