watch.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package watch
  14. import (
  15. "fmt"
  16. "sync"
  17. "k8s.io/klog/v2"
  18. "k8s.io/apimachinery/pkg/runtime"
  19. )
  20. // Interface can be implemented by anything that knows how to watch and report changes.
  21. type Interface interface {
  22. // Stop stops watching. Will close the channel returned by ResultChan(). Releases
  23. // any resources used by the watch.
  24. Stop()
  25. // ResultChan returns a chan which will receive all the events. If an error occurs
  26. // or Stop() is called, the implementation will close this channel and
  27. // release any resources used by the watch.
  28. ResultChan() <-chan Event
  29. }
  30. // EventType defines the possible types of events.
  31. type EventType string
  32. const (
  33. Added EventType = "ADDED"
  34. Modified EventType = "MODIFIED"
  35. Deleted EventType = "DELETED"
  36. Bookmark EventType = "BOOKMARK"
  37. Error EventType = "ERROR"
  38. )
  39. var (
  40. DefaultChanSize int32 = 100
  41. )
  42. // Event represents a single event to a watched resource.
  43. // +k8s:deepcopy-gen=true
  44. type Event struct {
  45. Type EventType
  46. // Object is:
  47. // * If Type is Added or Modified: the new state of the object.
  48. // * If Type is Deleted: the state of the object immediately before deletion.
  49. // * If Type is Bookmark: the object (instance of a type being watched) where
  50. // only ResourceVersion field is set. On successful restart of watch from a
  51. // bookmark resourceVersion, client is guaranteed to not get repeat event
  52. // nor miss any events.
  53. // * If Type is Error: *api.Status is recommended; other types may make sense
  54. // depending on context.
  55. Object runtime.Object
  56. }
  57. type emptyWatch chan Event
  58. // NewEmptyWatch returns a watch interface that returns no results and is closed.
  59. // May be used in certain error conditions where no information is available but
  60. // an error is not warranted.
  61. func NewEmptyWatch() Interface {
  62. ch := make(chan Event)
  63. close(ch)
  64. return emptyWatch(ch)
  65. }
  66. // Stop implements Interface
  67. func (w emptyWatch) Stop() {
  68. }
  69. // ResultChan implements Interface
  70. func (w emptyWatch) ResultChan() <-chan Event {
  71. return chan Event(w)
  72. }
  73. // FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  74. type FakeWatcher struct {
  75. result chan Event
  76. stopped bool
  77. sync.Mutex
  78. }
  79. func NewFake() *FakeWatcher {
  80. return &FakeWatcher{
  81. result: make(chan Event),
  82. }
  83. }
  84. func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
  85. return &FakeWatcher{
  86. result: make(chan Event, size),
  87. }
  88. }
  89. // Stop implements Interface.Stop().
  90. func (f *FakeWatcher) Stop() {
  91. f.Lock()
  92. defer f.Unlock()
  93. if !f.stopped {
  94. klog.V(4).Infof("Stopping fake watcher.")
  95. close(f.result)
  96. f.stopped = true
  97. }
  98. }
  99. func (f *FakeWatcher) IsStopped() bool {
  100. f.Lock()
  101. defer f.Unlock()
  102. return f.stopped
  103. }
  104. // Reset prepares the watcher to be reused.
  105. func (f *FakeWatcher) Reset() {
  106. f.Lock()
  107. defer f.Unlock()
  108. f.stopped = false
  109. f.result = make(chan Event)
  110. }
  111. func (f *FakeWatcher) ResultChan() <-chan Event {
  112. return f.result
  113. }
  114. // Add sends an add event.
  115. func (f *FakeWatcher) Add(obj runtime.Object) {
  116. f.result <- Event{Added, obj}
  117. }
  118. // Modify sends a modify event.
  119. func (f *FakeWatcher) Modify(obj runtime.Object) {
  120. f.result <- Event{Modified, obj}
  121. }
  122. // Delete sends a delete event.
  123. func (f *FakeWatcher) Delete(lastValue runtime.Object) {
  124. f.result <- Event{Deleted, lastValue}
  125. }
  126. // Error sends an Error event.
  127. func (f *FakeWatcher) Error(errValue runtime.Object) {
  128. f.result <- Event{Error, errValue}
  129. }
  130. // Action sends an event of the requested type, for table-based testing.
  131. func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
  132. f.result <- Event{action, obj}
  133. }
  134. // RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
  135. type RaceFreeFakeWatcher struct {
  136. result chan Event
  137. Stopped bool
  138. sync.Mutex
  139. }
  140. func NewRaceFreeFake() *RaceFreeFakeWatcher {
  141. return &RaceFreeFakeWatcher{
  142. result: make(chan Event, DefaultChanSize),
  143. }
  144. }
  145. // Stop implements Interface.Stop().
  146. func (f *RaceFreeFakeWatcher) Stop() {
  147. f.Lock()
  148. defer f.Unlock()
  149. if !f.Stopped {
  150. klog.V(4).Infof("Stopping fake watcher.")
  151. close(f.result)
  152. f.Stopped = true
  153. }
  154. }
  155. func (f *RaceFreeFakeWatcher) IsStopped() bool {
  156. f.Lock()
  157. defer f.Unlock()
  158. return f.Stopped
  159. }
  160. // Reset prepares the watcher to be reused.
  161. func (f *RaceFreeFakeWatcher) Reset() {
  162. f.Lock()
  163. defer f.Unlock()
  164. f.Stopped = false
  165. f.result = make(chan Event, DefaultChanSize)
  166. }
  167. func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
  168. f.Lock()
  169. defer f.Unlock()
  170. return f.result
  171. }
  172. // Add sends an add event.
  173. func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
  174. f.Lock()
  175. defer f.Unlock()
  176. if !f.Stopped {
  177. select {
  178. case f.result <- Event{Added, obj}:
  179. return
  180. default:
  181. panic(fmt.Errorf("channel full"))
  182. }
  183. }
  184. }
  185. // Modify sends a modify event.
  186. func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
  187. f.Lock()
  188. defer f.Unlock()
  189. if !f.Stopped {
  190. select {
  191. case f.result <- Event{Modified, obj}:
  192. return
  193. default:
  194. panic(fmt.Errorf("channel full"))
  195. }
  196. }
  197. }
  198. // Delete sends a delete event.
  199. func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
  200. f.Lock()
  201. defer f.Unlock()
  202. if !f.Stopped {
  203. select {
  204. case f.result <- Event{Deleted, lastValue}:
  205. return
  206. default:
  207. panic(fmt.Errorf("channel full"))
  208. }
  209. }
  210. }
  211. // Error sends an Error event.
  212. func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
  213. f.Lock()
  214. defer f.Unlock()
  215. if !f.Stopped {
  216. select {
  217. case f.result <- Event{Error, errValue}:
  218. return
  219. default:
  220. panic(fmt.Errorf("channel full"))
  221. }
  222. }
  223. }
  224. // Action sends an event of the requested type, for table-based testing.
  225. func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
  226. f.Lock()
  227. defer f.Unlock()
  228. if !f.Stopped {
  229. select {
  230. case f.result <- Event{action, obj}:
  231. return
  232. default:
  233. panic(fmt.Errorf("channel full"))
  234. }
  235. }
  236. }
  237. // ProxyWatcher lets you wrap your channel in watch Interface. threadsafe.
  238. type ProxyWatcher struct {
  239. result chan Event
  240. stopCh chan struct{}
  241. mutex sync.Mutex
  242. stopped bool
  243. }
  244. var _ Interface = &ProxyWatcher{}
  245. // NewProxyWatcher creates new ProxyWatcher by wrapping a channel
  246. func NewProxyWatcher(ch chan Event) *ProxyWatcher {
  247. return &ProxyWatcher{
  248. result: ch,
  249. stopCh: make(chan struct{}),
  250. stopped: false,
  251. }
  252. }
  253. // Stop implements Interface
  254. func (pw *ProxyWatcher) Stop() {
  255. pw.mutex.Lock()
  256. defer pw.mutex.Unlock()
  257. if !pw.stopped {
  258. pw.stopped = true
  259. close(pw.stopCh)
  260. }
  261. }
  262. // Stopping returns true if Stop() has been called
  263. func (pw *ProxyWatcher) Stopping() bool {
  264. pw.mutex.Lock()
  265. defer pw.mutex.Unlock()
  266. return pw.stopped
  267. }
  268. // ResultChan implements Interface
  269. func (pw *ProxyWatcher) ResultChan() <-chan Event {
  270. return pw.result
  271. }
  272. // StopChan returns stop channel
  273. func (pw *ProxyWatcher) StopChan() <-chan struct{} {
  274. return pw.stopCh
  275. }