wait.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "context"
  16. "math/rand"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/runtime"
  20. )
  // ForeverTestTimeout is the timeout to use in tests shaped like:
  //
  //	...
  //	<- time.After(timeout):
  //	   t.Errorf("Timed out")
  //
  // In such tests the timeout should behave as "forever". Tests must not
  // genuinely hang, though, so 30s is used: long enough to absorb the delays
  // seen on a heavily contended machine (GC, seeks, etc), yet short enough
  // that a developer who broke the test does not have to ctrl-c the run.
  var ForeverTestTimeout = 30 * time.Second

  // NeverStop may be passed to Until to make it never stop.
  var NeverStop <-chan struct{} = make(chan struct{})
  // Group starts a set of goroutines and lets the caller wait until all of
  // them have returned.
  type Group struct {
  	wg sync.WaitGroup
  }

  // Wait blocks until every goroutine started through the group has returned.
  func (g *Group) Wait() {
  	g.wg.Wait()
  }

  // StartWithChannel runs f in a new goroutine tracked by the group.
  // stopCh is handed to f unchanged; f should stop when stopCh is available.
  func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
  	task := func() { f(stopCh) }
  	g.Start(task)
  }

  // StartWithContext runs f in a new goroutine tracked by the group.
  // ctx is handed to f unchanged; f should stop when ctx.Done() is available.
  func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
  	task := func() { f(ctx) }
  	g.Start(task)
  }

  // Start runs f in a new goroutine tracked by the group.
  func (g *Group) Start(f func()) {
  	// Register before spawning so that a Wait issued right after Start
  	// returns cannot miss this goroutine.
  	g.wg.Add(1)
  	go func() {
  		// Deferred so the group is released even if f panics.
  		defer g.wg.Done()
  		f()
  	}()
  }
  62. // Forever calls f every period for ever.
  63. //
  64. // Forever is syntactic sugar on top of Until.
  65. func Forever(f func(), period time.Duration) {
  66. Until(f, period, NeverStop)
  67. }
  // Jitter returns a time.Duration between duration and duration + maxFactor *
  // duration.
  //
  // Adding a random amount lets clients avoid converging on periodic
  // behavior. If maxFactor is 0.0 or negative, a default factor of 1.0 is used.
  func Jitter(duration time.Duration, maxFactor float64) time.Duration {
  	factor := maxFactor
  	if factor <= 0.0 {
  		factor = 1.0
  	}
  	// rand.Float64 yields a value in [0.0, 1.0), so the extra delay is
  	// strictly less than factor * duration.
  	extra := rand.Float64() * factor * float64(duration)
  	return duration + time.Duration(extra)
  }
  // ConditionFunc reports done=true once the condition is satisfied, or a
  // non-nil error if the loop should be aborted.
  type ConditionFunc func() (done bool, err error)

  // ConditionWithContextFunc reports done=true once the condition is
  // satisfied, or a non-nil error if the loop should be aborted.
  //
  // The caller supplies a context that the condition function may use.
  type ConditionWithContextFunc func(context.Context) (done bool, err error)

  // WithContext adapts a ConditionFunc to the ConditionWithContextFunc
  // signature. The context passed to the adapter is ignored.
  func (cf ConditionFunc) WithContext() ConditionWithContextFunc {
  	adapted := func(_ context.Context) (bool, error) {
  		return cf()
  	}
  	return adapted
  }
  // ContextForChannel wraps parentCh in a context that is treated as
  // cancelled when parentCh is closed. Err() on the returned context reports
  // context.Canceled once parentCh is closed and nil before that.
  func ContextForChannel(parentCh <-chan struct{}) context.Context {
  	return channelContext{stopCh: parentCh}
  }

  // Compile-time check that channelContext satisfies context.Context.
  var _ context.Context = channelContext{}

  // channelContext is a context.Context backed by a stop channel: it behaves
  // as if the context were cancelled when stopCh is closed.
  type channelContext struct {
  	stopCh <-chan struct{}
  }

  // Done returns the underlying stop channel.
  func (c channelContext) Done() <-chan struct{} {
  	return c.stopCh
  }

  // Err returns context.Canceled if a receive on the stop channel can proceed
  // right now, and nil otherwise. It never blocks.
  func (c channelContext) Err() error {
  	select {
  	case <-c.stopCh:
  		return context.Canceled
  	default:
  	}
  	return nil
  }

  // Deadline always reports that no deadline is set.
  func (c channelContext) Deadline() (time.Time, bool) {
  	var none time.Time
  	return none, false
  }

  // Value always returns nil; this context carries no values.
  func (c channelContext) Value(key any) any {
  	return nil
  }
  117. // runConditionWithCrashProtection runs a ConditionFunc with crash protection.
  118. //
  119. // Deprecated: Will be removed when the legacy polling methods are removed.
  120. func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
  121. defer runtime.HandleCrash()
  122. return condition()
  123. }
  124. // runConditionWithCrashProtectionWithContext runs a ConditionWithContextFunc
  125. // with crash protection.
  126. //
  127. // Deprecated: Will be removed when the legacy polling methods are removed.
  128. func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) {
  129. defer runtime.HandleCrash()
  130. return condition(ctx)
  131. }
  // waitFunc creates a channel that receives an item every time a test
  // should be executed and is closed when the last test should be invoked.
  // The done channel passed in tells the implementation when to stop.
  //
  // Deprecated: Will be removed in a future release in favor of
  // loopConditionUntilContext.
  type waitFunc func(done <-chan struct{}) <-chan struct{}

  // WithContext converts the waitFunc to an equivalent waitWithContextFunc:
  // the returned function calls w with the Done channel of the context it is
  // given.
  func (w waitFunc) WithContext() waitWithContextFunc {
  	return func(ctx context.Context) <-chan struct{} {
  		done := ctx.Done()
  		return w(done)
  	}
  }

  // waitWithContextFunc creates a channel that receives an item every time a
  // test should be executed and is closed when the last test should be
  // invoked.
  //
  // When the specified context gets cancelled or expires the function
  // stops sending items and returns immediately.
  //
  // Deprecated: Will be removed in a future release in favor of
  // loopConditionUntilContext.
  type waitWithContextFunc func(ctx context.Context) <-chan struct{}
  153. // waitForWithContext continually checks 'fn' as driven by 'wait'.
  154. //
  155. // waitForWithContext gets a channel from 'wait()”, and then invokes 'fn'
  156. // once for every value placed on the channel and once more when the
  157. // channel is closed. If the channel is closed and 'fn'
  158. // returns false without error, waitForWithContext returns ErrWaitTimeout.
  159. //
  160. // If 'fn' returns an error the loop ends and that error is returned. If
  161. // 'fn' returns true the loop ends and nil is returned.
  162. //
  163. // context.Canceled will be returned if the ctx.Done() channel is closed
  164. // without fn ever returning true.
  165. //
  166. // When the ctx.Done() channel is closed, because the golang `select` statement is
  167. // "uniform pseudo-random", the `fn` might still run one or multiple times,
  168. // though eventually `waitForWithContext` will return.
  169. //
  170. // Deprecated: Will be removed in a future release in favor of
  171. // loopConditionUntilContext.
  172. func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error {
  173. waitCtx, cancel := context.WithCancel(context.Background())
  174. defer cancel()
  175. c := wait(waitCtx)
  176. for {
  177. select {
  178. case _, open := <-c:
  179. ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
  180. if err != nil {
  181. return err
  182. }
  183. if ok {
  184. return nil
  185. }
  186. if !open {
  187. return ErrWaitTimeout
  188. }
  189. case <-ctx.Done():
  190. // returning ctx.Err() will break backward compatibility, use new PollUntilContext*
  191. // methods instead
  192. return ErrWaitTimeout
  193. }
  194. }
  195. }