pager.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pager
  14. import (
  15. "context"
  16. "fmt"
  17. "k8s.io/apimachinery/pkg/api/errors"
  18. "k8s.io/apimachinery/pkg/api/meta"
  19. metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. )
  // Defaults that New applies to a ListPager: the per-request page size and
  // the number of pages to buffer.
  const (
  	defaultPageSize       = 500
  	defaultPageBufferSize = 10
  )
  26. // ListPageFunc returns a list object for the given list options.
  27. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
  28. // SimplePageFunc adapts a context-less list function into one that accepts a context.
  29. func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
  30. return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
  31. return fn(opts)
  32. }
  33. }
  34. // ListPager assists client code in breaking large list queries into multiple
  35. // smaller chunks of PageSize or smaller. PageFn is expected to accept a
  36. // metav1.ListOptions that supports paging and return a list. The pager does
  37. // not alter the field or label selectors on the initial options list.
  38. type ListPager struct {
  39. PageSize int64
  40. PageFn ListPageFunc
  41. FullListIfExpired bool
  42. // Number of pages to buffer
  43. PageBufferSize int32
  44. }
  45. // New creates a new pager from the provided pager function using the default
  46. // options. It will fall back to a full list if an expiration error is encountered
  47. // as a last resort.
  48. func New(fn ListPageFunc) *ListPager {
  49. return &ListPager{
  50. PageSize: defaultPageSize,
  51. PageFn: fn,
  52. FullListIfExpired: true,
  53. PageBufferSize: defaultPageBufferSize,
  54. }
  55. }
  56. // TODO: introduce other types of paging functions - such as those that retrieve from a list
  57. // of namespaces.
  58. // List returns a single list object, but attempts to retrieve smaller chunks from the
  59. // server to reduce the impact on the server. If the chunk attempt fails, it will load
  60. // the full list instead. The Limit field on options, if unset, will default to the page size.
  61. //
  62. // If items in the returned list are retained for different durations, and you want to avoid
  63. // retaining the whole slice returned by p.PageFn as long as any item is referenced,
  64. // use ListWithAlloc instead.
  65. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
  66. return p.list(ctx, options, false)
  67. }
  68. // ListWithAlloc works like List, but avoids retaining references to the items slice returned by p.PageFn.
  69. // It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
  70. //
  71. // If the items in the returned list are not retained, or are retained for the same duration, use List instead for memory efficiency.
  72. func (p *ListPager) ListWithAlloc(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
  73. return p.list(ctx, options, true)
  74. }
  75. func (p *ListPager) list(ctx context.Context, options metav1.ListOptions, allocNew bool) (runtime.Object, bool, error) {
  76. if options.Limit == 0 {
  77. options.Limit = p.PageSize
  78. }
  79. requestedResourceVersion := options.ResourceVersion
  80. requestedResourceVersionMatch := options.ResourceVersionMatch
  81. var list *metainternalversion.List
  82. paginatedResult := false
  83. for {
  84. select {
  85. case <-ctx.Done():
  86. return nil, paginatedResult, ctx.Err()
  87. default:
  88. }
  89. obj, err := p.PageFn(ctx, options)
  90. if err != nil {
  91. // Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
  92. // the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
  93. // failing when the resource versions is established by the first page request falls out of the compaction
  94. // during the subsequent list requests).
  95. if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
  96. return nil, paginatedResult, err
  97. }
  98. // the list expired while we were processing, fall back to a full list at
  99. // the requested ResourceVersion.
  100. options.Limit = 0
  101. options.Continue = ""
  102. options.ResourceVersion = requestedResourceVersion
  103. options.ResourceVersionMatch = requestedResourceVersionMatch
  104. result, err := p.PageFn(ctx, options)
  105. return result, paginatedResult, err
  106. }
  107. m, err := meta.ListAccessor(obj)
  108. if err != nil {
  109. return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
  110. }
  111. // exit early and return the object we got if we haven't processed any pages
  112. if len(m.GetContinue()) == 0 && list == nil {
  113. return obj, paginatedResult, nil
  114. }
  115. // initialize the list and fill its contents
  116. if list == nil {
  117. list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
  118. list.ResourceVersion = m.GetResourceVersion()
  119. list.SelfLink = m.GetSelfLink()
  120. }
  121. eachListItemFunc := meta.EachListItem
  122. if allocNew {
  123. eachListItemFunc = meta.EachListItemWithAlloc
  124. }
  125. if err := eachListItemFunc(obj, func(obj runtime.Object) error {
  126. list.Items = append(list.Items, obj)
  127. return nil
  128. }); err != nil {
  129. return nil, paginatedResult, err
  130. }
  131. // if we have no more items, return the list
  132. if len(m.GetContinue()) == 0 {
  133. return list, paginatedResult, nil
  134. }
  135. // set the next loop up
  136. options.Continue = m.GetContinue()
  137. // Clear the ResourceVersion(Match) on the subsequent List calls to avoid the
  138. // `specifying resource version is not allowed when using continue` error.
  139. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
  140. options.ResourceVersion = ""
  141. options.ResourceVersionMatch = ""
  142. // At this point, result is already paginated.
  143. paginatedResult = true
  144. }
  145. }
  146. // EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
  147. // fn returns an error, processing stops and that error is returned. If fn does not return an error,
  148. // any error encountered while retrieving the list from the server is returned. If the context
  149. // cancels or times out, the context error is returned. Since the list is retrieved in paginated
  150. // chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
  151. // requests exceed the expiration limit of the apiserver being called.
  152. //
  153. // Items are retrieved in chunks from the server to reduce the impact on the server with up to
  154. // ListPager.PageBufferSize chunks buffered concurrently in the background.
  155. //
  156. // If items passed to fn are retained for different durations, and you want to avoid
  157. // retaining the whole slice returned by p.PageFn as long as any item is referenced,
  158. // use EachListItemWithAlloc instead.
  159. func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  160. return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
  161. return meta.EachListItem(obj, fn)
  162. })
  163. }
  164. // EachListItemWithAlloc works like EachListItem, but avoids retaining references to the items slice returned by p.PageFn.
  165. // It does this by making a shallow copy of non-pointer items in the slice returned by p.PageFn.
  166. //
  167. // If the items passed to fn are not retained, or are retained for the same duration, use EachListItem instead for memory efficiency.
  168. func (p *ListPager) EachListItemWithAlloc(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  169. return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
  170. return meta.EachListItemWithAlloc(obj, fn)
  171. })
  172. }
  173. // eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
  174. // each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
  175. // not return an error, any error encountered while retrieving the list from the server is
  176. // returned. If the context cancels or times out, the context error is returned. Since the list is
  177. // retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
  178. // the pagination list requests exceed the expiration limit of the apiserver being called.
  179. //
  180. // Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
  181. func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  182. if p.PageBufferSize < 0 {
  183. return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
  184. }
  185. // Ensure background goroutine is stopped if this call exits before all list items are
  186. // processed. Cancelation error from this deferred cancel call is never returned to caller;
  187. // either the list result has already been sent to bgResultC or the fn error is returned and
  188. // the cancelation error is discarded.
  189. ctx, cancel := context.WithCancel(ctx)
  190. defer cancel()
  191. chunkC := make(chan runtime.Object, p.PageBufferSize)
  192. bgResultC := make(chan error, 1)
  193. go func() {
  194. defer utilruntime.HandleCrash()
  195. var err error
  196. defer func() {
  197. close(chunkC)
  198. bgResultC <- err
  199. }()
  200. err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
  201. select {
  202. case chunkC <- chunk: // buffer the chunk, this can block
  203. case <-ctx.Done():
  204. return ctx.Err()
  205. }
  206. return nil
  207. })
  208. }()
  209. for o := range chunkC {
  210. select {
  211. case <-ctx.Done():
  212. return ctx.Err()
  213. default:
  214. }
  215. err := fn(o)
  216. if err != nil {
  217. return err // any fn error should be returned immediately
  218. }
  219. }
  220. // promote the results of our background goroutine to the foreground
  221. return <-bgResultC
  222. }
  223. // eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
  224. // chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
  225. // an error, any error encountered while retrieving the list from the server is returned. If the
  226. // context cancels or times out, the context error is returned. Since the list is retrieved in
  227. // paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
  228. // pagination list requests exceed the expiration limit of the apiserver being called.
  229. func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  230. if options.Limit == 0 {
  231. options.Limit = p.PageSize
  232. }
  233. for {
  234. select {
  235. case <-ctx.Done():
  236. return ctx.Err()
  237. default:
  238. }
  239. obj, err := p.PageFn(ctx, options)
  240. if err != nil {
  241. return err
  242. }
  243. m, err := meta.ListAccessor(obj)
  244. if err != nil {
  245. return fmt.Errorf("returned object must be a list: %v", err)
  246. }
  247. if err := fn(obj); err != nil {
  248. return err
  249. }
  250. // if we have no more items, return.
  251. if len(m.GetContinue()) == 0 {
  252. return nil
  253. }
  254. // set the next loop up
  255. options.Continue = m.GetContinue()
  256. }
  257. }