throttle.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package flowcontrol
  14. import (
  15. "context"
  16. "errors"
  17. "sync"
  18. "time"
  19. "golang.org/x/time/rate"
  20. "k8s.io/utils/clock"
  21. )
// PassiveRateLimiter is the non-blocking part of the rate limiter API: a
// caller can try to take a token, stop the limiter, and read its QPS, but
// cannot block waiting for a token.
type PassiveRateLimiter interface {
	// TryAccept returns true if a token is taken immediately. Otherwise,
	// it returns false.
	TryAccept() bool
	// Stop stops the rate limiter.
	// NOTE(review): this comment used to promise that later calls to
	// "CanAccept" return false, but no such method exists on this interface,
	// and the token bucket implementation's Stop is an empty method.
	// Confirm the intended contract for TryAccept after Stop.
	Stop()
	// QPS returns QPS of this rate limiter.
	QPS() float32
}
// RateLimiter extends PassiveRateLimiter with operations that block until a
// token can be taken instead of failing immediately.
type RateLimiter interface {
	PassiveRateLimiter
	// Accept returns once a token becomes available.
	Accept()
	// Wait returns nil if a token is taken before the Context is done.
	Wait(ctx context.Context) error
}
// tokenBucketPassiveRateLimiter implements PassiveRateLimiter on top of a
// golang.org/x/time/rate limiter.
type tokenBucketPassiveRateLimiter struct {
	// limiter is the underlying token bucket.
	limiter *rate.Limiter
	// qps is the rate the limiter was created with; QPS() returns it.
	qps float32
	// clock supplies the timestamp handed to the limiter in TryAccept.
	clock clock.PassiveClock
}
// tokenBucketRateLimiter adds the blocking Accept and Wait methods to
// tokenBucketPassiveRateLimiter.
type tokenBucketRateLimiter struct {
	tokenBucketPassiveRateLimiter
	// clock shadows the embedded struct's PassiveClock field with a Clock,
	// which additionally provides the Sleep that Accept relies on.
	clock Clock
}
  47. // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
  48. // The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
  49. // smoothed qps rate of 'qps'.
  50. // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
  51. // The maximum number of tokens in the bucket is capped at 'burst'.
  52. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
  53. limiter := rate.NewLimiter(rate.Limit(qps), burst)
  54. return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
  55. }
  56. // NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
  57. // a PassiveRateLimiter which does not have Accept() and Wait() methods.
  58. func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
  59. limiter := rate.NewLimiter(rate.Limit(qps), burst)
  60. return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
  61. }
// Clock is an injectable, mockable clock interface. It adds Sleep to
// clock.PassiveClock.
type Clock interface {
	clock.PassiveClock
	Sleep(time.Duration)
}

// Compile-time check that *clock.RealClock satisfies Clock.
var _ Clock = (*clock.RealClock)(nil)
  68. // NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
  69. // but allows an injectable clock, for testing.
  70. func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
  71. limiter := rate.NewLimiter(rate.Limit(qps), burst)
  72. return newTokenBucketRateLimiterWithClock(limiter, c, qps)
  73. }
  74. // NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
  75. // except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
  76. // and uses a PassiveClock.
  77. func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
  78. limiter := rate.NewLimiter(rate.Limit(qps), burst)
  79. return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
  80. }
  81. func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
  82. return &tokenBucketRateLimiter{
  83. tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
  84. clock: c,
  85. }
  86. }
  87. func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
  88. return &tokenBucketPassiveRateLimiter{
  89. limiter: limiter,
  90. qps: qps,
  91. clock: c,
  92. }
  93. }
  94. func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
  95. }
  96. func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
  97. return tbprl.qps
  98. }
  99. func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
  100. return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
  101. }
  102. // Accept will block until a token becomes available
  103. func (tbrl *tokenBucketRateLimiter) Accept() {
  104. now := tbrl.clock.Now()
  105. tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
  106. }
  107. func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
  108. return tbrl.limiter.Wait(ctx)
  109. }
  110. type fakeAlwaysRateLimiter struct{}
  111. func NewFakeAlwaysRateLimiter() RateLimiter {
  112. return &fakeAlwaysRateLimiter{}
  113. }
  114. func (t *fakeAlwaysRateLimiter) TryAccept() bool {
  115. return true
  116. }
  117. func (t *fakeAlwaysRateLimiter) Stop() {}
  118. func (t *fakeAlwaysRateLimiter) Accept() {}
  119. func (t *fakeAlwaysRateLimiter) QPS() float32 {
  120. return 1
  121. }
  122. func (t *fakeAlwaysRateLimiter) Wait(ctx context.Context) error {
  123. return nil
  124. }
  125. type fakeNeverRateLimiter struct {
  126. wg sync.WaitGroup
  127. }
  128. func NewFakeNeverRateLimiter() RateLimiter {
  129. rl := fakeNeverRateLimiter{}
  130. rl.wg.Add(1)
  131. return &rl
  132. }
  133. func (t *fakeNeverRateLimiter) TryAccept() bool {
  134. return false
  135. }
  136. func (t *fakeNeverRateLimiter) Stop() {
  137. t.wg.Done()
  138. }
  139. func (t *fakeNeverRateLimiter) Accept() {
  140. t.wg.Wait()
  141. }
  142. func (t *fakeNeverRateLimiter) QPS() float32 {
  143. return 1
  144. }
  145. func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
  146. return errors.New("can not be accept")
  147. }
// Compile-time checks that every limiter type in this file satisfies
// RateLimiter.
var (
	_ RateLimiter = (*tokenBucketRateLimiter)(nil)
	_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
	_ RateLimiter = (*fakeNeverRateLimiter)(nil)
)

// Compile-time check that the passive token bucket limiter satisfies
// PassiveRateLimiter.
var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)