delta_fifo.go 27 KB


  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/sets"
  20. "k8s.io/klog/v2"
  21. utiltrace "k8s.io/utils/trace"
  22. )
  23. // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
  24. // optional.
  25. type DeltaFIFOOptions struct {
  26. // KeyFunction is used to figure out what key an object should have. (It's
  27. // exposed in the returned DeltaFIFO's KeyOf() method, with additional
  28. // handling around deleted objects and queue state).
  29. // Optional, the default is MetaNamespaceKeyFunc.
  30. KeyFunction KeyFunc
  31. // KnownObjects is expected to return a list of keys that the consumer of
  32. // this queue "knows about". It is used to decide which items are missing
  33. // when Replace() is called; 'Deleted' deltas are produced for the missing items.
  34. // KnownObjects may be nil if you can tolerate missing deletions on Replace().
  35. KnownObjects KeyListerGetter
  36. // EmitDeltaTypeReplaced indicates that the queue consumer
  37. // understands the Replaced DeltaType. Before the `Replaced` event type was
  38. // added, calls to Replace() were handled the same as Sync(). For
  39. // backwards-compatibility purposes, this is false by default.
  40. // When true, `Replaced` events will be sent for items passed to a Replace() call.
  41. // When false, `Sync` events will be sent instead.
  42. EmitDeltaTypeReplaced bool
  43. // If set, will be called for objects before enqueueing them. Please
  44. // see the comment on TransformFunc for details.
  45. Transformer TransformFunc
  46. }
  47. // DeltaFIFO is like FIFO, but differs in two ways. One is that the
  48. // accumulator associated with a given object's key is not that object
  49. // but rather a Deltas, which is a slice of Delta values for that
  50. // object. Applying an object to a Deltas means to append a Delta
  51. // except when the potentially appended Delta is a Deleted and the
  52. // Deltas already ends with a Deleted. In that case the Deltas does
  53. // not grow, although the terminal Deleted will be replaced by the new
  54. // Deleted if the older Deleted's object is a
  55. // DeletedFinalStateUnknown.
  56. //
  57. // The other difference is that DeltaFIFO has two additional ways that
  58. // an object can be applied to an accumulator: Replaced and Sync.
  59. // If EmitDeltaTypeReplaced is not set to true, Sync will be used in
  60. // replace events for backwards compatibility. Sync is used for periodic
  61. // resync events.
  62. //
  63. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  64. // intended to be the producer, and the consumer is whatever calls
  65. // the Pop() method.
  66. //
  67. // DeltaFIFO solves this use case:
  68. // - You want to process every object change (delta) at most once.
  69. // - When you process an object, you want to see everything
  70. // that's happened to it since you last processed it.
  71. // - You want to process the deletion of some of the objects.
  72. // - You might want to periodically reprocess objects.
  73. //
  74. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  75. // interface{} to satisfy the Store/Queue interfaces, but they
  76. // will always return an object of type Deltas. List() returns
  77. // the newest object from each accumulator in the FIFO.
  78. //
  79. // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
  80. // to list Store keys and to get objects by Store key. The objects in
  81. // question are called "known objects" and this set of objects
  82. // modifies the behavior of the Delete, Replace, and Resync methods
  83. // (each in a different way).
  84. //
  85. // A note on threading: If you call Pop() in parallel from multiple
  86. // threads, you could end up with multiple threads processing slightly
  87. // different versions of the same object.
  88. type DeltaFIFO struct {
  89. // lock/cond protects access to 'items' and 'queue'.
  90. lock sync.RWMutex
  91. cond sync.Cond
  92. // `items` maps a key to a Deltas.
  93. // Each such Deltas has at least one Delta.
  94. items map[string]Deltas
  95. // `queue` maintains FIFO order of keys for consumption in Pop().
  96. // There are no duplicates in `queue`.
  97. // A key is in `queue` if and only if it is in `items`.
  98. queue []string
  99. // populated is true if the first batch of items inserted by Replace() has been populated
  100. // or Delete/Add/Update/AddIfNotPresent was called first.
  101. populated bool
  102. // initialPopulationCount is the number of items inserted by the first call of Replace()
  103. initialPopulationCount int
  104. // keyFunc is used to make the key used for queued item
  105. // insertion and retrieval, and should be deterministic.
  106. keyFunc KeyFunc
  107. // knownObjects list keys that are "known" --- affecting Delete(),
  108. // Replace(), and Resync()
  109. knownObjects KeyListerGetter
  110. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  111. // Currently, not used to gate any of CRUD operations.
  112. closed bool
  113. // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
  114. // DeltaType when Replace() is called (to preserve backwards compat).
  115. emitDeltaTypeReplaced bool
  116. // Called with every object if non-nil.
  117. transformer TransformFunc
  118. }
  119. // TransformFunc allows for transforming an object before it will be processed.
  120. // TransformFunc (similarly to ResourceEventHandler functions) should be able
  121. // to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
  122. //
  123. // New in v1.27: In such cases, the contained object will already have gone
  124. // through the transform object separately (when it was added / updated prior
  125. // to the delete), so the TransformFunc can likely safely ignore such objects
  126. // (i.e., just return the input object).
  127. //
  128. // The most common usage pattern is to clean-up some parts of the object to
  129. // reduce component memory usage if a given component doesn't care about them.
  130. //
  131. // New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
  132. // sees the object before any other actor, and it is now safe to mutate the
  133. // object in place instead of making a copy.
  134. //
  135. // Note that TransformFunc is called while inserting objects into the
  136. // notification queue and is therefore extremely performance sensitive; please
  137. // do not do anything that will take a long time.
  138. type TransformFunc func(interface{}) (interface{}, error)
  139. // DeltaType is the type of a change (addition, deletion, etc)
  140. type DeltaType string
  141. // Change type definition
  142. const (
  143. Added DeltaType = "Added"
  144. Updated DeltaType = "Updated"
  145. Deleted DeltaType = "Deleted"
  146. // Replaced is emitted when we encountered watch errors and had to do a
  147. // relist. We don't know if the replaced object has changed.
  148. //
  149. // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
  150. // as well. Hence, Replaced is only emitted when the option
  151. // EmitDeltaTypeReplaced is true.
  152. Replaced DeltaType = "Replaced"
  153. // Sync is for synthetic events during a periodic resync.
  154. Sync DeltaType = "Sync"
  155. )
  156. // Delta is a member of Deltas (a list of Delta objects) which
  157. // in its turn is the type stored by a DeltaFIFO. It tells you what
  158. // change happened, and the object's state after* that change.
  159. //
  160. // [*] Unless the change is a deletion, and then you'll get the final
  161. // state of the object before it was deleted.
  162. type Delta struct {
  163. Type DeltaType
  164. Object interface{}
  165. }
  166. // Deltas is a list of one or more 'Delta's to an individual object.
  167. // The oldest delta is at index 0, the newest delta is the last one.
  168. type Deltas []Delta
  169. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
  170. //
  171. // keyFunc is used to figure out what key an object should have. (It is
  172. // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
  173. // around deleted objects and queue state).
  174. //
  175. // 'knownObjects' may be supplied to modify the behavior of Delete,
  176. // Replace, and Resync. It may be nil if you do not need those
  177. // modifications.
  178. //
  179. // TODO: consider merging keyLister with this object, tracking a list of
  180. // "known" keys when Pop() is called. Have to think about how that
  181. // affects error retrying.
  182. //
  183. // NOTE: It is possible to misuse this and cause a race when using an
  184. // external known object source.
  185. // Whether there is a potential race depends on how the consumer
  186. // modifies knownObjects. In Pop(), process function is called under
  187. // lock, so it is safe to update data structures in it that need to be
  188. // in sync with the queue (e.g. knownObjects).
  189. //
  190. // Example:
  191. // In case of sharedIndexInformer being a consumer
  192. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  193. // there is no race as knownObjects (s.indexer) is modified safely
  194. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  195. // GetIndexer() methods, which expose ways to modify the underlying
  196. // storage. Currently these two methods are used for creating Lister
  197. // and internal tests.
  198. //
  199. // Also see the comment on DeltaFIFO.
  200. //
  201. // Warning: This constructs a DeltaFIFO that does not differentiate between
  202. // events caused by a call to Replace (e.g., from a relist, which may
  203. // contain object updates), and synthetic events caused by a periodic resync
  204. // (which just emit the existing object). See https://issue.k8s.io/86015 for details.
  205. //
  206. // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
  207. // instead to receive a `Replaced` event depending on the type.
  208. //
  209. // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
  210. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  211. return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  212. KeyFunction: keyFunc,
  213. KnownObjects: knownObjects,
  214. })
  215. }
  216. // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
  217. // items. See also the comment on DeltaFIFO.
  218. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  219. if opts.KeyFunction == nil {
  220. opts.KeyFunction = MetaNamespaceKeyFunc
  221. }
  222. f := &DeltaFIFO{
  223. items: map[string]Deltas{},
  224. queue: []string{},
  225. keyFunc: opts.KeyFunction,
  226. knownObjects: opts.KnownObjects,
  227. emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  228. transformer: opts.Transformer,
  229. }
  230. f.cond.L = &f.lock
  231. return f
  232. }
  233. var (
  234. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  235. )
  236. var (
  237. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  238. // object with zero length is encountered (should be impossible,
  239. // but included for completeness).
  240. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  241. )
  242. // Close the queue.
  243. func (f *DeltaFIFO) Close() {
  244. f.lock.Lock()
  245. defer f.lock.Unlock()
  246. f.closed = true
  247. f.cond.Broadcast()
  248. }
  249. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  250. // DeletedFinalStateUnknown objects.
  251. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  252. if d, ok := obj.(Deltas); ok {
  253. if len(d) == 0 {
  254. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  255. }
  256. obj = d.Newest().Object
  257. }
  258. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  259. return d.Key, nil
  260. }
  261. return f.keyFunc(obj)
  262. }
  263. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  264. // or the first batch of items inserted by Replace() has been popped.
  265. func (f *DeltaFIFO) HasSynced() bool {
  266. f.lock.Lock()
  267. defer f.lock.Unlock()
  268. return f.hasSynced_locked()
  269. }
  270. func (f *DeltaFIFO) hasSynced_locked() bool {
  271. return f.populated && f.initialPopulationCount == 0
  272. }
  273. // Add inserts an item, and puts it in the queue. The item is only enqueued
  274. // if it doesn't already exist in the set.
  275. func (f *DeltaFIFO) Add(obj interface{}) error {
  276. f.lock.Lock()
  277. defer f.lock.Unlock()
  278. f.populated = true
  279. return f.queueActionLocked(Added, obj)
  280. }
  281. // Update is just like Add, but makes an Updated Delta.
  282. func (f *DeltaFIFO) Update(obj interface{}) error {
  283. f.lock.Lock()
  284. defer f.lock.Unlock()
  285. f.populated = true
  286. return f.queueActionLocked(Updated, obj)
  287. }
  288. // Delete is just like Add, but makes a Deleted Delta. If the given
  289. // object does not already exist, it will be ignored. (It may have
  290. // already been deleted by a Replace (re-list), for example.) In this
  291. // method `f.knownObjects`, if not nil, provides (via GetByKey)
  292. // _additional_ objects that are considered to already exist.
  293. func (f *DeltaFIFO) Delete(obj interface{}) error {
  294. id, err := f.KeyOf(obj)
  295. if err != nil {
  296. return KeyError{obj, err}
  297. }
  298. f.lock.Lock()
  299. defer f.lock.Unlock()
  300. f.populated = true
  301. if f.knownObjects == nil {
  302. if _, exists := f.items[id]; !exists {
  303. // Presumably, this was deleted when a relist happened.
  304. // Don't provide a second report of the same deletion.
  305. return nil
  306. }
  307. } else {
  308. // We only want to skip the "deletion" action if the object doesn't
  309. // exist in knownObjects and it doesn't have corresponding item in items.
  310. // Note that even if there is a "deletion" action in items, we can ignore it,
  311. // because it will be deduped automatically in "queueActionLocked"
  312. _, exists, err := f.knownObjects.GetByKey(id)
  313. _, itemsExist := f.items[id]
  314. if err == nil && !exists && !itemsExist {
  315. // Presumably, this was deleted when a relist happened.
  316. // Don't provide a second report of the same deletion.
  317. return nil
  318. }
  319. }
  320. // exist in items and/or KnownObjects
  321. return f.queueActionLocked(Deleted, obj)
  322. }
  323. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  324. // present in the set, it is neither enqueued nor added to the set.
  325. //
  326. // This is useful in a single producer/consumer scenario so that the consumer can
  327. // safely retry items without contending with the producer and potentially enqueueing
  328. // stale items.
  329. //
  330. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  331. // different from the Add/Update/Delete functions.
  332. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  333. deltas, ok := obj.(Deltas)
  334. if !ok {
  335. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  336. }
  337. id, err := f.KeyOf(deltas)
  338. if err != nil {
  339. return KeyError{obj, err}
  340. }
  341. f.lock.Lock()
  342. defer f.lock.Unlock()
  343. f.addIfNotPresent(id, deltas)
  344. return nil
  345. }
  346. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  347. // already holds the fifo lock.
  348. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  349. f.populated = true
  350. if _, exists := f.items[id]; exists {
  351. return
  352. }
  353. f.queue = append(f.queue, id)
  354. f.items[id] = deltas
  355. f.cond.Broadcast()
  356. }
  357. // re-listing and watching can deliver the same update multiple times in any
  358. // order. This will combine the most recent two deltas if they are the same.
  359. func dedupDeltas(deltas Deltas) Deltas {
  360. n := len(deltas)
  361. if n < 2 {
  362. return deltas
  363. }
  364. a := &deltas[n-1]
  365. b := &deltas[n-2]
  366. if out := isDup(a, b); out != nil {
  367. deltas[n-2] = *out
  368. return deltas[:n-1]
  369. }
  370. return deltas
  371. }
  372. // If a & b represent the same event, returns the delta that ought to be kept.
  373. // Otherwise, returns nil.
  374. // TODO: is there anything other than deletions that need deduping?
  375. func isDup(a, b *Delta) *Delta {
  376. if out := isDeletionDup(a, b); out != nil {
  377. return out
  378. }
  379. // TODO: Detect other duplicate situations? Are there any?
  380. return nil
  381. }
  382. // keep the one with the most information if both are deletions.
  383. func isDeletionDup(a, b *Delta) *Delta {
  384. if b.Type != Deleted || a.Type != Deleted {
  385. return nil
  386. }
  387. // Do more sophisticated checks, or is this sufficient?
  388. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  389. return a
  390. }
  391. return b
  392. }
  393. // queueActionLocked appends to the delta list for the object.
  394. // Caller must lock first.
  395. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  396. id, err := f.KeyOf(obj)
  397. if err != nil {
  398. return KeyError{obj, err}
  399. }
  400. // Every object comes through this code path once, so this is a good
  401. // place to call the transform func. If obj is a
  402. // DeletedFinalStateUnknown tombstone, then the containted inner object
  403. // will already have gone through the transformer, but we document that
  404. // this can happen. In cases involving Replace(), such an object can
  405. // come through multiple times.
  406. if f.transformer != nil {
  407. var err error
  408. obj, err = f.transformer(obj)
  409. if err != nil {
  410. return err
  411. }
  412. }
  413. oldDeltas := f.items[id]
  414. newDeltas := append(oldDeltas, Delta{actionType, obj})
  415. newDeltas = dedupDeltas(newDeltas)
  416. if len(newDeltas) > 0 {
  417. if _, exists := f.items[id]; !exists {
  418. f.queue = append(f.queue, id)
  419. }
  420. f.items[id] = newDeltas
  421. f.cond.Broadcast()
  422. } else {
  423. // This never happens, because dedupDeltas never returns an empty list
  424. // when given a non-empty list (as it is here).
  425. // If somehow it happens anyway, deal with it but complain.
  426. if oldDeltas == nil {
  427. klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
  428. return nil
  429. }
  430. klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
  431. f.items[id] = newDeltas
  432. return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
  433. }
  434. return nil
  435. }
  436. // List returns a list of all the items; it returns the object
  437. // from the most recent Delta.
  438. // You should treat the items returned inside the deltas as immutable.
  439. func (f *DeltaFIFO) List() []interface{} {
  440. f.lock.RLock()
  441. defer f.lock.RUnlock()
  442. return f.listLocked()
  443. }
  444. func (f *DeltaFIFO) listLocked() []interface{} {
  445. list := make([]interface{}, 0, len(f.items))
  446. for _, item := range f.items {
  447. list = append(list, item.Newest().Object)
  448. }
  449. return list
  450. }
  451. // ListKeys returns a list of all the keys of the objects currently
  452. // in the FIFO.
  453. func (f *DeltaFIFO) ListKeys() []string {
  454. f.lock.RLock()
  455. defer f.lock.RUnlock()
  456. list := make([]string, 0, len(f.queue))
  457. for _, key := range f.queue {
  458. list = append(list, key)
  459. }
  460. return list
  461. }
  462. // Get returns the complete list of deltas for the requested item,
  463. // or sets exists=false.
  464. // You should treat the items returned inside the deltas as immutable.
  465. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  466. key, err := f.KeyOf(obj)
  467. if err != nil {
  468. return nil, false, KeyError{obj, err}
  469. }
  470. return f.GetByKey(key)
  471. }
  472. // GetByKey returns the complete list of deltas for the requested item,
  473. // setting exists=false if that list is empty.
  474. // You should treat the items returned inside the deltas as immutable.
  475. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  476. f.lock.RLock()
  477. defer f.lock.RUnlock()
  478. d, exists := f.items[key]
  479. if exists {
  480. // Copy item's slice so operations on this slice
  481. // won't interfere with the object we return.
  482. d = copyDeltas(d)
  483. }
  484. return d, exists, nil
  485. }
  486. // IsClosed checks if the queue is closed
  487. func (f *DeltaFIFO) IsClosed() bool {
  488. f.lock.Lock()
  489. defer f.lock.Unlock()
  490. return f.closed
  491. }
  492. // Pop blocks until the queue has some items, and then returns one. If
  493. // multiple items are ready, they are returned in the order in which they were
  494. // added/updated. The item is removed from the queue (and the store) before it
  495. // is returned, so if you don't successfully process it, you need to add it back
  496. // with AddIfNotPresent().
  497. // process function is called under lock, so it is safe to update data structures
  498. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  499. // may return an instance of ErrRequeue with a nested error to indicate the current
  500. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  501. // process should avoid expensive I/O operation so that other queue operations, i.e.
  502. // Add() and Get(), won't be blocked for too long.
  503. //
  504. // Pop returns a 'Deltas', which has a complete list of all the things
  505. // that happened to the object (deltas) while it was sitting in the queue.
  506. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  507. f.lock.Lock()
  508. defer f.lock.Unlock()
  509. for {
  510. for len(f.queue) == 0 {
  511. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  512. // When Close() is called, the f.closed is set and the condition is broadcasted.
  513. // Which causes this loop to continue and return from the Pop().
  514. if f.closed {
  515. return nil, ErrFIFOClosed
  516. }
  517. f.cond.Wait()
  518. }
  519. isInInitialList := !f.hasSynced_locked()
  520. id := f.queue[0]
  521. f.queue = f.queue[1:]
  522. depth := len(f.queue)
  523. if f.initialPopulationCount > 0 {
  524. f.initialPopulationCount--
  525. }
  526. item, ok := f.items[id]
  527. if !ok {
  528. // This should never happen
  529. klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
  530. continue
  531. }
  532. delete(f.items, id)
  533. // Only log traces if the queue depth is greater than 10 and it takes more than
  534. // 100 milliseconds to process one item from the queue.
  535. // Queue depth never goes high because processing an item is locking the queue,
  536. // and new items can't be added until processing finish.
  537. // https://github.com/kubernetes/kubernetes/issues/103789
  538. if depth > 10 {
  539. trace := utiltrace.New("DeltaFIFO Pop Process",
  540. utiltrace.Field{Key: "ID", Value: id},
  541. utiltrace.Field{Key: "Depth", Value: depth},
  542. utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
  543. defer trace.LogIfLong(100 * time.Millisecond)
  544. }
  545. err := process(item, isInInitialList)
  546. if e, ok := err.(ErrRequeue); ok {
  547. f.addIfNotPresent(id, item)
  548. err = e.Err
  549. }
  550. // Don't need to copyDeltas here, because we're transferring
  551. // ownership to the caller.
  552. return item, err
  553. }
  554. }
  555. // Replace atomically does two things: (1) it adds the given objects
  556. // using the Sync or Replace DeltaType and then (2) it does some deletions.
  557. // In particular: for every pre-existing key K that is not the key of
  558. // an object in `list` there is the effect of
  559. // `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
  560. // object of K. The pre-existing keys are those in the union set of the keys in
  561. // `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
  562. // the one present in the last delta in `f.items`. If there is no delta for K
  563. // in `f.items`, it is the object in `f.knownObjects`
  564. func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
  565. f.lock.Lock()
  566. defer f.lock.Unlock()
  567. keys := make(sets.String, len(list))
  568. // keep backwards compat for old clients
  569. action := Sync
  570. if f.emitDeltaTypeReplaced {
  571. action = Replaced
  572. }
  573. // Add Sync/Replaced action for each new item.
  574. for _, item := range list {
  575. key, err := f.KeyOf(item)
  576. if err != nil {
  577. return KeyError{item, err}
  578. }
  579. keys.Insert(key)
  580. if err := f.queueActionLocked(action, item); err != nil {
  581. return fmt.Errorf("couldn't enqueue object: %v", err)
  582. }
  583. }
  584. // Do deletion detection against objects in the queue
  585. queuedDeletions := 0
  586. for k, oldItem := range f.items {
  587. if keys.Has(k) {
  588. continue
  589. }
  590. // Delete pre-existing items not in the new list.
  591. // This could happen if watch deletion event was missed while
  592. // disconnected from apiserver.
  593. var deletedObj interface{}
  594. if n := oldItem.Newest(); n != nil {
  595. deletedObj = n.Object
  596. // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
  597. if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
  598. deletedObj = d.Obj
  599. }
  600. }
  601. queuedDeletions++
  602. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  603. return err
  604. }
  605. }
  606. if f.knownObjects != nil {
  607. // Detect deletions for objects not present in the queue, but present in KnownObjects
  608. knownKeys := f.knownObjects.ListKeys()
  609. for _, k := range knownKeys {
  610. if keys.Has(k) {
  611. continue
  612. }
  613. if len(f.items[k]) > 0 {
  614. continue
  615. }
  616. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  617. if err != nil {
  618. deletedObj = nil
  619. klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  620. } else if !exists {
  621. deletedObj = nil
  622. klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  623. }
  624. queuedDeletions++
  625. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  626. return err
  627. }
  628. }
  629. }
  630. if !f.populated {
  631. f.populated = true
  632. f.initialPopulationCount = keys.Len() + queuedDeletions
  633. }
  634. return nil
  635. }
  636. // Resync adds, with a Sync type of Delta, every object listed by
  637. // `f.knownObjects` whose key is not already queued for processing.
  638. // If `f.knownObjects` is `nil` then Resync does nothing.
  639. func (f *DeltaFIFO) Resync() error {
  640. f.lock.Lock()
  641. defer f.lock.Unlock()
  642. if f.knownObjects == nil {
  643. return nil
  644. }
  645. keys := f.knownObjects.ListKeys()
  646. for _, k := range keys {
  647. if err := f.syncKeyLocked(k); err != nil {
  648. return err
  649. }
  650. }
  651. return nil
  652. }
  653. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  654. obj, exists, err := f.knownObjects.GetByKey(key)
  655. if err != nil {
  656. klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  657. return nil
  658. } else if !exists {
  659. klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  660. return nil
  661. }
  662. // If we are doing Resync() and there is already an event queued for that object,
  663. // we ignore the Resync for it. This is to avoid the race, in which the resync
  664. // comes with the previous value of object (since queueing an event for the object
  665. // doesn't trigger changing the underlying store <knownObjects>.
  666. id, err := f.KeyOf(obj)
  667. if err != nil {
  668. return KeyError{obj, err}
  669. }
  670. if len(f.items[id]) > 0 {
  671. return nil
  672. }
  673. if err := f.queueActionLocked(Sync, obj); err != nil {
  674. return fmt.Errorf("couldn't queue object: %v", err)
  675. }
  676. return nil
  677. }
  678. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  679. type KeyListerGetter interface {
  680. KeyLister
  681. KeyGetter
  682. }
  683. // A KeyLister is anything that knows how to list its keys.
  684. type KeyLister interface {
  685. ListKeys() []string
  686. }
  687. // A KeyGetter is anything that knows how to get the value stored under a given key.
  688. type KeyGetter interface {
  689. // GetByKey returns the value associated with the key, or sets exists=false.
  690. GetByKey(key string) (value interface{}, exists bool, err error)
  691. }
  692. // Oldest is a convenience function that returns the oldest delta, or
  693. // nil if there are no deltas.
  694. func (d Deltas) Oldest() *Delta {
  695. if len(d) > 0 {
  696. return &d[0]
  697. }
  698. return nil
  699. }
  700. // Newest is a convenience function that returns the newest delta, or
  701. // nil if there are no deltas.
  702. func (d Deltas) Newest() *Delta {
  703. if n := len(d); n > 0 {
  704. return &d[n-1]
  705. }
  706. return nil
  707. }
  708. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  709. // the objects in the slice. This allows Get/List to return an object that we
  710. // know won't be clobbered by a subsequent modifications.
  711. func copyDeltas(d Deltas) Deltas {
  712. d2 := make(Deltas, len(d))
  713. copy(d2, d)
  714. return d2
  715. }
  716. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
  717. // was deleted but the watch deletion event was missed while disconnected from
  718. // apiserver. In this case we don't know the final "resting" state of the object, so
  719. // there's a chance the included `Obj` is stale.
  720. type DeletedFinalStateUnknown struct {
  721. Key string
  722. Obj interface{}
  723. }