reflector.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "math/rand"
  20. "os"
  21. "reflect"
  22. "strings"
  23. "sync"
  24. "time"
  25. apierrors "k8s.io/apimachinery/pkg/api/errors"
  26. "k8s.io/apimachinery/pkg/api/meta"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  29. "k8s.io/apimachinery/pkg/runtime"
  30. "k8s.io/apimachinery/pkg/runtime/schema"
  31. "k8s.io/apimachinery/pkg/util/naming"
  32. utilnet "k8s.io/apimachinery/pkg/util/net"
  33. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  34. "k8s.io/apimachinery/pkg/util/wait"
  35. "k8s.io/apimachinery/pkg/watch"
  36. "k8s.io/client-go/tools/pager"
  37. "k8s.io/klog/v2"
  38. "k8s.io/utils/clock"
  39. "k8s.io/utils/pointer"
  40. "k8s.io/utils/trace"
  41. )
// defaultExpectedTypeName is the type description reported for a reflector
// that was created without an expected type (expectedType == nil).
const defaultExpectedTypeName = "<unspecified>"
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string
	// typeDescription is the name of the type we expect to place in the store.
	// The name will be the stringification of expectedGVK if provided, and the
	// stringification of expectedType otherwise. It is for display
	// only, and should not be used for parsing or comparison.
	typeDescription string
	// expectedType is the reflect.Type of an example object of the type we
	// expect to place in the store.
	// Only the type needs to be right, except that when that is
	// `unstructured.Unstructured` the object's `"apiVersion"` and
	// `"kind"` must also be right.
	expectedType reflect.Type
	// expectedGVK is the GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// store is the destination to sync up with the watch source.
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// backoffManager manages backoff of ListWatch.
	backoffManager wait.BackoffManager
	// resyncPeriod is the interval of the resync timer; zero disables the
	// timer altogether (see resyncChan).
	resyncPeriod time.Duration
	// clock allows tests to manipulate time.
	clock clock.Clock
	// paginatedResult defines whether pagination should be forced for list calls.
	// It is set based on the result of the initial list call.
	paginatedResult bool
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store.
	// It is thread safe, but not synchronized with the underlying store.
	lastSyncResourceVersion string
	// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
	// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion.
	lastSyncResourceVersionMutex sync.RWMutex
	// watchErrorHandler is called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked.
	// A nil ShouldResync is treated as always returning `true`.
	ShouldResync func() bool
	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
	MaxInternalErrorRetryDuration time.Duration
	// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
	// Streaming has the primary advantage of using fewer server's resources to fetch data.
	//
	// The old behaviour establishes a LIST request which gets data in chunks.
	// Paginated list is less efficient and depending on the actual size of objects
	// might result in an increased memory consumption of the APIServer.
	//
	// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
	UseWatchList bool
}
// ResourceVersionUpdater is an interface that allows a store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time the current resource version
	// of the reflector is updated; resourceVersion is the new value.
	UpdateResourceVersion(resourceVersion string)
}
// WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation (DefaultWatchErrorHandler) looks at the error
// type and tries to log the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
//
// r is the reflector whose ListAndWatch failed and err is the error it returned.
type WatchErrorHandler func(r *Reflector, err error)
  123. // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
  124. func DefaultWatchErrorHandler(r *Reflector, err error) {
  125. switch {
  126. case isExpiredError(err):
  127. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  128. // has a semantic that it returns data at least as fresh as provided RV.
  129. // So first try to LIST with setting RV to resource version of last observed object.
  130. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
  131. case err == io.EOF:
  132. // watch closed normally
  133. case err == io.ErrUnexpectedEOF:
  134. klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
  135. default:
  136. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
  137. }
  138. }
var (
	// We try to spread the load on apiserver by setting timeouts for
	// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout).
	// The value is scaled by (rand.Float64() + 1.0) at every watch request.
	minWatchTimeout = 5 * time.Minute
)
  144. // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
  145. // The indexer is configured to key on namespace
  146. func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
  147. indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
  148. reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
  149. return indexer, reflector
  150. }
  151. // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
  152. // that is outside this package. See NewReflectorWithOptions for further information.
  153. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  154. return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
  155. }
  156. // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
  157. // information.
  158. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  159. return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
  160. }
// ReflectorOptions configures a Reflector. Every field is optional; the zero
// value of each field selects the default described on it.
type ReflectorOptions struct {
	// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
	// in the call stack that is outside this package.
	Name string
	// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
	// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
	// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
	// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
	// go type of expectedType.
	TypeDescription string
	// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
	// (do not resync).
	ResyncPeriod time.Duration
	// Clock allows tests to control time. If unset defaults to clock.RealClock{}.
	Clock clock.Clock
}
  178. // NewReflectorWithOptions creates a new Reflector object which will keep the
  179. // given store up to date with the server's contents for the given
  180. // resource. Reflector promises to only put things in the store that
  181. // have the type of expectedType, unless expectedType is nil. If
  182. // resyncPeriod is non-zero, then the reflector will periodically
  183. // consult its ShouldResync function to determine whether to invoke
  184. // the Store's Resync operation; `ShouldResync==nil` means always
  185. // "yes". This enables you to use reflectors to periodically process
  186. // everything as well as incrementally processing the things that
  187. // change.
  188. func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
  189. reflectorClock := options.Clock
  190. if reflectorClock == nil {
  191. reflectorClock = clock.RealClock{}
  192. }
  193. r := &Reflector{
  194. name: options.Name,
  195. resyncPeriod: options.ResyncPeriod,
  196. typeDescription: options.TypeDescription,
  197. listerWatcher: lw,
  198. store: store,
  199. // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
  200. // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
  201. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
  202. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
  203. clock: reflectorClock,
  204. watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
  205. expectedType: reflect.TypeOf(expectedType),
  206. }
  207. if r.name == "" {
  208. r.name = naming.GetNameFromCallsite(internalPackages...)
  209. }
  210. if r.typeDescription == "" {
  211. r.typeDescription = getTypeDescriptionFromObject(expectedType)
  212. }
  213. if r.expectedGVK == nil {
  214. r.expectedGVK = getExpectedGVKFromObject(expectedType)
  215. }
  216. if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
  217. r.UseWatchList = true
  218. }
  219. return r
  220. }
  221. func getTypeDescriptionFromObject(expectedType interface{}) string {
  222. if expectedType == nil {
  223. return defaultExpectedTypeName
  224. }
  225. reflectDescription := reflect.TypeOf(expectedType).String()
  226. obj, ok := expectedType.(*unstructured.Unstructured)
  227. if !ok {
  228. return reflectDescription
  229. }
  230. gvk := obj.GroupVersionKind()
  231. if gvk.Empty() {
  232. return reflectDescription
  233. }
  234. return gvk.String()
  235. }
  236. func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
  237. obj, ok := expectedType.(*unstructured.Unstructured)
  238. if !ok {
  239. return nil
  240. }
  241. gvk := obj.GroupVersionKind()
  242. if gvk.Empty() {
  243. return nil
  244. }
  245. return &gvk
  246. }
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors.
var internalPackages = []string{"client-go/tools/cache/"}
  250. // Run repeatedly uses the reflector's ListAndWatch to fetch all the
  251. // objects and subsequent deltas.
  252. // Run will exit when stopCh is closed.
  253. func (r *Reflector) Run(stopCh <-chan struct{}) {
  254. klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
  255. wait.BackoffUntil(func() {
  256. if err := r.ListAndWatch(stopCh); err != nil {
  257. r.watchErrorHandler(r, err)
  258. }
  259. }, r.backoffManager, true, stopCh)
  260. klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
  261. }
var (
	// neverExitWatch is handed out by resyncChan when resync is disabled:
	// nothing will ever be sent down this channel.
	neverExitWatch <-chan time.Time = make(chan time.Time)
	// errorStopRequested is used to indicate that watching stopped because of
	// a signal from the stop channel passed in from a client of the reflector.
	errorStopRequested = errors.New("stop requested")
)
  269. // resyncChan returns a channel which will receive something when a resync is
  270. // required, and a cleanup function.
  271. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  272. if r.resyncPeriod == 0 {
  273. return neverExitWatch, func() bool { return false }
  274. }
  275. // The cleanup function is required: imagine the scenario where watches
  276. // always fail so we end up listing frequently. Then, if we don't
  277. // manually stop the timer, we could end up with many timers active
  278. // concurrently.
  279. t := r.clock.NewTimer(r.resyncPeriod)
  280. return t.C(), t.Stop
  281. }
  282. // ListAndWatch first lists all items and get the resource version at the moment of call,
  283. // and then use the resource version to watch.
  284. // It returns error if ListAndWatch didn't even try to initialize watch.
  285. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  286. klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
  287. var err error
  288. var w watch.Interface
  289. fallbackToList := !r.UseWatchList
  290. if r.UseWatchList {
  291. w, err = r.watchList(stopCh)
  292. if w == nil && err == nil {
  293. // stopCh was closed
  294. return nil
  295. }
  296. if err != nil {
  297. if !apierrors.IsInvalid(err) {
  298. return err
  299. }
  300. klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
  301. fallbackToList = true
  302. // Ensure that we won't accidentally pass some garbage down the watch.
  303. w = nil
  304. }
  305. }
  306. if fallbackToList {
  307. err = r.list(stopCh)
  308. if err != nil {
  309. return err
  310. }
  311. }
  312. resyncerrc := make(chan error, 1)
  313. cancelCh := make(chan struct{})
  314. defer close(cancelCh)
  315. go r.startResync(stopCh, cancelCh, resyncerrc)
  316. return r.watch(w, stopCh, resyncerrc)
  317. }
  318. // startResync periodically calls r.store.Resync() method.
  319. // Note that this method is blocking and should be
  320. // called in a separate goroutine.
  321. func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
  322. resyncCh, cleanup := r.resyncChan()
  323. defer func() {
  324. cleanup() // Call the last one written into cleanup
  325. }()
  326. for {
  327. select {
  328. case <-resyncCh:
  329. case <-stopCh:
  330. return
  331. case <-cancelCh:
  332. return
  333. }
  334. if r.ShouldResync == nil || r.ShouldResync() {
  335. klog.V(4).Infof("%s: forcing resync", r.name)
  336. if err := r.store.Resync(); err != nil {
  337. resyncerrc <- err
  338. return
  339. }
  340. }
  341. cleanup()
  342. resyncCh, cleanup = r.resyncChan()
  343. }
  344. }
  345. // watch simply starts a watch request with the server.
  346. func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
  347. var err error
  348. retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
  349. for {
  350. // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
  351. select {
  352. case <-stopCh:
  353. return nil
  354. default:
  355. }
  356. // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
  357. start := r.clock.Now()
  358. if w == nil {
  359. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  360. options := metav1.ListOptions{
  361. ResourceVersion: r.LastSyncResourceVersion(),
  362. // We want to avoid situations of hanging watchers. Stop any watchers that do not
  363. // receive any events within the timeout window.
  364. TimeoutSeconds: &timeoutSeconds,
  365. // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
  366. // Reflector doesn't assume bookmarks are returned at all (if the server do not support
  367. // watch bookmarks, it will ignore this field).
  368. AllowWatchBookmarks: true,
  369. }
  370. w, err = r.listerWatcher.Watch(options)
  371. if err != nil {
  372. if canRetry := isWatchErrorRetriable(err); canRetry {
  373. klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
  374. select {
  375. case <-stopCh:
  376. return nil
  377. case <-r.backoffManager.Backoff().C():
  378. continue
  379. }
  380. }
  381. return err
  382. }
  383. }
  384. err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
  385. // Ensure that watch will not be reused across iterations.
  386. w.Stop()
  387. w = nil
  388. retry.After(err)
  389. if err != nil {
  390. if err != errorStopRequested {
  391. switch {
  392. case isExpiredError(err):
  393. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  394. // has a semantic that it returns data at least as fresh as provided RV.
  395. // So first try to LIST with setting RV to resource version of last observed object.
  396. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
  397. case apierrors.IsTooManyRequests(err):
  398. klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
  399. select {
  400. case <-stopCh:
  401. return nil
  402. case <-r.backoffManager.Backoff().C():
  403. continue
  404. }
  405. case apierrors.IsInternalError(err) && retry.ShouldRetry():
  406. klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
  407. continue
  408. default:
  409. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
  410. }
  411. }
  412. return nil
  413. }
  414. }
  415. }
  416. // list simply lists all items and records a resource version obtained from the server at the moment of the call.
  417. // the resource version can be used for further progress notification (aka. watch).
  418. func (r *Reflector) list(stopCh <-chan struct{}) error {
  419. var resourceVersion string
  420. options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
  421. initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
  422. defer initTrace.LogIfLong(10 * time.Second)
  423. var list runtime.Object
  424. var paginatedResult bool
  425. var err error
  426. listCh := make(chan struct{}, 1)
  427. panicCh := make(chan interface{}, 1)
  428. go func() {
  429. defer func() {
  430. if r := recover(); r != nil {
  431. panicCh <- r
  432. }
  433. }()
  434. // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
  435. // list request will return the full response.
  436. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
  437. return r.listerWatcher.List(opts)
  438. }))
  439. switch {
  440. case r.WatchListPageSize != 0:
  441. pager.PageSize = r.WatchListPageSize
  442. case r.paginatedResult:
  443. // We got a paginated result initially. Assume this resource and server honor
  444. // paging requests (i.e. watch cache is probably disabled) and leave the default
  445. // pager size set.
  446. case options.ResourceVersion != "" && options.ResourceVersion != "0":
  447. // User didn't explicitly request pagination.
  448. //
  449. // With ResourceVersion != "", we have a possibility to list from watch cache,
  450. // but we do that (for ResourceVersion != "0") only if Limit is unset.
  451. // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
  452. // switch off pagination to force listing from watch cache (if enabled).
  453. // With the existing semantic of RV (result is at least as fresh as provided RV),
  454. // this is correct and doesn't lead to going back in time.
  455. //
  456. // We also don't turn off pagination for ResourceVersion="0", since watch cache
  457. // is ignoring Limit in that case anyway, and if watch cache is not enabled
  458. // we don't introduce regression.
  459. pager.PageSize = 0
  460. }
  461. list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
  462. if isExpiredError(err) || isTooLargeResourceVersionError(err) {
  463. r.setIsLastSyncResourceVersionUnavailable(true)
  464. // Retry immediately if the resource version used to list is unavailable.
  465. // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
  466. // continuation pages, but the pager might not be enabled, the full list might fail because the
  467. // resource version it is listing at is expired or the cache may not yet be synced to the provided
  468. // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
  469. // the reflector makes forward progress.
  470. list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
  471. }
  472. close(listCh)
  473. }()
  474. select {
  475. case <-stopCh:
  476. return nil
  477. case r := <-panicCh:
  478. panic(r)
  479. case <-listCh:
  480. }
  481. initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
  482. if err != nil {
  483. klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
  484. return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
  485. }
  486. // We check if the list was paginated and if so set the paginatedResult based on that.
  487. // However, we want to do that only for the initial list (which is the only case
  488. // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
  489. // situations we may force listing directly from etcd (by setting ResourceVersion="")
  490. // which will return paginated result, even if watch cache is enabled. However, in
  491. // that case, we still want to prefer sending requests to watch cache if possible.
  492. //
  493. // Paginated result returned for request with ResourceVersion="0" mean that watch
  494. // cache is disabled and there are a lot of objects of a given type. In such case,
  495. // there is no need to prefer listing from watch cache.
  496. if options.ResourceVersion == "0" && paginatedResult {
  497. r.paginatedResult = true
  498. }
  499. r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
  500. listMetaInterface, err := meta.ListAccessor(list)
  501. if err != nil {
  502. return fmt.Errorf("unable to understand list result %#v: %v", list, err)
  503. }
  504. resourceVersion = listMetaInterface.GetResourceVersion()
  505. initTrace.Step("Resource version extracted")
  506. items, err := meta.ExtractListWithAlloc(list)
  507. if err != nil {
  508. return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
  509. }
  510. initTrace.Step("Objects extracted")
  511. if err := r.syncWith(items, resourceVersion); err != nil {
  512. return fmt.Errorf("unable to sync list result: %v", err)
  513. }
  514. initTrace.Step("SyncWith done")
  515. r.setLastSyncResourceVersion(resourceVersion)
  516. initTrace.Step("Resource version updated")
  517. return nil
  518. }
  519. // watchList establishes a stream to get a consistent snapshot of data
  520. // from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
  521. //
  522. // case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
  523. // Establishes a consistent stream with the server.
  524. // That means the returned data is consistent, as if, served directly from etcd via a quorum read.
  525. // It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
  526. // It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
  527. // After receiving a "Bookmark" event the reflector is considered to be synchronized.
  528. // It replaces its internal store with the collected items and
  529. // reuses the current watch requests for getting further events.
  530. //
  531. // case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
  532. // Establishes a stream with the server at the provided resource version.
  533. // To establish the initial state the server begins with synthetic "Added" events.
  534. // It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
  535. // After receiving a "Bookmark" event the reflector is considered to be synchronized.
  536. // It replaces its internal store with the collected items and
  537. // reuses the current watch requests for getting further events.
  538. func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
  539. var w watch.Interface
  540. var err error
  541. var temporaryStore Store
  542. var resourceVersion string
  543. // TODO(#115478): see if this function could be turned
  544. // into a method and see if error handling
  545. // could be unified with the r.watch method
  546. isErrorRetriableWithSideEffectsFn := func(err error) bool {
  547. if canRetry := isWatchErrorRetriable(err); canRetry {
  548. klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
  549. <-r.backoffManager.Backoff().C()
  550. return true
  551. }
  552. if isExpiredError(err) || isTooLargeResourceVersionError(err) {
  553. // we tried to re-establish a watch request but the provided RV
  554. // has either expired or it is greater than the server knows about.
  555. // In that case we reset the RV and
  556. // try to get a consistent snapshot from the watch cache (case 1)
  557. r.setIsLastSyncResourceVersionUnavailable(true)
  558. return true
  559. }
  560. return false
  561. }
  562. initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
  563. defer initTrace.LogIfLong(10 * time.Second)
  564. for {
  565. select {
  566. case <-stopCh:
  567. return nil, nil
  568. default:
  569. }
  570. resourceVersion = ""
  571. lastKnownRV := r.rewatchResourceVersion()
  572. temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
  573. // TODO(#115478): large "list", slow clients, slow network, p&f
  574. // might slow down streaming and eventually fail.
  575. // maybe in such a case we should retry with an increased timeout?
  576. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  577. options := metav1.ListOptions{
  578. ResourceVersion: lastKnownRV,
  579. AllowWatchBookmarks: true,
  580. SendInitialEvents: pointer.Bool(true),
  581. ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
  582. TimeoutSeconds: &timeoutSeconds,
  583. }
  584. start := r.clock.Now()
  585. w, err = r.listerWatcher.Watch(options)
  586. if err != nil {
  587. if isErrorRetriableWithSideEffectsFn(err) {
  588. continue
  589. }
  590. return nil, err
  591. }
  592. bookmarkReceived := pointer.Bool(false)
  593. err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
  594. func(rv string) { resourceVersion = rv },
  595. bookmarkReceived,
  596. r.clock, make(chan error), stopCh)
  597. if err != nil {
  598. w.Stop() // stop and retry with clean state
  599. if err == errorStopRequested {
  600. return nil, nil
  601. }
  602. if isErrorRetriableWithSideEffectsFn(err) {
  603. continue
  604. }
  605. return nil, err
  606. }
  607. if *bookmarkReceived {
  608. break
  609. }
  610. }
  611. // We successfully got initial state from watch-list confirmed by the
  612. // "k8s.io/initial-events-end" bookmark.
  613. initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
  614. r.setIsLastSyncResourceVersionUnavailable(false)
  615. if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
  616. return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
  617. }
  618. initTrace.Step("SyncWith done")
  619. r.setLastSyncResourceVersion(resourceVersion)
  620. return w, nil
  621. }
  622. // syncWith replaces the store's items with the given list.
  623. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  624. found := make([]interface{}, 0, len(items))
  625. for _, item := range items {
  626. found = append(found, item)
  627. }
  628. return r.store.Replace(found, resourceVersion)
  629. }
  630. // watchHandler watches w and sets setLastSyncResourceVersion
  631. func watchHandler(start time.Time,
  632. w watch.Interface,
  633. store Store,
  634. expectedType reflect.Type,
  635. expectedGVK *schema.GroupVersionKind,
  636. name string,
  637. expectedTypeName string,
  638. setLastSyncResourceVersion func(string),
  639. exitOnInitialEventsEndBookmark *bool,
  640. clock clock.Clock,
  641. errc chan error,
  642. stopCh <-chan struct{},
  643. ) error {
  644. eventCount := 0
  645. if exitOnInitialEventsEndBookmark != nil {
  646. // set it to false just in case somebody
  647. // made it positive
  648. *exitOnInitialEventsEndBookmark = false
  649. }
  650. loop:
  651. for {
  652. select {
  653. case <-stopCh:
  654. return errorStopRequested
  655. case err := <-errc:
  656. return err
  657. case event, ok := <-w.ResultChan():
  658. if !ok {
  659. break loop
  660. }
  661. if event.Type == watch.Error {
  662. return apierrors.FromObject(event.Object)
  663. }
  664. if expectedType != nil {
  665. if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
  666. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
  667. continue
  668. }
  669. }
  670. if expectedGVK != nil {
  671. if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
  672. utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
  673. continue
  674. }
  675. }
  676. meta, err := meta.Accessor(event.Object)
  677. if err != nil {
  678. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
  679. continue
  680. }
  681. resourceVersion := meta.GetResourceVersion()
  682. switch event.Type {
  683. case watch.Added:
  684. err := store.Add(event.Object)
  685. if err != nil {
  686. utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
  687. }
  688. case watch.Modified:
  689. err := store.Update(event.Object)
  690. if err != nil {
  691. utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
  692. }
  693. case watch.Deleted:
  694. // TODO: Will any consumers need access to the "last known
  695. // state", which is passed in event.Object? If so, may need
  696. // to change this.
  697. err := store.Delete(event.Object)
  698. if err != nil {
  699. utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
  700. }
  701. case watch.Bookmark:
  702. // A `Bookmark` means watch has synced here, just update the resourceVersion
  703. if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
  704. if exitOnInitialEventsEndBookmark != nil {
  705. *exitOnInitialEventsEndBookmark = true
  706. }
  707. }
  708. default:
  709. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
  710. }
  711. setLastSyncResourceVersion(resourceVersion)
  712. if rvu, ok := store.(ResourceVersionUpdater); ok {
  713. rvu.UpdateResourceVersion(resourceVersion)
  714. }
  715. eventCount++
  716. if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
  717. watchDuration := clock.Since(start)
  718. klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
  719. return nil
  720. }
  721. }
  722. }
  723. watchDuration := clock.Since(start)
  724. if watchDuration < 1*time.Second && eventCount == 0 {
  725. return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
  726. }
  727. klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
  728. return nil
  729. }
  730. // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
  731. // The value returned is not synchronized with access to the underlying store and is not thread-safe
  732. func (r *Reflector) LastSyncResourceVersion() string {
  733. r.lastSyncResourceVersionMutex.RLock()
  734. defer r.lastSyncResourceVersionMutex.RUnlock()
  735. return r.lastSyncResourceVersion
  736. }
  737. func (r *Reflector) setLastSyncResourceVersion(v string) {
  738. r.lastSyncResourceVersionMutex.Lock()
  739. defer r.lastSyncResourceVersionMutex.Unlock()
  740. r.lastSyncResourceVersion = v
  741. }
  742. // relistResourceVersion determines the resource version the reflector should list or relist from.
  743. // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
  744. // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
  745. // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
  746. // etcd via a quorum read.
  747. func (r *Reflector) relistResourceVersion() string {
  748. r.lastSyncResourceVersionMutex.RLock()
  749. defer r.lastSyncResourceVersionMutex.RUnlock()
  750. if r.isLastSyncResourceVersionUnavailable {
  751. // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
  752. // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
  753. // to the latest available ResourceVersion, using a consistent read from etcd.
  754. return ""
  755. }
  756. if r.lastSyncResourceVersion == "" {
  757. // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
  758. // be served from the watch cache if it is enabled.
  759. return "0"
  760. }
  761. return r.lastSyncResourceVersion
  762. }
  763. // rewatchResourceVersion determines the resource version the reflector should start streaming from.
  764. func (r *Reflector) rewatchResourceVersion() string {
  765. r.lastSyncResourceVersionMutex.RLock()
  766. defer r.lastSyncResourceVersionMutex.RUnlock()
  767. if r.isLastSyncResourceVersionUnavailable {
  768. // initial stream should return data at the most recent resource version.
  769. // the returned data must be consistent i.e. as if served from etcd via a quorum read
  770. return ""
  771. }
  772. return r.lastSyncResourceVersion
  773. }
  774. // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
  775. // "expired" or "too large resource version" error.
  776. func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
  777. r.lastSyncResourceVersionMutex.Lock()
  778. defer r.lastSyncResourceVersionMutex.Unlock()
  779. r.isLastSyncResourceVersionUnavailable = isUnavailable
  780. }
  781. func isExpiredError(err error) bool {
  782. // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
  783. // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
  784. // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
  785. // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
  786. return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
  787. }
  788. func isTooLargeResourceVersionError(err error) bool {
  789. if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
  790. return true
  791. }
  792. // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
  793. // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
  794. // version is larger than the largest currently available resource version. To ensure backward
  795. // compatibility with these server versions we also need to detect the error based on the content
  796. // of the error message field.
  797. if !apierrors.IsTimeout(err) {
  798. return false
  799. }
  800. apierr, ok := err.(apierrors.APIStatus)
  801. if !ok || apierr == nil || apierr.Status().Details == nil {
  802. return false
  803. }
  804. for _, cause := range apierr.Status().Details.Causes {
  805. // Matches the message returned by api server 1.17.0-1.18.5 for this error condition
  806. if cause.Message == "Too large resource version" {
  807. return true
  808. }
  809. }
  810. // Matches the message returned by api server before 1.17.0
  811. if strings.Contains(apierr.Status().Message, "Too large resource version") {
  812. return true
  813. }
  814. return false
  815. }
  816. // isWatchErrorRetriable determines if it is safe to retry
  817. // a watch error retrieved from the server.
  818. func isWatchErrorRetriable(err error) bool {
  819. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
  820. // It doesn't make sense to re-list all objects because most likely we will be able to restart
  821. // watch where we ended.
  822. // If that's the case begin exponentially backing off and resend watch request.
  823. // Do the same for "429" errors.
  824. if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
  825. return true
  826. }
  827. return false
  828. }