shared_informer.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/api/meta"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. "k8s.io/client-go/tools/cache/synctrack"
  24. "k8s.io/utils/buffer"
  25. "k8s.io/utils/clock"
  26. "k8s.io/klog/v2"
  27. )
  28. // SharedInformer provides eventually consistent linkage of its
  29. // clients to the authoritative state of a given collection of
  30. // objects. An object is identified by its API group, kind/resource,
  31. // namespace (if any), and name; the `ObjectMeta.UID` is not part of
  32. // an object's ID as far as this contract is concerned. One
  33. // SharedInformer provides linkage to objects of a particular API
  34. // group and kind/resource. The linked object collection of a
  35. // SharedInformer may be further restricted to one namespace (if
  36. // applicable) and/or by label selector and/or field selector.
  37. //
  38. // The authoritative state of an object is what apiservers provide
  39. // access to, and an object goes through a strict sequence of states.
  40. // An object state is either (1) present with a ResourceVersion and
  41. // other appropriate content or (2) "absent".
  42. //
  43. // A SharedInformer maintains a local cache --- exposed by GetStore(),
  44. // by GetIndexer() in the case of an indexed informer, and possibly by
  45. // machinery involved in creating and/or accessing the informer --- of
  46. // the state of each relevant object. This cache is eventually
  47. // consistent with the authoritative state. This means that, unless
  48. // prevented by persistent communication problems, if ever a
  49. // particular object ID X is authoritatively associated with a state S
  50. // then for every SharedInformer I whose collection includes (X, S)
  51. // eventually either (1) I's cache associates X with S or a later
  52. // state of X, (2) I is stopped, or (3) the authoritative state
  53. // service for X terminates. To be formally complete, we say that the
  54. // absent state meets any restriction by label selector or field
  55. // selector.
  56. //
  57. // For a given informer and relevant object ID X, the sequence of
  58. // states that appears in the informer's cache is a subsequence of the
  59. // states authoritatively associated with X. That is, some states
  60. // might never appear in the cache but ordering among the appearing
  61. // states is correct. Note, however, that there is no promise about
  62. // ordering between states seen for different objects.
  63. //
  64. // The local cache starts out empty, and gets populated and updated
  65. // during `Run()`.
  66. //
  67. // As a simple example, if a collection of objects is henceforth
  68. // unchanging, a SharedInformer is created that links to that
  69. // collection, and that SharedInformer is `Run()` then that
  70. // SharedInformer's cache eventually holds an exact copy of that
  71. // collection (unless it is stopped too soon, the authoritative state
  72. // service ends, or communication problems between the two
  73. // persistently thwart achievement).
  74. //
  75. // As another simple example, if the local cache ever holds a
  76. // non-absent state for some object ID and the object is eventually
  77. // removed from the authoritative state then eventually the object is
  78. // removed from the local cache (unless the SharedInformer is stopped
  79. // too soon, the authoritative state service ends, or communication
  80. // problems persistently thwart the desired result).
  81. //
  82. // The keys in the Store are of the form namespace/name for namespaced
  83. // objects, and are simply the name for non-namespaced objects.
  84. // Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
  85. // a given object, and `SplitMetaNamespaceKey(key)` to split a key
  86. // into its constituent parts.
  87. //
  88. // Every query against the local cache is answered entirely from one
  89. // snapshot of the cache's state. Thus, the result of a `List` call
  90. // will not contain two entries with the same namespace and name.
  91. //
  92. // A client is identified here by a ResourceEventHandler. For every
  93. // update to the SharedInformer's local cache and for every client
  94. // added before `Run()`, eventually either the SharedInformer is
  95. // stopped or the client is notified of the update. A client added
  96. // after `Run()` starts gets a startup batch of notifications of
  97. // additions of the objects existing in the cache at the time that
  98. // client was added; also, for every update to the SharedInformer's
  99. // local cache after that client was added, eventually either the
  100. // SharedInformer is stopped or that client is notified of that
  101. // update. Client notifications happen after the corresponding cache
  102. // update and, in the case of a SharedIndexInformer, after the
  103. // corresponding index updates. It is possible that additional cache
  104. // and index updates happen before such a prescribed notification.
  105. // For a given SharedInformer and client, the notifications are
  106. // delivered sequentially. For a given SharedInformer, client, and
  107. // object ID, the notifications are delivered in order. Because
  108. // `ObjectMeta.UID` has no role in identifying objects, it is possible
  109. // that when (1) object O1 with ID (e.g. namespace and name) X and
  110. // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
  111. // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
  112. // created the informer's clients are not notified of (1) and (2) but
  113. // rather are notified only of an update from O1 to O2. Clients that
  114. // need to detect such cases might do so by comparing the `ObjectMeta.UID`
  115. // field of the old and the new object in the code that handles update
  116. // notifications (i.e. `OnUpdate` method of ResourceEventHandler).
  117. //
  118. // A client must process each notification promptly; a SharedInformer
  119. // is not engineered to deal well with a large backlog of
  120. // notifications to deliver. Lengthy processing should be passed off
  121. // to something else, for example through a
  122. // `client-go/util/workqueue`.
  123. //
  124. // A delete notification exposes the last locally known non-absent
  125. // state, except that its ResourceVersion is replaced with a
  126. // ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using
	// the shared informer's resync period. Events to a single handler are
	// delivered sequentially, but there is no coordination between
	// different handlers.
	// It returns a registration handle for the handler that can be used to
	// remove the handler again, or to tell if the handler is synced (has
	// seen every item in the initial list).
	AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs. The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage. Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod. For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer. The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	// It returns a registration handle for the handler that can be used to remove
	// the handler again and an error if the handler cannot be added.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
	// RemoveEventHandler removes a formerly added event handler given by
	// its registration handle.
	// This function is guaranteed to be idempotent, and thread-safe.
	RemoveEventHandler(handle ResourceEventHandlerRegistration) error
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController returns a Controller for this informer.
	//
	// Deprecated: it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	//
	// Note that this doesn't tell you if an individual handler is synced!!
	// For that, please call HasSynced on the handle returned by
	// AddEventHandler.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
	// SetWatchErrorHandler sets the WatchErrorHandler, which is called
	// whenever ListAndWatch drops the connection with an error. After
	// calling this handler, the informer will backoff and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error
	// SetTransform sets the TransformFunc, which is called for each object
	// which is about to be stored.
	//
	// This function is intended for you to take the opportunity to
	// remove, transform, or normalize fields. One use case is to strip unused
	// metadata fields out of objects to save on RAM cost.
	//
	// Must be set before starting the informer.
	//
	// Please see the comment on TransformFunc for more details.
	SetTransform(handler TransformFunc) error
	// IsStopped reports whether the informer has already been stopped.
	// Adding event handlers to already stopped informers is not possible.
	// An informer already stopped will never be started again.
	IsStopped() bool
}
// ResourceEventHandlerRegistration is an opaque interface representing the
// registration of a ResourceEventHandler for a SharedInformer. It must be
// supplied back to the same SharedInformer's `RemoveEventHandler` to
// unregister the handler.
//
// It is also used to tell if the handler is synced (has had all items in the
// initial list delivered).
type ResourceEventHandlerRegistration interface {
	// HasSynced reports if both the parent has synced and all pre-sync
	// events have been delivered.
	HasSynced() bool
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's local cache as an Indexer.
	GetIndexer() Indexer
}
  223. // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
  224. func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
  225. return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
  226. }
  227. // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
  228. // NewSharedIndexInformerWithOptions for full details.
  229. func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  230. return NewSharedIndexInformerWithOptions(
  231. lw,
  232. exampleObject,
  233. SharedIndexInformerOptions{
  234. ResyncPeriod: defaultEventHandlerResyncPeriod,
  235. Indexers: indexers,
  236. },
  237. )
  238. }
  239. // NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
  240. // The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
  241. // handler that with a non-zero requested resync period, whether added
  242. // before or after the informer starts, the nominal resync period is
  243. // the requested resync period rounded up to a multiple of the
  244. // informer's resync checking period. Such an informer's resync
  245. // checking period is established when the informer starts running,
  246. // and is the maximum of (a) the minimum of the resync periods
  247. // requested before the informer starts and the
  248. // options.ResyncPeriod given here and (b) the constant
  249. // `minimumResyncPeriod` defined in this file.
  250. func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
  251. realClock := &clock.RealClock{}
  252. return &sharedIndexInformer{
  253. indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
  254. processor: &sharedProcessor{clock: realClock},
  255. listerWatcher: lw,
  256. objectType: exampleObject,
  257. objectDescription: options.ObjectDescription,
  258. resyncCheckPeriod: options.ResyncPeriod,
  259. defaultEventHandlerResyncPeriod: options.ResyncPeriod,
  260. clock: realClock,
  261. cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
  262. }
  263. }
// SharedIndexInformerOptions configures a sharedIndexInformer. It is consumed
// by NewSharedIndexInformerWithOptions.
type SharedIndexInformerOptions struct {
	// ResyncPeriod is the default event handler resync period and resync check
	// period. If unset/unspecified, these are defaulted to 0 (do not resync).
	ResyncPeriod time.Duration
	// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
	Indexers Indexers
	// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
	// underlying Reflector's type description.
	ObjectDescription string
}
// InformerSynced is a function that can be used to determine if an informer
// has synced. This is useful for determining if caches have synced; see
// WaitForCacheSync, which polls functions of this type.
type InformerSynced func() bool
const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs;
	// it is the polling interval used by WaitForCacheSync.
	syncedPollPeriod = 100 * time.Millisecond
	// initialBufferSize is the initial number of event notifications that can be buffered.
	// It is passed to newProcessListener for every handler that is added.
	initialBufferSize = 1024
)
  283. // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
  284. // indicating that the caller identified by name is waiting for syncs, followed by
  285. // either a successful or failed sync.
  286. func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  287. klog.Infof("Waiting for caches to sync for %s", controllerName)
  288. if !WaitForCacheSync(stopCh, cacheSyncs...) {
  289. utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
  290. return false
  291. }
  292. klog.Infof("Caches are synced for %s", controllerName)
  293. return true
  294. }
  295. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
  296. // if the controller should shutdown
  297. // callers should prefer WaitForNamedCacheSync()
  298. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  299. err := wait.PollImmediateUntil(syncedPollPeriod,
  300. func() (bool, error) {
  301. for _, syncFunc := range cacheSyncs {
  302. if !syncFunc() {
  303. return false, nil
  304. }
  305. }
  306. return true, nil
  307. },
  308. stopCh)
  309. if err != nil {
  310. klog.V(2).Infof("stop requested")
  311. return false
  312. }
  313. klog.V(4).Infof("caches populated")
  314. return true
  315. }
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	// indexer is the informer's local cache.
	indexer Indexer
	// controller is created in Run; it is nil until then.
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is expected to handle. If set, an event
	// with an object with a mismatching type is dropped instead of being delivered to listeners.
	objectType runtime.Object

	// objectDescription is the description of this informer's objects. It is
	// passed to the controller Config as ObjectDescription when the informer
	// is run.
	objectDescription string

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	// started is set once Run has created the controller; stopped is set
	// when Run returns. Both are accessed with startedLock held.
	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler

	// transform is handed to the DeltaFIFO as its Transformer when the
	// informer is run; it can only be set before the informer starts.
	transform TransformFunc
}
  358. // dummyController hides the fact that a SharedInformer is different from a dedicated one
  359. // where a caller can `Run`. The run method is disconnected in this case, because higher
  360. // level logic will decide when to start the SharedInformer and related controller.
  361. // Because returning information back is always asynchronous, the legacy callers shouldn't
  362. // notice any change in behavior.
  363. type dummyController struct {
  364. informer *sharedIndexInformer
  365. }
  366. func (v *dummyController) Run(stopCh <-chan struct{}) {
  367. }
  368. func (v *dummyController) HasSynced() bool {
  369. return v.informer.HasSynced()
  370. }
  371. func (v *dummyController) LastSyncResourceVersion() string {
  372. return ""
  373. }
// updateNotification is the notification distributed to listeners by
// sharedIndexInformer.OnUpdate; it carries the old and new object.
type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

// addNotification is the notification distributed to listeners by
// sharedIndexInformer.OnAdd, and the synthetic notification sent to a
// handler that joins an already started informer.
type addNotification struct {
	newObj interface{}
	// isInInitialList is forwarded from OnAdd, and is set to true for the
	// synthetic adds sent to a late-joining handler.
	isInInitialList bool
}

// deleteNotification is the notification distributed to listeners by
// sharedIndexInformer.OnDelete; it carries the deleted object.
type deleteNotification struct {
	oldObj interface{}
}
  385. func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
  386. s.startedLock.Lock()
  387. defer s.startedLock.Unlock()
  388. if s.started {
  389. return fmt.Errorf("informer has already started")
  390. }
  391. s.watchErrorHandler = handler
  392. return nil
  393. }
  394. func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
  395. s.startedLock.Lock()
  396. defer s.startedLock.Unlock()
  397. if s.started {
  398. return fmt.Errorf("informer has already started")
  399. }
  400. s.transform = handler
  401. return nil
  402. }
  403. func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  404. defer utilruntime.HandleCrash()
  405. if s.HasStarted() {
  406. klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
  407. return
  408. }
  409. func() {
  410. s.startedLock.Lock()
  411. defer s.startedLock.Unlock()
  412. fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  413. KnownObjects: s.indexer,
  414. EmitDeltaTypeReplaced: true,
  415. Transformer: s.transform,
  416. })
  417. cfg := &Config{
  418. Queue: fifo,
  419. ListerWatcher: s.listerWatcher,
  420. ObjectType: s.objectType,
  421. ObjectDescription: s.objectDescription,
  422. FullResyncPeriod: s.resyncCheckPeriod,
  423. RetryOnError: false,
  424. ShouldResync: s.processor.shouldResync,
  425. Process: s.HandleDeltas,
  426. WatchErrorHandler: s.watchErrorHandler,
  427. }
  428. s.controller = New(cfg)
  429. s.controller.(*controller).clock = s.clock
  430. s.started = true
  431. }()
  432. // Separate stop channel because Processor should be stopped strictly after controller
  433. processorStopCh := make(chan struct{})
  434. var wg wait.Group
  435. defer wg.Wait() // Wait for Processor to stop
  436. defer close(processorStopCh) // Tell Processor to stop
  437. wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  438. wg.StartWithChannel(processorStopCh, s.processor.run)
  439. defer func() {
  440. s.startedLock.Lock()
  441. defer s.startedLock.Unlock()
  442. s.stopped = true // Don't want any new listeners
  443. }()
  444. s.controller.Run(stopCh)
  445. }
  446. func (s *sharedIndexInformer) HasStarted() bool {
  447. s.startedLock.Lock()
  448. defer s.startedLock.Unlock()
  449. return s.started
  450. }
  451. func (s *sharedIndexInformer) HasSynced() bool {
  452. s.startedLock.Lock()
  453. defer s.startedLock.Unlock()
  454. if s.controller == nil {
  455. return false
  456. }
  457. return s.controller.HasSynced()
  458. }
  459. func (s *sharedIndexInformer) LastSyncResourceVersion() string {
  460. s.startedLock.Lock()
  461. defer s.startedLock.Unlock()
  462. if s.controller == nil {
  463. return ""
  464. }
  465. return s.controller.LastSyncResourceVersion()
  466. }
  467. func (s *sharedIndexInformer) GetStore() Store {
  468. return s.indexer
  469. }
  470. func (s *sharedIndexInformer) GetIndexer() Indexer {
  471. return s.indexer
  472. }
  473. func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
  474. s.startedLock.Lock()
  475. defer s.startedLock.Unlock()
  476. if s.started {
  477. return fmt.Errorf("informer has already started")
  478. }
  479. return s.indexer.AddIndexers(indexers)
  480. }
  481. func (s *sharedIndexInformer) GetController() Controller {
  482. return &dummyController{informer: s}
  483. }
  484. func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
  485. return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
  486. }
  487. func determineResyncPeriod(desired, check time.Duration) time.Duration {
  488. if desired == 0 {
  489. return desired
  490. }
  491. if check == 0 {
  492. klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
  493. return 0
  494. }
  495. if desired < check {
  496. klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
  497. return check
  498. }
  499. return desired
  500. }
// minimumResyncPeriod is the smallest non-zero resync period a handler can
// request; AddEventHandlerWithResyncPeriod raises shorter requests to it.
const minimumResyncPeriod = 1 * time.Second
  502. func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
  503. s.startedLock.Lock()
  504. defer s.startedLock.Unlock()
  505. if s.stopped {
  506. return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
  507. }
  508. if resyncPeriod > 0 {
  509. if resyncPeriod < minimumResyncPeriod {
  510. klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
  511. resyncPeriod = minimumResyncPeriod
  512. }
  513. if resyncPeriod < s.resyncCheckPeriod {
  514. if s.started {
  515. klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
  516. resyncPeriod = s.resyncCheckPeriod
  517. } else {
  518. // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
  519. // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
  520. // accordingly
  521. s.resyncCheckPeriod = resyncPeriod
  522. s.processor.resyncCheckPeriodChanged(resyncPeriod)
  523. }
  524. }
  525. }
  526. listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
  527. if !s.started {
  528. return s.processor.addListener(listener), nil
  529. }
  530. // in order to safely join, we have to
  531. // 1. stop sending add/update/delete notifications
  532. // 2. do a list against the store
  533. // 3. send synthetic "Add" events to the new handler
  534. // 4. unblock
  535. s.blockDeltas.Lock()
  536. defer s.blockDeltas.Unlock()
  537. handle := s.processor.addListener(listener)
  538. for _, item := range s.indexer.List() {
  539. // Note that we enqueue these notifications with the lock held
  540. // and before returning the handle. That means there is never a
  541. // chance for anyone to call the handle's HasSynced method in a
  542. // state when it would falsely return true (i.e., when the
  543. // shared informer is synced but it has not observed an Add
  544. // with isInitialList being true, nor when the thread
  545. // processing notifications somehow goes faster than this
  546. // thread adding them and the counter is temporarily zero).
  547. listener.add(addNotification{newObj: item, isInInitialList: true})
  548. }
  549. return handle, nil
  550. }
  551. func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
  552. s.blockDeltas.Lock()
  553. defer s.blockDeltas.Unlock()
  554. if deltas, ok := obj.(Deltas); ok {
  555. return processDeltas(s, s.indexer, deltas, isInInitialList)
  556. }
  557. return errors.New("object given as Process argument is not Deltas")
  558. }
  559. // Conforms to ResourceEventHandler
  560. func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
  561. // Invocation of this function is locked under s.blockDeltas, so it is
  562. // save to distribute the notification
  563. s.cacheMutationDetector.AddObject(obj)
  564. s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
  565. }
  566. // Conforms to ResourceEventHandler
  567. func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
  568. isSync := false
  569. // If is a Sync event, isSync should be true
  570. // If is a Replaced event, isSync is true if resource version is unchanged.
  571. // If RV is unchanged: this is a Sync/Replaced event, so isSync is true
  572. if accessor, err := meta.Accessor(new); err == nil {
  573. if oldAccessor, err := meta.Accessor(old); err == nil {
  574. // Events that didn't change resourceVersion are treated as resync events
  575. // and only propagated to listeners that requested resync
  576. isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
  577. }
  578. }
  579. // Invocation of this function is locked under s.blockDeltas, so it is
  580. // save to distribute the notification
  581. s.cacheMutationDetector.AddObject(new)
  582. s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
  583. }
  584. // Conforms to ResourceEventHandler
  585. func (s *sharedIndexInformer) OnDelete(old interface{}) {
  586. // Invocation of this function is locked under s.blockDeltas, so it is
  587. // save to distribute the notification
  588. s.processor.distribute(deleteNotification{oldObj: old}, false)
  589. }
  590. // IsStopped reports whether the informer has already been stopped
  591. func (s *sharedIndexInformer) IsStopped() bool {
  592. s.startedLock.Lock()
  593. defer s.startedLock.Unlock()
  594. return s.stopped
  595. }
  596. func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
  597. s.startedLock.Lock()
  598. defer s.startedLock.Unlock()
  599. // in order to safely remove, we have to
  600. // 1. stop sending add/update/delete notifications
  601. // 2. remove and stop listener
  602. // 3. unblock
  603. s.blockDeltas.Lock()
  604. defer s.blockDeltas.Unlock()
  605. return s.processor.removeListener(handle)
  606. }
// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. There are two
// kinds of distribute operations. The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) every listener is initially put in.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
	// listenersStarted is set to true by run once the goroutines of the
	// registered listeners have been started, and reset to false when run
	// shuts them down. addListener and removeListener consult it to decide
	// whether to start or stop an individual listener.
	listenersStarted bool
	// listenersLock guards listeners and listenersStarted.
	listenersLock sync.RWMutex
	// Map from listeners to whether or not they are currently syncing
	listeners map[*processorListener]bool
	// clock supplies the current time used by shouldResync.
	clock clock.Clock
	// wg tracks the run and pop goroutines of started listeners so that run
	// can wait for them to exit.
	wg wait.Group
}
  621. func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
  622. p.listenersLock.RLock()
  623. defer p.listenersLock.RUnlock()
  624. if p.listeners == nil {
  625. return nil
  626. }
  627. if result, ok := registration.(*processorListener); ok {
  628. if _, exists := p.listeners[result]; exists {
  629. return result
  630. }
  631. }
  632. return nil
  633. }
  634. func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
  635. p.listenersLock.Lock()
  636. defer p.listenersLock.Unlock()
  637. if p.listeners == nil {
  638. p.listeners = make(map[*processorListener]bool)
  639. }
  640. p.listeners[listener] = true
  641. if p.listenersStarted {
  642. p.wg.Start(listener.run)
  643. p.wg.Start(listener.pop)
  644. }
  645. return listener
  646. }
  647. func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
  648. p.listenersLock.Lock()
  649. defer p.listenersLock.Unlock()
  650. listener, ok := handle.(*processorListener)
  651. if !ok {
  652. return fmt.Errorf("invalid key type %t", handle)
  653. } else if p.listeners == nil {
  654. // No listeners are registered, do nothing
  655. return nil
  656. } else if _, exists := p.listeners[listener]; !exists {
  657. // Listener is not registered, just do nothing
  658. return nil
  659. }
  660. delete(p.listeners, listener)
  661. if p.listenersStarted {
  662. close(listener.addCh)
  663. }
  664. return nil
  665. }
  666. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  667. p.listenersLock.RLock()
  668. defer p.listenersLock.RUnlock()
  669. for listener, isSyncing := range p.listeners {
  670. switch {
  671. case !sync:
  672. // non-sync messages are delivered to every listener
  673. listener.add(obj)
  674. case isSyncing:
  675. // sync messages are delivered to every syncing listener
  676. listener.add(obj)
  677. default:
  678. // skipping a sync obj for a non-syncing listener
  679. }
  680. }
  681. }
  682. func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  683. func() {
  684. p.listenersLock.RLock()
  685. defer p.listenersLock.RUnlock()
  686. for listener := range p.listeners {
  687. p.wg.Start(listener.run)
  688. p.wg.Start(listener.pop)
  689. }
  690. p.listenersStarted = true
  691. }()
  692. <-stopCh
  693. p.listenersLock.Lock()
  694. defer p.listenersLock.Unlock()
  695. for listener := range p.listeners {
  696. close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
  697. }
  698. // Wipe out list of listeners since they are now closed
  699. // (processorListener cannot be re-used)
  700. p.listeners = nil
  701. // Reset to false since no listeners are running
  702. p.listenersStarted = false
  703. p.wg.Wait() // Wait for all .pop() and .run() to stop
  704. }
  705. // shouldResync queries every listener to determine if any of them need a resync, based on each
  706. // listener's resyncPeriod.
  707. func (p *sharedProcessor) shouldResync() bool {
  708. p.listenersLock.Lock()
  709. defer p.listenersLock.Unlock()
  710. resyncNeeded := false
  711. now := p.clock.Now()
  712. for listener := range p.listeners {
  713. // need to loop through all the listeners to see if they need to resync so we can prepare any
  714. // listeners that are going to be resyncing.
  715. shouldResync := listener.shouldResync(now)
  716. p.listeners[listener] = shouldResync
  717. if shouldResync {
  718. resyncNeeded = true
  719. listener.determineNextResync(now)
  720. }
  721. }
  722. return resyncNeeded
  723. }
  724. func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
  725. p.listenersLock.RLock()
  726. defer p.listenersLock.RUnlock()
  727. for listener := range p.listeners {
  728. resyncPeriod := determineResyncPeriod(
  729. listener.requestedResyncPeriod, resyncCheckPeriod)
  730. listener.setResyncPeriod(resyncPeriod)
  731. }
  732. }
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	// nextCh carries notifications from pop() to run(); pop() closes it
	// when it returns.
	nextCh chan interface{}
	// addCh receives notifications from add(); closing it tells pop() to
	// stop.
	addCh chan interface{}

	// handler is the ResourceEventHandler that run() invokes for each
	// notification.
	handler ResourceEventHandler

	// syncTracker backs HasSynced. add() calls Start for each initial-list
	// add notification and run() calls Finished once the handler has
	// processed it.
	syncTracker *synctrack.SingleFileTracker

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments. One is imposing a lower bound,
	// `minimumResyncPeriod`. The other is another lower bound, the
	// sharedIndexInformer's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedIndexInformer starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration
	// resyncPeriod is the threshold that will be used in the logic
	// for this listener. This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero. The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}
  777. // HasSynced returns true if the source informer has synced, and all
  778. // corresponding events have been delivered.
  779. func (p *processorListener) HasSynced() bool {
  780. return p.syncTracker.HasSynced()
  781. }
  782. func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
  783. ret := &processorListener{
  784. nextCh: make(chan interface{}),
  785. addCh: make(chan interface{}),
  786. handler: handler,
  787. syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
  788. pendingNotifications: *buffer.NewRingGrowing(bufferSize),
  789. requestedResyncPeriod: requestedResyncPeriod,
  790. resyncPeriod: resyncPeriod,
  791. }
  792. ret.determineNextResync(now)
  793. return ret
  794. }
  795. func (p *processorListener) add(notification interface{}) {
  796. if a, ok := notification.(addNotification); ok && a.isInInitialList {
  797. p.syncTracker.Start()
  798. }
  799. p.addCh <- notification
  800. }
  801. func (p *processorListener) pop() {
  802. defer utilruntime.HandleCrash()
  803. defer close(p.nextCh) // Tell .run() to stop
  804. var nextCh chan<- interface{}
  805. var notification interface{}
  806. for {
  807. select {
  808. case nextCh <- notification:
  809. // Notification dispatched
  810. var ok bool
  811. notification, ok = p.pendingNotifications.ReadOne()
  812. if !ok { // Nothing to pop
  813. nextCh = nil // Disable this select case
  814. }
  815. case notificationToAdd, ok := <-p.addCh:
  816. if !ok {
  817. return
  818. }
  819. if notification == nil { // No notification to pop (and pendingNotifications is empty)
  820. // Optimize the case - skip adding to pendingNotifications
  821. notification = notificationToAdd
  822. nextCh = p.nextCh
  823. } else { // There is already a notification waiting to be dispatched
  824. p.pendingNotifications.WriteOne(notificationToAdd)
  825. }
  826. }
  827. }
  828. }
  829. func (p *processorListener) run() {
  830. // this call blocks until the channel is closed. When a panic happens during the notification
  831. // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
  832. // the next notification will be attempted. This is usually better than the alternative of never
  833. // delivering again.
  834. stopCh := make(chan struct{})
  835. wait.Until(func() {
  836. for next := range p.nextCh {
  837. switch notification := next.(type) {
  838. case updateNotification:
  839. p.handler.OnUpdate(notification.oldObj, notification.newObj)
  840. case addNotification:
  841. p.handler.OnAdd(notification.newObj, notification.isInInitialList)
  842. if notification.isInInitialList {
  843. p.syncTracker.Finished()
  844. }
  845. case deleteNotification:
  846. p.handler.OnDelete(notification.oldObj)
  847. default:
  848. utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
  849. }
  850. }
  851. // the only way to get here is if the p.nextCh is empty and closed
  852. close(stopCh)
  853. }, 1*time.Second, stopCh)
  854. }
  855. // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
  856. // this always returns false.
  857. func (p *processorListener) shouldResync(now time.Time) bool {
  858. p.resyncLock.Lock()
  859. defer p.resyncLock.Unlock()
  860. if p.resyncPeriod == 0 {
  861. return false
  862. }
  863. return now.After(p.nextResync) || now.Equal(p.nextResync)
  864. }
  865. func (p *processorListener) determineNextResync(now time.Time) {
  866. p.resyncLock.Lock()
  867. defer p.resyncLock.Unlock()
  868. p.nextResync = now.Add(p.resyncPeriod)
  869. }
  870. func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
  871. p.resyncLock.Lock()
  872. defer p.resyncLock.Unlock()
  873. p.resyncPeriod = resyncPeriod
  874. }