/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"context"
	"sync"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// DoWorkPieceFunc processes a single piece of work, identified by its index.
type DoWorkPieceFunc func(piece int)

type options struct {
	chunkSize int
}

// Options is a functional option that tweaks how ParallelizeUntil runs.
type Options func(*options)

// WithChunkSize allows setting the size of the chunks of work items handed to
// the workers, rather than processing the pieces one by one.
// It is recommended to use this option if the number of pieces is significantly
// higher than the number of workers and the work done for each item is small.
func WithChunkSize(c int) func(*options) {
	return func(o *options) {
		o.chunkSize = c
	}
}
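
// Illustrative sketch, not part of the upstream file: with many cheap,
// independent pieces, chunking means each worker performs one channel receive
// per chunk of pieces instead of one per piece. The names ctx, items, and
// handle below are hypothetical caller-side identifiers.
//
//	workqueue.ParallelizeUntil(ctx, 16, len(items), func(i int) {
//		handle(items[i]) // small amount of work per item
//	}, workqueue.WithChunkSize(100))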

// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
	if pieces == 0 {
		return
	}
	o := options{}
	for _, opt := range opts {
		opt(&o)
	}
	chunkSize := o.chunkSize
	if chunkSize < 1 {
		chunkSize = 1
	}

	// Pre-load every chunk index into a buffered channel and close it, so the
	// workers below can simply range over it until the work runs out.
	chunks := ceilDiv(pieces, chunkSize)
	toProcess := make(chan int, chunks)
	for i := 0; i < chunks; i++ {
		toProcess <- i
	}
	close(toProcess)

	// A nil context disables cancellation: receiving from a nil channel blocks
	// forever, so the select below always falls through to doWorkPiece.
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}
	// Never start more workers than there are chunks to process.
	if chunks < workers {
		workers = chunks
	}
	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for chunk := range toProcess {
				// Each chunk covers the piece indexes [start, end).
				start := chunk * chunkSize
				end := start + chunkSize
				if end > pieces {
					end = pieces
				}
				for p := start; p < end; p++ {
					// Check for cancellation before every piece.
					select {
					case <-stop:
						return
					default:
						doWorkPiece(p)
					}
				}
			}
		}()
	}
	wg.Wait()
}
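
// exampleDoubleAll is an illustrative sketch, not part of the upstream file.
// It shows one way a caller might drive ParallelizeUntil: every piece writes
// to its own slice index, so no extra locking is needed, and a canceled
// context stops workers before they begin their next piece. The worker count
// of 8 is an arbitrary assumption for the example.
func exampleDoubleAll(ctx context.Context, in []int) []int {
	out := make([]int, len(in))
	ParallelizeUntil(ctx, 8, len(in), func(i int) {
		out[i] = in[i] * 2
	})
	return out
}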

// ceilDiv returns a divided by b, rounded up to the nearest integer.
func ceilDiv(a, b int) int {
	return (a + b - 1) / b
}