delaying_queue.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package workqueue
  14. import (
  15. "container/heap"
  16. "sync"
  17. "time"
  18. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  19. "k8s.io/utils/clock"
  20. )
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	// Interface is embedded, so a DelayingInterface also offers every method
	// of the plain work queue.
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed.
	//
	// The delayingType implementation in this file adds the item immediately
	// when duration is not positive, and drops the item when the queue is
	// already shutting down.
	AddAfter(item interface{}, duration time.Duration)
}
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
//
// The zero value is usable: NewDelayingQueueWithConfig substitutes a real clock
// and a default queue for the Clock and Queue fields when they are nil.
type DelayingQueueConfig struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	// The name is handed to the default underlying queue (when one is built)
	// and to the retry metrics.
	Name string

	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider

	// Clock optionally allows injecting a real or fake clock for testing purposes.
	// When nil, clock.RealClock{} is used.
	Clock clock.WithTicker

	// Queue optionally allows injecting custom queue Interface instead of the default one.
	// When nil, a queue is built with NewWithConfig from Name, MetricsProvider and Clock.
	// A non-nil Queue is used as supplied.
	Queue Interface
}
  40. // NewDelayingQueue constructs a new workqueue with delayed queuing ability.
  41. // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
  42. // NewDelayingQueueWithConfig instead and specify a name.
  43. func NewDelayingQueue() DelayingInterface {
  44. return NewDelayingQueueWithConfig(DelayingQueueConfig{})
  45. }
  46. // NewDelayingQueueWithConfig constructs a new workqueue with options to
  47. // customize different properties.
  48. func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
  49. if config.Clock == nil {
  50. config.Clock = clock.RealClock{}
  51. }
  52. if config.Queue == nil {
  53. config.Queue = NewWithConfig(QueueConfig{
  54. Name: config.Name,
  55. MetricsProvider: config.MetricsProvider,
  56. Clock: config.Clock,
  57. })
  58. }
  59. return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
  60. }
  61. // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
  62. // inject custom queue Interface instead of the default one
  63. // Deprecated: Use NewDelayingQueueWithConfig instead.
  64. func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
  65. return NewDelayingQueueWithConfig(DelayingQueueConfig{
  66. Name: name,
  67. Queue: q,
  68. })
  69. }
  70. // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
  71. // Deprecated: Use NewDelayingQueueWithConfig instead.
  72. func NewNamedDelayingQueue(name string) DelayingInterface {
  73. return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
  74. }
  75. // NewDelayingQueueWithCustomClock constructs a new named workqueue
  76. // with ability to inject real or fake clock for testing purposes.
  77. // Deprecated: Use NewDelayingQueueWithConfig instead.
  78. func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
  79. return NewDelayingQueueWithConfig(DelayingQueueConfig{
  80. Name: name,
  81. Clock: clock,
  82. })
  83. }
  84. func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
  85. ret := &delayingType{
  86. Interface: q,
  87. clock: clock,
  88. heartbeat: clock.NewTicker(maxWait),
  89. stopCh: make(chan struct{}),
  90. waitingForAddCh: make(chan *waitFor, 1000),
  91. metrics: newRetryMetrics(name, provider),
  92. }
  93. go ret.waitingLoop()
  94. return ret
  95. }
// delayingType wraps an Interface and provides delayed re-enqueuing.
type delayingType struct {
	// Interface is the underlying queue; delayed items are passed to its Add
	// once they are ready.
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop; ShutDown closes it
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that carries entries from AddAfter
	// to waitingLoop, which holds them until they are ready
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}
// waitFor holds the data to add and the time it should be added
type waitFor struct {
	// data is the item that is passed to the queue's Add once it is ready
	data t
	// readyAt is the time from which data may be added to the queue
	readyAt time.Time
	// index in the priority queue (heap); kept current by Swap and Push,
	// set to -1 by Pop, and handed to heap.Fix by insert
	index int
}
// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
//
// Swap, Push and Pop also maintain each element's index field so that an
// element already in the queue can be repositioned with heap.Fix.
type waitForPriorityQueue []*waitFor
  128. func (pq waitForPriorityQueue) Len() int {
  129. return len(pq)
  130. }
  131. func (pq waitForPriorityQueue) Less(i, j int) bool {
  132. return pq[i].readyAt.Before(pq[j].readyAt)
  133. }
  134. func (pq waitForPriorityQueue) Swap(i, j int) {
  135. pq[i], pq[j] = pq[j], pq[i]
  136. pq[i].index = i
  137. pq[j].index = j
  138. }
  139. // Push adds an item to the queue. Push should not be called directly; instead,
  140. // use `heap.Push`.
  141. func (pq *waitForPriorityQueue) Push(x interface{}) {
  142. n := len(*pq)
  143. item := x.(*waitFor)
  144. item.index = n
  145. *pq = append(*pq, item)
  146. }
  147. // Pop removes an item from the queue. Pop should not be called directly;
  148. // instead, use `heap.Pop`.
  149. func (pq *waitForPriorityQueue) Pop() interface{} {
  150. n := len(*pq)
  151. item := (*pq)[n-1]
  152. item.index = -1
  153. *pq = (*pq)[0:(n - 1)]
  154. return item
  155. }
  156. // Peek returns the item at the beginning of the queue, without removing the
  157. // item or otherwise mutating the queue. It is safe to call directly.
  158. func (pq waitForPriorityQueue) Peek() interface{} {
  159. return pq[0]
  160. }
  161. // ShutDown stops the queue. After the queue drains, the returned shutdown bool
  162. // on Get() will be true. This method may be invoked more than once.
  163. func (q *delayingType) ShutDown() {
  164. q.stopOnce.Do(func() {
  165. q.Interface.ShutDown()
  166. close(q.stopCh)
  167. q.heartbeat.Stop()
  168. })
  169. }
  170. // AddAfter adds the given item to the work queue after the given delay
  171. func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
  172. // don't add if we're already shutting down
  173. if q.ShuttingDown() {
  174. return
  175. }
  176. q.metrics.retry()
  177. // immediately add things with no delay
  178. if duration <= 0 {
  179. q.Add(item)
  180. return
  181. }
  182. select {
  183. case <-q.stopCh:
  184. // unblock if ShutDown() is called
  185. case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
  186. }
  187. }
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
//
// It is the period of the heartbeat ticker created in newDelayingQueue; each
// tick wakes waitingLoop so that it re-checks the waiting items.
const maxWait = 10 * time.Second
  192. // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
  193. func (q *delayingType) waitingLoop() {
  194. defer utilruntime.HandleCrash()
  195. // Make a placeholder channel to use when there are no items in our list
  196. never := make(<-chan time.Time)
  197. // Make a timer that expires when the item at the head of the waiting queue is ready
  198. var nextReadyAtTimer clock.Timer
  199. waitingForQueue := &waitForPriorityQueue{}
  200. heap.Init(waitingForQueue)
  201. waitingEntryByData := map[t]*waitFor{}
  202. for {
  203. if q.Interface.ShuttingDown() {
  204. return
  205. }
  206. now := q.clock.Now()
  207. // Add ready entries
  208. for waitingForQueue.Len() > 0 {
  209. entry := waitingForQueue.Peek().(*waitFor)
  210. if entry.readyAt.After(now) {
  211. break
  212. }
  213. entry = heap.Pop(waitingForQueue).(*waitFor)
  214. q.Add(entry.data)
  215. delete(waitingEntryByData, entry.data)
  216. }
  217. // Set up a wait for the first item's readyAt (if one exists)
  218. nextReadyAt := never
  219. if waitingForQueue.Len() > 0 {
  220. if nextReadyAtTimer != nil {
  221. nextReadyAtTimer.Stop()
  222. }
  223. entry := waitingForQueue.Peek().(*waitFor)
  224. nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
  225. nextReadyAt = nextReadyAtTimer.C()
  226. }
  227. select {
  228. case <-q.stopCh:
  229. return
  230. case <-q.heartbeat.C():
  231. // continue the loop, which will add ready items
  232. case <-nextReadyAt:
  233. // continue the loop, which will add ready items
  234. case waitEntry := <-q.waitingForAddCh:
  235. if waitEntry.readyAt.After(q.clock.Now()) {
  236. insert(waitingForQueue, waitingEntryByData, waitEntry)
  237. } else {
  238. q.Add(waitEntry.data)
  239. }
  240. drained := false
  241. for !drained {
  242. select {
  243. case waitEntry := <-q.waitingForAddCh:
  244. if waitEntry.readyAt.After(q.clock.Now()) {
  245. insert(waitingForQueue, waitingEntryByData, waitEntry)
  246. } else {
  247. q.Add(waitEntry.data)
  248. }
  249. default:
  250. drained = true
  251. }
  252. }
  253. }
  254. }
  255. }
  256. // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
  257. func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
  258. // if the entry already exists, update the time only if it would cause the item to be queued sooner
  259. existing, exists := knownEntries[entry.data]
  260. if exists {
  261. if existing.readyAt.After(entry.readyAt) {
  262. existing.readyAt = entry.readyAt
  263. heap.Fix(q, existing.index)
  264. }
  265. return
  266. }
  267. heap.Push(q, entry)
  268. knownEntries[entry.data] = entry
  269. }