12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "errors"
- "fmt"
- "sync"
- "time"
- "k8s.io/apimachinery/pkg/api/meta"
- "k8s.io/apimachinery/pkg/runtime"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/tools/cache/synctrack"
- "k8s.io/utils/buffer"
- "k8s.io/utils/clock"
- "k8s.io/klog/v2"
- )
- // SharedInformer provides eventually consistent linkage of its
- // clients to the authoritative state of a given collection of
- // objects. An object is identified by its API group, kind/resource,
- // namespace (if any), and name; the `ObjectMeta.UID` is not part of
- // an object's ID as far as this contract is concerned. One
- // SharedInformer provides linkage to objects of a particular API
- // group and kind/resource. The linked object collection of a
- // SharedInformer may be further restricted to one namespace (if
- // applicable) and/or by label selector and/or field selector.
- //
- // The authoritative state of an object is what apiservers provide
- // access to, and an object goes through a strict sequence of states.
- // An object state is either (1) present with a ResourceVersion and
- // other appropriate content or (2) "absent".
- //
- // A SharedInformer maintains a local cache --- exposed by GetStore(),
- // by GetIndexer() in the case of an indexed informer, and possibly by
- // machinery involved in creating and/or accessing the informer --- of
- // the state of each relevant object. This cache is eventually
- // consistent with the authoritative state. This means that, unless
- // prevented by persistent communication problems, if ever a
- // particular object ID X is authoritatively associated with a state S
- // then for every SharedInformer I whose collection includes (X, S)
- // eventually either (1) I's cache associates X with S or a later
- // state of X, (2) I is stopped, or (3) the authoritative state
- // service for X terminates. To be formally complete, we say that the
- // absent state meets any restriction by label selector or field
- // selector.
- //
- // For a given informer and relevant object ID X, the sequence of
- // states that appears in the informer's cache is a subsequence of the
- // states authoritatively associated with X. That is, some states
- // might never appear in the cache but ordering among the appearing
- // states is correct. Note, however, that there is no promise about
- // ordering between states seen for different objects.
- //
- // The local cache starts out empty, and gets populated and updated
- // during `Run()`.
- //
- // As a simple example, if a collection of objects is henceforth
- // unchanging, a SharedInformer is created that links to that
- // collection, and that SharedInformer is `Run()` then that
- // SharedInformer's cache eventually holds an exact copy of that
- // collection (unless it is stopped too soon, the authoritative state
- // service ends, or communication problems between the two
- // persistently thwart achievement).
- //
- // As another simple example, if the local cache ever holds a
- // non-absent state for some object ID and the object is eventually
- // removed from the authoritative state then eventually the object is
- // removed from the local cache (unless the SharedInformer is stopped
- // too soon, the authoritative state service ends, or communication
- // problems persistently thwart the desired result).
- //
- // The keys in the Store are of the form namespace/name for namespaced
- // objects, and are simply the name for non-namespaced objects.
- // Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
- // a given object, and `SplitMetaNamespaceKey(key)` to split a key
- // into its constituent parts.
- //
- // Every query against the local cache is answered entirely from one
- // snapshot of the cache's state. Thus, the result of a `List` call
- // will not contain two entries with the same namespace and name.
- //
- // A client is identified here by a ResourceEventHandler. For every
- // update to the SharedInformer's local cache and for every client
- // added before `Run()`, eventually either the SharedInformer is
- // stopped or the client is notified of the update. A client added
- // after `Run()` starts gets a startup batch of notifications of
- // additions of the objects existing in the cache at the time that
- // client was added; also, for every update to the SharedInformer's
- // local cache after that client was added, eventually either the
- // SharedInformer is stopped or that client is notified of that
- // update. Client notifications happen after the corresponding cache
- // update and, in the case of a SharedIndexInformer, after the
- // corresponding index updates. It is possible that additional cache
- // and index updates happen before such a prescribed notification.
- // For a given SharedInformer and client, the notifications are
- // delivered sequentially. For a given SharedInformer, client, and
- // object ID, the notifications are delivered in order. Because
- // `ObjectMeta.UID` has no role in identifying objects, it is possible
- // that when (1) object O1 with ID (e.g. namespace and name) X and
- // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
- // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
- // created the informer's clients are not notified of (1) and (2) but
- // rather are notified only of an update from O1 to O2. Clients that
- // need to detect such cases might do so by comparing the `ObjectMeta.UID`
- // field of the old and the new object in the code that handles update
- // notifications (i.e. `OnUpdate` method of ResourceEventHandler).
- //
- // A client must process each notification promptly; a SharedInformer
- // is not engineered to deal well with a large backlog of
- // notifications to deliver. Lengthy processing should be passed off
- // to something else, for example through a
- // `client-go/util/workqueue`.
- //
- // A delete notification exposes the last locally known non-absent
- // state, except that its ResourceVersion is replaced with a
- // ResourceVersion in which the object is actually absent.
- type SharedInformer interface {
- // AddEventHandler adds an event handler to the shared informer using
- // the shared informer's resync period. Events to a single handler are
- // delivered sequentially, but there is no coordination between
- // different handlers.
- // It returns a registration handle for the handler that can be used to
- // remove the handler again, or to tell if the handler is synced (has
- // seen every item in the initial list).
- AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
- // AddEventHandlerWithResyncPeriod adds an event handler to the
- // shared informer with the requested resync period; zero means
- // this handler does not care about resyncs. The resync operation
- // consists of delivering to the handler an update notification
- // for every object in the informer's local cache; it does not add
- // any interactions with the authoritative storage. Some
- // informers do no resyncs at all, not even for handlers added
- // with a non-zero resyncPeriod. For an informer that does
- // resyncs, and for each handler that requests resyncs, that
- // informer develops a nominal resync period that is no shorter
- // than the requested period but may be longer. The actual time
- // between any two resyncs may be longer than the nominal period
- // because the implementation takes time to do work and there may
- // be competing load and scheduling noise.
- // It returns a registration handle for the handler that can be used to remove
- // the handler again and an error if the handler cannot be added.
- AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
- // RemoveEventHandler removes a formerly added event handler given by
- // its registration handle.
- // This function is guaranteed to be idempotent, and thread-safe.
- RemoveEventHandler(handle ResourceEventHandlerRegistration) error
- // GetStore returns the informer's local cache as a Store.
- GetStore() Store
- // GetController is deprecated, it does nothing useful
- GetController() Controller
- // Run starts and runs the shared informer, returning after it stops.
- // The informer will be stopped when stopCh is closed.
- Run(stopCh <-chan struct{})
- // HasSynced returns true if the shared informer's store has been
- // informed by at least one full LIST of the authoritative state
- // of the informer's object collection. This is unrelated to "resync".
- //
- // Note that this doesn't tell you if an individual handler is synced!!
- // For that, please call HasSynced on the handle returned by
- // AddEventHandler.
- HasSynced() bool
- // LastSyncResourceVersion is the resource version observed when last synced with the underlying
- // store. The value returned is not synchronized with access to the underlying store and is not
- // thread-safe.
- LastSyncResourceVersion() string
- // The WatchErrorHandler is called whenever ListAndWatch drops the
- // connection with an error. After calling this handler, the informer
- // will backoff and retry.
- //
- // The default implementation looks at the error type and tries to log
- // the error message at an appropriate level.
- //
- // There's only one handler, so if you call this multiple times, last one
- // wins; calling after the informer has been started returns an error.
- //
- // The handler is intended for visibility, not to e.g. pause the consumers.
- // The handler should return quickly - any expensive processing should be
- // offloaded.
- SetWatchErrorHandler(handler WatchErrorHandler) error
- // The TransformFunc is called for each object which is about to be stored.
- //
- // This function is intended for you to take the opportunity to
- // remove, transform, or normalize fields. One use case is to strip unused
- // metadata fields out of objects to save on RAM cost.
- //
- // Must be set before starting the informer.
- //
- // Please see the comment on TransformFunc for more details.
- SetTransform(handler TransformFunc) error
- // IsStopped reports whether the informer has already been stopped.
- // Adding event handlers to already stopped informers is not possible.
- // An informer already stopped will never be started again.
- IsStopped() bool
- }
// ResourceEventHandlerRegistration is an opaque handle representing the
// registration of a ResourceEventHandler with a SharedInformer. To
// unregister the handler, pass the handle back to the same
// SharedInformer's `RemoveEventHandler`.
//
// The handle also tells whether the handler is synced, i.e. whether all
// items in the initial list have been delivered to it.
type ResourceEventHandlerRegistration interface {
	// HasSynced reports whether the parent has synced and all pre-sync
	// events have been delivered.
	HasSynced() bool
}
- // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
- type SharedIndexInformer interface {
- SharedInformer
- // AddIndexers add indexers to the informer before it starts.
- AddIndexers(indexers Indexers) error
- GetIndexer() Indexer
- }
- // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
- func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
- return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
- }
- // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
- // NewSharedIndexInformerWithOptions for full details.
- func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
- return NewSharedIndexInformerWithOptions(
- lw,
- exampleObject,
- SharedIndexInformerOptions{
- ResyncPeriod: defaultEventHandlerResyncPeriod,
- Indexers: indexers,
- },
- )
- }
- // NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
- // The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
- // handler that with a non-zero requested resync period, whether added
- // before or after the informer starts, the nominal resync period is
- // the requested resync period rounded up to a multiple of the
- // informer's resync checking period. Such an informer's resync
- // checking period is established when the informer starts running,
- // and is the maximum of (a) the minimum of the resync periods
- // requested before the informer starts and the
- // options.ResyncPeriod given here and (b) the constant
- // `minimumResyncPeriod` defined in this file.
- func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
- realClock := &clock.RealClock{}
- return &sharedIndexInformer{
- indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
- processor: &sharedProcessor{clock: realClock},
- listerWatcher: lw,
- objectType: exampleObject,
- objectDescription: options.ObjectDescription,
- resyncCheckPeriod: options.ResyncPeriod,
- defaultEventHandlerResyncPeriod: options.ResyncPeriod,
- clock: realClock,
- cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
- }
- }
- // SharedIndexInformerOptions configures a sharedIndexInformer.
- type SharedIndexInformerOptions struct {
- // ResyncPeriod is the default event handler resync period and resync check
- // period. If unset/unspecified, these are defaulted to 0 (do not resync).
- ResyncPeriod time.Duration
- // Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
- Indexers Indexers
- // ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
- // underlying Reflector's type description.
- ObjectDescription string
- }
// InformerSynced is a function that reports whether an informer has
// synced. It is useful for determining whether caches have synced.
type InformerSynced func() bool
const (
	// initialBufferSize is the initial number of event notifications
	// that can be buffered.
	initialBufferSize = 1024

	// syncedPollPeriod controls how often the status of the sync funcs
	// is checked.
	syncedPollPeriod = 100 * time.Millisecond
)
- // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
- // indicating that the caller identified by name is waiting for syncs, followed by
- // either a successful or failed sync.
- func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
- klog.Infof("Waiting for caches to sync for %s", controllerName)
- if !WaitForCacheSync(stopCh, cacheSyncs...) {
- utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
- return false
- }
- klog.Infof("Caches are synced for %s", controllerName)
- return true
- }
- // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
- // if the controller should shutdown
- // callers should prefer WaitForNamedCacheSync()
- func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
- err := wait.PollImmediateUntil(syncedPollPeriod,
- func() (bool, error) {
- for _, syncFunc := range cacheSyncs {
- if !syncFunc() {
- return false, nil
- }
- }
- return true, nil
- },
- stopCh)
- if err != nil {
- klog.V(2).Infof("stop requested")
- return false
- }
- klog.V(4).Infof("caches populated")
- return true
- }
- // `*sharedIndexInformer` implements SharedIndexInformer and has three
- // main components. One is an indexed local cache, `indexer Indexer`.
- // The second main component is a Controller that pulls
- // objects/notifications using the ListerWatcher and pushes them into
- // a DeltaFIFO --- whose knownObjects is the informer's local cache
- // --- while concurrently Popping Deltas values from that fifo and
- // processing them with `sharedIndexInformer::HandleDeltas`. Each
- // invocation of HandleDeltas, which is done with the fifo's lock
- // held, processes each Delta in turn. For each Delta this both
- // updates the local cache and stuffs the relevant notification into
- // the sharedProcessor. The third main component is that
- // sharedProcessor, which is responsible for relaying those
- // notifications to each of the informer's clients.
- type sharedIndexInformer struct {
- indexer Indexer
- controller Controller
- processor *sharedProcessor
- cacheMutationDetector MutationDetector
- listerWatcher ListerWatcher
- // objectType is an example object of the type this informer is expected to handle. If set, an event
- // with an object with a mismatching type is dropped instead of being delivered to listeners.
- objectType runtime.Object
- // objectDescription is the description of this informer's objects. This typically defaults to
- objectDescription string
- // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
- // shouldResync to check if any of our listeners need a resync.
- resyncCheckPeriod time.Duration
- // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
- // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
- // value).
- defaultEventHandlerResyncPeriod time.Duration
- // clock allows for testability
- clock clock.Clock
- started, stopped bool
- startedLock sync.Mutex
- // blockDeltas gives a way to stop all event distribution so that a late event handler
- // can safely join the shared informer.
- blockDeltas sync.Mutex
- // Called whenever the ListAndWatch drops the connection with an error.
- watchErrorHandler WatchErrorHandler
- transform TransformFunc
- }
- // dummyController hides the fact that a SharedInformer is different from a dedicated one
- // where a caller can `Run`. The run method is disconnected in this case, because higher
- // level logic will decide when to start the SharedInformer and related controller.
- // Because returning information back is always asynchronous, the legacy callers shouldn't
- // notice any change in behavior.
- type dummyController struct {
- informer *sharedIndexInformer
- }
- func (v *dummyController) Run(stopCh <-chan struct{}) {
- }
- func (v *dummyController) HasSynced() bool {
- return v.informer.HasSynced()
- }
- func (v *dummyController) LastSyncResourceVersion() string {
- return ""
- }
// updateNotification carries the previous and the new state of an object
// for an update event.
type updateNotification struct {
	oldObj, newObj interface{}
}
// addNotification carries a newly added object, together with a flag
// telling whether the object is part of the initial list.
type addNotification struct {
	// newObj is the object that was added.
	newObj interface{}
	// isInInitialList marks additions that belong to the initial list.
	isInInitialList bool
}
// deleteNotification carries the object handed to a delete event.
type deleteNotification struct {
	// oldObj is the object that was deleted.
	oldObj interface{}
}
- func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.started {
- return fmt.Errorf("informer has already started")
- }
- s.watchErrorHandler = handler
- return nil
- }
- func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.started {
- return fmt.Errorf("informer has already started")
- }
- s.transform = handler
- return nil
- }
- func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
- defer utilruntime.HandleCrash()
- if s.HasStarted() {
- klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
- return
- }
- func() {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
- KnownObjects: s.indexer,
- EmitDeltaTypeReplaced: true,
- Transformer: s.transform,
- })
- cfg := &Config{
- Queue: fifo,
- ListerWatcher: s.listerWatcher,
- ObjectType: s.objectType,
- ObjectDescription: s.objectDescription,
- FullResyncPeriod: s.resyncCheckPeriod,
- RetryOnError: false,
- ShouldResync: s.processor.shouldResync,
- Process: s.HandleDeltas,
- WatchErrorHandler: s.watchErrorHandler,
- }
- s.controller = New(cfg)
- s.controller.(*controller).clock = s.clock
- s.started = true
- }()
- // Separate stop channel because Processor should be stopped strictly after controller
- processorStopCh := make(chan struct{})
- var wg wait.Group
- defer wg.Wait() // Wait for Processor to stop
- defer close(processorStopCh) // Tell Processor to stop
- wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
- wg.StartWithChannel(processorStopCh, s.processor.run)
- defer func() {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- s.stopped = true // Don't want any new listeners
- }()
- s.controller.Run(stopCh)
- }
- func (s *sharedIndexInformer) HasStarted() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- return s.started
- }
- func (s *sharedIndexInformer) HasSynced() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.controller == nil {
- return false
- }
- return s.controller.HasSynced()
- }
- func (s *sharedIndexInformer) LastSyncResourceVersion() string {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.controller == nil {
- return ""
- }
- return s.controller.LastSyncResourceVersion()
- }
- func (s *sharedIndexInformer) GetStore() Store {
- return s.indexer
- }
- func (s *sharedIndexInformer) GetIndexer() Indexer {
- return s.indexer
- }
- func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.started {
- return fmt.Errorf("informer has already started")
- }
- return s.indexer.AddIndexers(indexers)
- }
- func (s *sharedIndexInformer) GetController() Controller {
- return &dummyController{informer: s}
- }
- func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
- return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
- }
- func determineResyncPeriod(desired, check time.Duration) time.Duration {
- if desired == 0 {
- return desired
- }
- if check == 0 {
- klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
- return 0
- }
- if desired < check {
- klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
- return check
- }
- return desired
- }
// minimumResyncPeriod is the smallest non-zero resync period accepted by
// AddEventHandlerWithResyncPeriod; smaller requests are raised to it.
const minimumResyncPeriod = time.Second
- func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- if s.stopped {
- return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
- }
- if resyncPeriod > 0 {
- if resyncPeriod < minimumResyncPeriod {
- klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
- resyncPeriod = minimumResyncPeriod
- }
- if resyncPeriod < s.resyncCheckPeriod {
- if s.started {
- klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
- resyncPeriod = s.resyncCheckPeriod
- } else {
- // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
- // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
- // accordingly
- s.resyncCheckPeriod = resyncPeriod
- s.processor.resyncCheckPeriodChanged(resyncPeriod)
- }
- }
- }
- listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
- if !s.started {
- return s.processor.addListener(listener), nil
- }
- // in order to safely join, we have to
- // 1. stop sending add/update/delete notifications
- // 2. do a list against the store
- // 3. send synthetic "Add" events to the new handler
- // 4. unblock
- s.blockDeltas.Lock()
- defer s.blockDeltas.Unlock()
- handle := s.processor.addListener(listener)
- for _, item := range s.indexer.List() {
- // Note that we enqueue these notifications with the lock held
- // and before returning the handle. That means there is never a
- // chance for anyone to call the handle's HasSynced method in a
- // state when it would falsely return true (i.e., when the
- // shared informer is synced but it has not observed an Add
- // with isInitialList being true, nor when the thread
- // processing notifications somehow goes faster than this
- // thread adding them and the counter is temporarily zero).
- listener.add(addNotification{newObj: item, isInInitialList: true})
- }
- return handle, nil
- }
- func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
- s.blockDeltas.Lock()
- defer s.blockDeltas.Unlock()
- if deltas, ok := obj.(Deltas); ok {
- return processDeltas(s, s.indexer, deltas, isInInitialList)
- }
- return errors.New("object given as Process argument is not Deltas")
- }
- // Conforms to ResourceEventHandler
- func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
- // Invocation of this function is locked under s.blockDeltas, so it is
- // save to distribute the notification
- s.cacheMutationDetector.AddObject(obj)
- s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
- }
- // Conforms to ResourceEventHandler
- func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
- isSync := false
- // If is a Sync event, isSync should be true
- // If is a Replaced event, isSync is true if resource version is unchanged.
- // If RV is unchanged: this is a Sync/Replaced event, so isSync is true
- if accessor, err := meta.Accessor(new); err == nil {
- if oldAccessor, err := meta.Accessor(old); err == nil {
- // Events that didn't change resourceVersion are treated as resync events
- // and only propagated to listeners that requested resync
- isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
- }
- }
- // Invocation of this function is locked under s.blockDeltas, so it is
- // save to distribute the notification
- s.cacheMutationDetector.AddObject(new)
- s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
- }
- // Conforms to ResourceEventHandler
- func (s *sharedIndexInformer) OnDelete(old interface{}) {
- // Invocation of this function is locked under s.blockDeltas, so it is
- // save to distribute the notification
- s.processor.distribute(deleteNotification{oldObj: old}, false)
- }
- // IsStopped reports whether the informer has already been stopped
- func (s *sharedIndexInformer) IsStopped() bool {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- return s.stopped
- }
- func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
- s.startedLock.Lock()
- defer s.startedLock.Unlock()
- // in order to safely remove, we have to
- // 1. stop sending add/update/delete notifications
- // 2. remove and stop listener
- // 3. unblock
- s.blockDeltas.Lock()
- defer s.blockDeltas.Unlock()
- return s.processor.removeListener(handle)
- }
- // sharedProcessor has a collection of processorListener and can
- // distribute a notification object to its listeners. There are two
- // kinds of distribute operations. The sync distributions go to a
- // subset of the listeners that (a) is recomputed in the occasional
- // calls to shouldResync and (b) every listener is initially put in.
- // The non-sync distributions go to every listener.
- type sharedProcessor struct {
- listenersStarted bool
- listenersLock sync.RWMutex
- // Map from listeners to whether or not they are currently syncing
- listeners map[*processorListener]bool
- clock clock.Clock
- wg wait.Group
- }
- func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- if p.listeners == nil {
- return nil
- }
- if result, ok := registration.(*processorListener); ok {
- if _, exists := p.listeners[result]; exists {
- return result
- }
- }
- return nil
- }
- func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
- if p.listeners == nil {
- p.listeners = make(map[*processorListener]bool)
- }
- p.listeners[listener] = true
- if p.listenersStarted {
- p.wg.Start(listener.run)
- p.wg.Start(listener.pop)
- }
- return listener
- }
- func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
- listener, ok := handle.(*processorListener)
- if !ok {
- return fmt.Errorf("invalid key type %t", handle)
- } else if p.listeners == nil {
- // No listeners are registered, do nothing
- return nil
- } else if _, exists := p.listeners[listener]; !exists {
- // Listener is not registered, just do nothing
- return nil
- }
- delete(p.listeners, listener)
- if p.listenersStarted {
- close(listener.addCh)
- }
- return nil
- }
- func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- for listener, isSyncing := range p.listeners {
- switch {
- case !sync:
- // non-sync messages are delivered to every listener
- listener.add(obj)
- case isSyncing:
- // sync messages are delivered to every syncing listener
- listener.add(obj)
- default:
- // skipping a sync obj for a non-syncing listener
- }
- }
- }
- func (p *sharedProcessor) run(stopCh <-chan struct{}) {
- func() {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- for listener := range p.listeners {
- p.wg.Start(listener.run)
- p.wg.Start(listener.pop)
- }
- p.listenersStarted = true
- }()
- <-stopCh
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
- for listener := range p.listeners {
- close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
- }
- // Wipe out list of listeners since they are now closed
- // (processorListener cannot be re-used)
- p.listeners = nil
- // Reset to false since no listeners are running
- p.listenersStarted = false
- p.wg.Wait() // Wait for all .pop() and .run() to stop
- }
- // shouldResync queries every listener to determine if any of them need a resync, based on each
- // listener's resyncPeriod.
- func (p *sharedProcessor) shouldResync() bool {
- p.listenersLock.Lock()
- defer p.listenersLock.Unlock()
- resyncNeeded := false
- now := p.clock.Now()
- for listener := range p.listeners {
- // need to loop through all the listeners to see if they need to resync so we can prepare any
- // listeners that are going to be resyncing.
- shouldResync := listener.shouldResync(now)
- p.listeners[listener] = shouldResync
- if shouldResync {
- resyncNeeded = true
- listener.determineNextResync(now)
- }
- }
- return resyncNeeded
- }
- func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
- p.listenersLock.RLock()
- defer p.listenersLock.RUnlock()
- for listener := range p.listeners {
- resyncPeriod := determineResyncPeriod(
- listener.requestedResyncPeriod, resyncCheckPeriod)
- listener.setResyncPeriod(resyncPeriod)
- }
- }
- // processorListener relays notifications from a sharedProcessor to
- // one ResourceEventHandler --- using two goroutines, two unbuffered
- // channels, and an unbounded ring buffer. The `add(notification)`
- // function sends the given notification to `addCh`. One goroutine
- // runs `pop()`, which pumps notifications from `addCh` to `nextCh`
- // using storage in the ring buffer while `nextCh` is not keeping up.
- // Another goroutine runs `run()`, which receives notifications from
- // `nextCh` and synchronously invokes the appropriate handler method.
- //
- // processorListener also keeps track of the adjusted requested resync
- // period of the listener.
- type processorListener struct {
- nextCh chan interface{}
- addCh chan interface{}
- handler ResourceEventHandler
- syncTracker *synctrack.SingleFileTracker
- // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
- // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
- // added until we OOM.
- // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
- // we should try to do something better.
- pendingNotifications buffer.RingGrowing
- // requestedResyncPeriod is how frequently the listener wants a
- // full resync from the shared informer, but modified by two
- // adjustments. One is imposing a lower bound,
- // `minimumResyncPeriod`. The other is another lower bound, the
- // sharedIndexInformer's `resyncCheckPeriod`, that is imposed (a) only
- // in AddEventHandlerWithResyncPeriod invocations made after the
- // sharedIndexInformer starts and (b) only if the informer does
- // resyncs at all.
- requestedResyncPeriod time.Duration
- // resyncPeriod is the threshold that will be used in the logic
- // for this listener. This value differs from
- // requestedResyncPeriod only when the sharedIndexInformer does
- // not do resyncs, in which case the value here is zero. The
- // actual time between resyncs depends on when the
- // sharedProcessor's `shouldResync` function is invoked and when
- // the sharedIndexInformer processes `Sync` type Delta objects.
- resyncPeriod time.Duration
- // nextResync is the earliest time the listener should get a full resync
- nextResync time.Time
- // resyncLock guards access to resyncPeriod and nextResync
- resyncLock sync.Mutex
- }
- // HasSynced returns true if the source informer has synced, and all
- // corresponding events have been delivered.
- func (p *processorListener) HasSynced() bool {
- return p.syncTracker.HasSynced()
- }
- func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
- ret := &processorListener{
- nextCh: make(chan interface{}),
- addCh: make(chan interface{}),
- handler: handler,
- syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
- pendingNotifications: *buffer.NewRingGrowing(bufferSize),
- requestedResyncPeriod: requestedResyncPeriod,
- resyncPeriod: resyncPeriod,
- }
- ret.determineNextResync(now)
- return ret
- }
- func (p *processorListener) add(notification interface{}) {
- if a, ok := notification.(addNotification); ok && a.isInInitialList {
- p.syncTracker.Start()
- }
- p.addCh <- notification
- }
- func (p *processorListener) pop() {
- defer utilruntime.HandleCrash()
- defer close(p.nextCh) // Tell .run() to stop
- var nextCh chan<- interface{}
- var notification interface{}
- for {
- select {
- case nextCh <- notification:
- // Notification dispatched
- var ok bool
- notification, ok = p.pendingNotifications.ReadOne()
- if !ok { // Nothing to pop
- nextCh = nil // Disable this select case
- }
- case notificationToAdd, ok := <-p.addCh:
- if !ok {
- return
- }
- if notification == nil { // No notification to pop (and pendingNotifications is empty)
- // Optimize the case - skip adding to pendingNotifications
- notification = notificationToAdd
- nextCh = p.nextCh
- } else { // There is already a notification waiting to be dispatched
- p.pendingNotifications.WriteOne(notificationToAdd)
- }
- }
- }
- }
- func (p *processorListener) run() {
- // this call blocks until the channel is closed. When a panic happens during the notification
- // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
- // the next notification will be attempted. This is usually better than the alternative of never
- // delivering again.
- stopCh := make(chan struct{})
- wait.Until(func() {
- for next := range p.nextCh {
- switch notification := next.(type) {
- case updateNotification:
- p.handler.OnUpdate(notification.oldObj, notification.newObj)
- case addNotification:
- p.handler.OnAdd(notification.newObj, notification.isInInitialList)
- if notification.isInInitialList {
- p.syncTracker.Finished()
- }
- case deleteNotification:
- p.handler.OnDelete(notification.oldObj)
- default:
- utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
- }
- }
- // the only way to get here is if the p.nextCh is empty and closed
- close(stopCh)
- }, 1*time.Second, stopCh)
- }
- // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
- // this always returns false.
- func (p *processorListener) shouldResync(now time.Time) bool {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
- if p.resyncPeriod == 0 {
- return false
- }
- return now.After(p.nextResync) || now.Equal(p.nextResync)
- }
- func (p *processorListener) determineNextResync(now time.Time) {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
- p.nextResync = now.Add(p.resyncPeriod)
- }
- func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
- p.resyncLock.Lock()
- defer p.resyncLock.Unlock()
- p.resyncPeriod = resyncPeriod
- }
|