backoff.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. /*
  2. Copyright 2023 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "context"
  16. "math"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/runtime"
  20. "k8s.io/utils/clock"
  21. )
  // Backoff describes a backoff schedule: a base delay that may grow by a
  // constant factor over a bounded number of steps, with optional random
  // jitter added to each individual sleep.
  type Backoff struct {
  	// Duration is the base delay, i.e. the un-jittered value of the first step.
  	Duration time.Duration
  	// Factor multiplies Duration after every iteration, provided it is not
  	// zero and neither Steps nor Cap has stopped the progression. It should
  	// not be negative. Jitter is never folded into the updated Duration.
  	Factor float64
  	// Jitter widens each sleep: the value returned for an iteration is the
  	// duration plus an extra amount chosen uniformly at random between zero
  	// and `jitter*duration`.
  	Jitter float64
  	// Steps is the number of remaining iterations in which the duration
  	// parameter may still change (reaching the cap can end the progression
  	// sooner). When it is not positive the duration stays as it is. It is
  	// meant for exponential backoff together with Factor and Cap.
  	Steps int
  	// Cap bounds revised values of the duration parameter. If multiplying
  	// by the factor would push the duration past the cap, the duration
  	// becomes the cap and the steps parameter is set to zero.
  	Cap time.Duration
  }
  47. // Step returns an amount of time to sleep determined by the original
  48. // Duration and Jitter. The backoff is mutated to update its Steps and
  49. // Duration. A nil Backoff always has a zero-duration step.
  50. func (b *Backoff) Step() time.Duration {
  51. if b == nil {
  52. return 0
  53. }
  54. var nextDuration time.Duration
  55. nextDuration, b.Duration, b.Steps = delay(b.Steps, b.Duration, b.Cap, b.Factor, b.Jitter)
  56. return nextDuration
  57. }
  58. // DelayFunc returns a function that will compute the next interval to
  59. // wait given the arguments in b. It does not mutate the original backoff
  60. // but the function is safe to use only from a single goroutine.
  61. func (b Backoff) DelayFunc() DelayFunc {
  62. steps := b.Steps
  63. duration := b.Duration
  64. cap := b.Cap
  65. factor := b.Factor
  66. jitter := b.Jitter
  67. return func() time.Duration {
  68. var nextDuration time.Duration
  69. // jitter is applied per step and is not cumulative over multiple steps
  70. nextDuration, duration, steps = delay(steps, duration, cap, factor, jitter)
  71. return nextDuration
  72. }
  73. }
  74. // Timer returns a timer implementation appropriate to this backoff's parameters
  75. // for use with wait functions.
  76. func (b Backoff) Timer() Timer {
  77. if b.Steps > 1 || b.Jitter != 0 {
  78. return &variableTimer{new: internalClock.NewTimer, fn: b.DelayFunc()}
  79. }
  80. if b.Duration > 0 {
  81. return &fixedTimer{new: internalClock.NewTicker, interval: b.Duration}
  82. }
  83. return newNoopTimer()
  84. }
  85. // delay implements the core delay algorithm used in this package.
  86. func delay(steps int, duration, cap time.Duration, factor, jitter float64) (_ time.Duration, next time.Duration, nextSteps int) {
  87. // when steps is non-positive, do not alter the base duration
  88. if steps < 1 {
  89. if jitter > 0 {
  90. return Jitter(duration, jitter), duration, 0
  91. }
  92. return duration, duration, 0
  93. }
  94. steps--
  95. // calculate the next step's interval
  96. if factor != 0 {
  97. next = time.Duration(float64(duration) * factor)
  98. if cap > 0 && next > cap {
  99. next = cap
  100. steps = 0
  101. }
  102. } else {
  103. next = duration
  104. }
  105. // add jitter for this step
  106. if jitter > 0 {
  107. duration = Jitter(duration, jitter)
  108. }
  109. return duration, next, steps
  110. }
  111. // DelayWithReset returns a DelayFunc that will return the appropriate next interval to
  112. // wait. Every resetInterval the backoff parameters are reset to their initial state.
  113. // This method is safe to invoke from multiple goroutines, but all calls will advance
  114. // the backoff state when Factor is set. If Factor is zero, this method is the same as
  115. // invoking b.DelayFunc() since Steps has no impact without Factor. If resetInterval is
  116. // zero no backoff will be performed as the same calling DelayFunc with a zero factor
  117. // and steps.
  118. func (b Backoff) DelayWithReset(c clock.Clock, resetInterval time.Duration) DelayFunc {
  119. if b.Factor <= 0 {
  120. return b.DelayFunc()
  121. }
  122. if resetInterval <= 0 {
  123. b.Steps = 0
  124. b.Factor = 0
  125. return b.DelayFunc()
  126. }
  127. return (&backoffManager{
  128. backoff: b,
  129. initialBackoff: b,
  130. resetInterval: resetInterval,
  131. clock: c,
  132. lastStart: c.Now(),
  133. timer: nil,
  134. }).Step
  135. }
  136. // Until loops until stop channel is closed, running f every period.
  137. //
  138. // Until is syntactic sugar on top of JitterUntil with zero jitter factor and
  139. // with sliding = true (which means the timer for period starts after the f
  140. // completes).
  141. func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
  142. JitterUntil(f, period, 0.0, true, stopCh)
  143. }
  144. // UntilWithContext loops until context is done, running f every period.
  145. //
  146. // UntilWithContext is syntactic sugar on top of JitterUntilWithContext
  147. // with zero jitter factor and with sliding = true (which means the timer
  148. // for period starts after the f completes).
  149. func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
  150. JitterUntilWithContext(ctx, f, period, 0.0, true)
  151. }
  152. // NonSlidingUntil loops until stop channel is closed, running f every
  153. // period.
  154. //
  155. // NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
  156. // factor, with sliding = false (meaning the timer for period starts at the same
  157. // time as the function starts).
  158. func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
  159. JitterUntil(f, period, 0.0, false, stopCh)
  160. }
  161. // NonSlidingUntilWithContext loops until context is done, running f every
  162. // period.
  163. //
  164. // NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
  165. // with zero jitter factor, with sliding = false (meaning the timer for period
  166. // starts at the same time as the function starts).
  167. func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
  168. JitterUntilWithContext(ctx, f, period, 0.0, false)
  169. }
  170. // JitterUntil loops until stop channel is closed, running f every period.
  171. //
  172. // If jitterFactor is positive, the period is jittered before every run of f.
  173. // If jitterFactor is not positive, the period is unchanged and not jittered.
  174. //
  175. // If sliding is true, the period is computed after f runs. If it is false then
  176. // period includes the runtime for f.
  177. //
  178. // Close stopCh to stop. f may not be invoked if stop channel is already
  179. // closed. Pass NeverStop to if you don't want it stop.
  180. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
  181. BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
  182. }
  183. // BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
  184. //
  185. // If sliding is true, the period is computed after f runs. If it is false then
  186. // period includes the runtime for f.
  187. func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
  188. var t clock.Timer
  189. for {
  190. select {
  191. case <-stopCh:
  192. return
  193. default:
  194. }
  195. if !sliding {
  196. t = backoff.Backoff()
  197. }
  198. func() {
  199. defer runtime.HandleCrash()
  200. f()
  201. }()
  202. if sliding {
  203. t = backoff.Backoff()
  204. }
  205. // NOTE: b/c there is no priority selection in golang
  206. // it is possible for this to race, meaning we could
  207. // trigger t.C and stopCh, and t.C select falls through.
  208. // In order to mitigate we re-check stopCh at the beginning
  209. // of every loop to prevent extra executions of f().
  210. select {
  211. case <-stopCh:
  212. if !t.Stop() {
  213. <-t.C()
  214. }
  215. return
  216. case <-t.C():
  217. }
  218. }
  219. }
  220. // JitterUntilWithContext loops until context is done, running f every period.
  221. //
  222. // If jitterFactor is positive, the period is jittered before every run of f.
  223. // If jitterFactor is not positive, the period is unchanged and not jittered.
  224. //
  225. // If sliding is true, the period is computed after f runs. If it is false then
  226. // period includes the runtime for f.
  227. //
  228. // Cancel context to stop. f may not be invoked if context is already expired.
  229. func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
  230. JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
  231. }
  232. // backoffManager provides simple backoff behavior in a threadsafe manner to a caller.
  233. type backoffManager struct {
  234. backoff Backoff
  235. initialBackoff Backoff
  236. resetInterval time.Duration
  237. clock clock.Clock
  238. lock sync.Mutex
  239. lastStart time.Time
  240. timer clock.Timer
  241. }
  242. // Step returns the expected next duration to wait.
  243. func (b *backoffManager) Step() time.Duration {
  244. b.lock.Lock()
  245. defer b.lock.Unlock()
  246. switch {
  247. case b.resetInterval == 0:
  248. b.backoff = b.initialBackoff
  249. case b.clock.Now().Sub(b.lastStart) > b.resetInterval:
  250. b.backoff = b.initialBackoff
  251. b.lastStart = b.clock.Now()
  252. }
  253. return b.backoff.Step()
  254. }
  255. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer
  256. // for exponential backoff. The returned timer must be drained before calling Backoff() the second
  257. // time.
  258. func (b *backoffManager) Backoff() clock.Timer {
  259. b.lock.Lock()
  260. defer b.lock.Unlock()
  261. if b.timer == nil {
  262. b.timer = b.clock.NewTimer(b.Step())
  263. } else {
  264. b.timer.Reset(b.Step())
  265. }
  266. return b.timer
  267. }
  268. // Timer returns a new Timer instance that shares the clock and the reset behavior with all other
  269. // timers.
  270. func (b *backoffManager) Timer() Timer {
  271. return DelayFunc(b.Step).Timer(b.clock)
  272. }
  273. // BackoffManager manages backoff with a particular scheme based on its underlying implementation.
  274. type BackoffManager interface {
  275. // Backoff returns a shared clock.Timer that is Reset on every invocation. This method is not
  276. // safe for use from multiple threads. It returns a timer for backoff, and caller shall backoff
  277. // until Timer.C() drains. If the second Backoff() is called before the timer from the first
  278. // Backoff() call finishes, the first timer will NOT be drained and result in undetermined
  279. // behavior.
  280. Backoff() clock.Timer
  281. }
  282. // Deprecated: Will be removed when the legacy polling functions are removed.
  283. type exponentialBackoffManagerImpl struct {
  284. backoff *Backoff
  285. backoffTimer clock.Timer
  286. lastBackoffStart time.Time
  287. initialBackoff time.Duration
  288. backoffResetDuration time.Duration
  289. clock clock.Clock
  290. }
  291. // NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
  292. // backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
  293. // This backoff manager is used to reduce load during upstream unhealthiness.
  294. //
  295. // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
  296. // Backoff struct, use DelayWithReset() to get a DelayFunc that periodically resets itself, and then
  297. // invoke Timer() when calling wait.BackoffUntil.
  298. //
  299. // Instead of:
  300. //
  301. // bm := wait.NewExponentialBackoffManager(init, max, reset, factor, jitter, clock)
  302. // ...
  303. // wait.BackoffUntil(..., bm.Backoff, ...)
  304. //
  305. // Use:
  306. //
  307. // delayFn := wait.Backoff{
  308. // Duration: init,
  309. // Cap: max,
  310. // Steps: int(math.Ceil(float64(max) / float64(init))), // now a required argument
  311. // Factor: factor,
  312. // Jitter: jitter,
  313. // }.DelayWithReset(reset, clock)
  314. // wait.BackoffUntil(..., delayFn.Timer(), ...)
  315. func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
  316. return &exponentialBackoffManagerImpl{
  317. backoff: &Backoff{
  318. Duration: initBackoff,
  319. Factor: backoffFactor,
  320. Jitter: jitter,
  321. // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
  322. // what we ideally need here, we set it to max int and assume we will never use up the steps
  323. Steps: math.MaxInt32,
  324. Cap: maxBackoff,
  325. },
  326. backoffTimer: nil,
  327. initialBackoff: initBackoff,
  328. lastBackoffStart: c.Now(),
  329. backoffResetDuration: resetDuration,
  330. clock: c,
  331. }
  332. }
  333. func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
  334. if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
  335. b.backoff.Steps = math.MaxInt32
  336. b.backoff.Duration = b.initialBackoff
  337. }
  338. b.lastBackoffStart = b.clock.Now()
  339. return b.backoff.Step()
  340. }
  341. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
  342. // The returned timer must be drained before calling Backoff() the second time
  343. func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
  344. if b.backoffTimer == nil {
  345. b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
  346. } else {
  347. b.backoffTimer.Reset(b.getNextBackoff())
  348. }
  349. return b.backoffTimer
  350. }
  351. // Deprecated: Will be removed when the legacy polling functions are removed.
  352. type jitteredBackoffManagerImpl struct {
  353. clock clock.Clock
  354. duration time.Duration
  355. jitter float64
  356. backoffTimer clock.Timer
  357. }
  358. // NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
  359. // is negative, backoff will not be jittered.
  360. //
  361. // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
  362. // Backoff struct and invoke Timer() when calling wait.BackoffUntil.
  363. //
  364. // Instead of:
  365. //
  366. // bm := wait.NewJitteredBackoffManager(duration, jitter, clock)
  367. // ...
  368. // wait.BackoffUntil(..., bm.Backoff, ...)
  369. //
  370. // Use:
  371. //
  372. // wait.BackoffUntil(..., wait.Backoff{Duration: duration, Jitter: jitter}.Timer(), ...)
  373. func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
  374. return &jitteredBackoffManagerImpl{
  375. clock: c,
  376. duration: duration,
  377. jitter: jitter,
  378. backoffTimer: nil,
  379. }
  380. }
  381. func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
  382. jitteredPeriod := j.duration
  383. if j.jitter > 0.0 {
  384. jitteredPeriod = Jitter(j.duration, j.jitter)
  385. }
  386. return jitteredPeriod
  387. }
  388. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
  389. // The returned timer must be drained before calling Backoff() the second time
  390. func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
  391. backoff := j.getNextBackoff()
  392. if j.backoffTimer == nil {
  393. j.backoffTimer = j.clock.NewTimer(backoff)
  394. } else {
  395. j.backoffTimer.Reset(backoff)
  396. }
  397. return j.backoffTimer
  398. }
  399. // ExponentialBackoff repeats a condition check with exponential backoff.
  400. //
  401. // It repeatedly checks the condition and then sleeps, using `backoff.Step()`
  402. // to determine the length of the sleep and adjust Duration and Steps.
  403. // Stops and returns as soon as:
  404. // 1. the condition check returns true or an error,
  405. // 2. `backoff.Steps` checks of the condition have been done, or
  406. // 3. a sleep truncated by the cap on duration has been completed.
  407. // In case (1) the returned error is what the condition function returned.
  408. // In all other cases, ErrWaitTimeout is returned.
  409. //
  410. // Since backoffs are often subject to cancellation, we recommend using
  411. // ExponentialBackoffWithContext and passing a context to the method.
  412. func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
  413. for backoff.Steps > 0 {
  414. if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
  415. return err
  416. }
  417. if backoff.Steps == 1 {
  418. break
  419. }
  420. time.Sleep(backoff.Step())
  421. }
  422. return ErrWaitTimeout
  423. }
  424. // ExponentialBackoffWithContext repeats a condition check with exponential backoff.
  425. // It immediately returns an error if the condition returns an error, the context is cancelled
  426. // or hits the deadline, or if the maximum attempts defined in backoff is exceeded (ErrWaitTimeout).
  427. // If an error is returned by the condition the backoff stops immediately. The condition will
  428. // never be invoked more than backoff.Steps times.
  429. func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
  430. for backoff.Steps > 0 {
  431. select {
  432. case <-ctx.Done():
  433. return ctx.Err()
  434. default:
  435. }
  436. if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok {
  437. return err
  438. }
  439. if backoff.Steps == 1 {
  440. break
  441. }
  442. waitBeforeRetry := backoff.Step()
  443. select {
  444. case <-ctx.Done():
  445. return ctx.Err()
  446. case <-time.After(waitBeforeRetry):
  447. }
  448. }
  449. return ErrWaitTimeout
  450. }