cache.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. /*
  2. Copyright 2022 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package cached provides a cache mechanism based on etags to lazily
  14. // build, and/or cache results from expensive operation such that those
  15. // operations are not repeated unnecessarily. The operations can be
  16. // created as a tree, and replaced dynamically as needed.
  17. //
  18. // All the operations in this module are thread-safe.
  19. //
  20. // # Dependencies and types of caches
  21. //
  22. // This package uses a source/transform/sink model of caches to build
  23. // the dependency tree, and can be used as follows:
  24. // - [Func]: A source cache that recomputes the content every time.
  25. // - [Once]: A source cache that always produces the
  26. // same content, it is only called once.
  27. // - [Transform]: A cache that transforms data from one format to
  28. // another. It's only refreshed when the source changes.
  29. // - [Merge]: A cache that aggregates multiple caches in a map into one.
  30. // It's only refreshed when the source changes.
  31. // - [MergeList]: A cache that aggregates multiple caches in a list into one.
  32. // It's only refreshed when the source changes.
  33. // - [Atomic]: A cache adapter that atomically replaces the source with a new one.
  34. // - [LastSuccess]: A cache adapter that caches the last successful and returns
  35. // it if the next call fails. It extends [Atomic].
  36. //
  37. // # Etags
  38. //
  39. // Etags in this library is a cache version identifier. It doesn't
  40. // necessarily strictly match to the semantics of http `etags`, but are
  41. // somewhat inspired from it and function with the same principles.
  42. // Hashing the content is a good way to guarantee that your function is
  43. // never going to be called spuriously. In Kubernetes world, this could
  44. // be a `resourceVersion`, this can be an actual etag, a hash, a UUID
  45. // (if the cache always changes), or even a made-up string when the
  46. // content of the cache never changes.
  47. package cached
  48. import (
  49. "fmt"
  50. "sync"
  51. "sync/atomic"
  52. )
  53. // Value is wrapping a value behind a getter for lazy evaluation.
  54. type Value[T any] interface {
  55. Get() (value T, etag string, err error)
  56. }
  57. // Result is wrapping T and error into a struct for cases where a tuple is more
  58. // convenient or necessary in Golang.
  59. type Result[T any] struct {
  60. Value T
  61. Etag string
  62. Err error
  63. }
  64. func (r Result[T]) Get() (T, string, error) {
  65. return r.Value, r.Etag, r.Err
  66. }
  67. // Func wraps a (thread-safe) function as a Value[T].
  68. func Func[T any](fn func() (T, string, error)) Value[T] {
  69. return valueFunc[T](fn)
  70. }
  71. type valueFunc[T any] func() (T, string, error)
  72. func (c valueFunc[T]) Get() (T, string, error) {
  73. return c()
  74. }
  75. // Static returns constant values.
  76. func Static[T any](value T, etag string) Value[T] {
  77. return Result[T]{Value: value, Etag: etag}
  78. }
  79. // Merge merges a of cached values. The merge function only gets called if any of
  80. // the dependency has changed.
  81. //
  82. // If any of the dependency returned an error before, or any of the
  83. // dependency returned an error this time, or if the mergeFn failed
  84. // before, then the function is run again.
  85. //
  86. // Note that this assumes there is no "partial" merge, the merge
  87. // function will remerge all the dependencies together everytime. Since
  88. // the list of dependencies is constant, there is no way to save some
  89. // partial merge information either.
  90. //
  91. // Also note that Golang map iteration is not stable. If the mergeFn
  92. // depends on the order iteration to be stable, it will need to
  93. // implement its own sorting or iteration order.
  94. func Merge[K comparable, T, V any](mergeFn func(results map[K]Result[T]) (V, string, error), caches map[K]Value[T]) Value[V] {
  95. list := make([]Value[T], 0, len(caches))
  96. // map from index to key
  97. indexes := make(map[int]K, len(caches))
  98. i := 0
  99. for k := range caches {
  100. list = append(list, caches[k])
  101. indexes[i] = k
  102. i++
  103. }
  104. return MergeList(func(results []Result[T]) (V, string, error) {
  105. if len(results) != len(indexes) {
  106. panic(fmt.Errorf("invalid result length %d, expected %d", len(results), len(indexes)))
  107. }
  108. m := make(map[K]Result[T], len(results))
  109. for i := range results {
  110. m[indexes[i]] = results[i]
  111. }
  112. return mergeFn(m)
  113. }, list)
  114. }
  115. // MergeList merges a list of cached values. The function only gets called if
  116. // any of the dependency has changed.
  117. //
  118. // The benefit of ListMerger over the basic Merger is that caches are
  119. // stored in an ordered list so the order of the cache will be
  120. // preserved in the order of the results passed to the mergeFn.
  121. //
  122. // If any of the dependency returned an error before, or any of the
  123. // dependency returned an error this time, or if the mergeFn failed
  124. // before, then the function is reran.
  125. //
  126. // Note that this assumes there is no "partial" merge, the merge
  127. // function will remerge all the dependencies together everytime. Since
  128. // the list of dependencies is constant, there is no way to save some
  129. // partial merge information either.
  130. func MergeList[T, V any](mergeFn func(results []Result[T]) (V, string, error), delegates []Value[T]) Value[V] {
  131. return &listMerger[T, V]{
  132. mergeFn: mergeFn,
  133. delegates: delegates,
  134. }
  135. }
  136. type listMerger[T, V any] struct {
  137. lock sync.Mutex
  138. mergeFn func([]Result[T]) (V, string, error)
  139. delegates []Value[T]
  140. cache []Result[T]
  141. result Result[V]
  142. }
  143. func (c *listMerger[T, V]) prepareResultsLocked() []Result[T] {
  144. cacheResults := make([]Result[T], len(c.delegates))
  145. ch := make(chan struct {
  146. int
  147. Result[T]
  148. }, len(c.delegates))
  149. for i := range c.delegates {
  150. go func(index int) {
  151. value, etag, err := c.delegates[index].Get()
  152. ch <- struct {
  153. int
  154. Result[T]
  155. }{index, Result[T]{Value: value, Etag: etag, Err: err}}
  156. }(i)
  157. }
  158. for i := 0; i < len(c.delegates); i++ {
  159. res := <-ch
  160. cacheResults[res.int] = res.Result
  161. }
  162. return cacheResults
  163. }
  164. func (c *listMerger[T, V]) needsRunningLocked(results []Result[T]) bool {
  165. if c.cache == nil {
  166. return true
  167. }
  168. if c.result.Err != nil {
  169. return true
  170. }
  171. if len(results) != len(c.cache) {
  172. panic(fmt.Errorf("invalid number of results: %v (expected %v)", len(results), len(c.cache)))
  173. }
  174. for i, oldResult := range c.cache {
  175. newResult := results[i]
  176. if newResult.Etag != oldResult.Etag || newResult.Err != nil || oldResult.Err != nil {
  177. return true
  178. }
  179. }
  180. return false
  181. }
  182. func (c *listMerger[T, V]) Get() (V, string, error) {
  183. c.lock.Lock()
  184. defer c.lock.Unlock()
  185. cacheResults := c.prepareResultsLocked()
  186. if c.needsRunningLocked(cacheResults) {
  187. c.cache = cacheResults
  188. c.result.Value, c.result.Etag, c.result.Err = c.mergeFn(c.cache)
  189. }
  190. return c.result.Value, c.result.Etag, c.result.Err
  191. }
  192. // Transform the result of another cached value. The transformFn will only be called
  193. // if the source has updated, otherwise, the result will be returned.
  194. //
  195. // If the dependency returned an error before, or it returns an error
  196. // this time, or if the transformerFn failed before, the function is
  197. // reran.
  198. func Transform[T, V any](transformerFn func(T, string, error) (V, string, error), source Value[T]) Value[V] {
  199. return MergeList(func(delegates []Result[T]) (V, string, error) {
  200. if len(delegates) != 1 {
  201. panic(fmt.Errorf("invalid cache for transformer cache: %v", delegates))
  202. }
  203. return transformerFn(delegates[0].Value, delegates[0].Etag, delegates[0].Err)
  204. }, []Value[T]{source})
  205. }
  206. // Once calls Value[T].Get() lazily and only once, even in case of an error result.
  207. func Once[T any](d Value[T]) Value[T] {
  208. return &once[T]{
  209. data: d,
  210. }
  211. }
  212. type once[T any] struct {
  213. once sync.Once
  214. data Value[T]
  215. result Result[T]
  216. }
  217. func (c *once[T]) Get() (T, string, error) {
  218. c.once.Do(func() {
  219. c.result.Value, c.result.Etag, c.result.Err = c.data.Get()
  220. })
  221. return c.result.Value, c.result.Etag, c.result.Err
  222. }
  223. // Replaceable extends the Value[T] interface with the ability to change the
  224. // underlying Value[T] after construction.
  225. type Replaceable[T any] interface {
  226. Value[T]
  227. Store(Value[T])
  228. }
  229. // Atomic wraps a Value[T] as an atomic value that can be replaced. It implements
  230. // Replaceable[T].
  231. type Atomic[T any] struct {
  232. value atomic.Pointer[Value[T]]
  233. }
  234. var _ Replaceable[[]byte] = &Atomic[[]byte]{}
  235. func (x *Atomic[T]) Store(val Value[T]) { x.value.Store(&val) }
  236. func (x *Atomic[T]) Get() (T, string, error) { return (*x.value.Load()).Get() }
  237. // LastSuccess calls Value[T].Get(), but hides errors by returning the last
  238. // success if there has been any.
  239. type LastSuccess[T any] struct {
  240. Atomic[T]
  241. success atomic.Pointer[Result[T]]
  242. }
  243. var _ Replaceable[[]byte] = &LastSuccess[[]byte]{}
  244. func (c *LastSuccess[T]) Get() (T, string, error) {
  245. success := c.success.Load()
  246. value, etag, err := c.Atomic.Get()
  247. if err == nil {
  248. if success == nil {
  249. c.success.CompareAndSwap(nil, &Result[T]{Value: value, Etag: etag, Err: err})
  250. }
  251. return value, etag, err
  252. }
  253. if success != nil {
  254. return success.Value, success.Etag, success.Err
  255. }
  256. return value, etag, err
  257. }