// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)
const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond

	// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
	// user-provided ID is available. If passed, an ID is assigned automatically.
	AutoWatchID = 0

	// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
	InvalidWatchID = -1
)
type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is wrapped with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// the error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from getting stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
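
// A minimal usage sketch (illustrative, not part of the original file): watch a
// prefix with WithRequireLeader so a partitioned server fails the stream instead
// of blocking forever, and cancel the context when done. "cli" is an assumed,
// already-connected *Client.
//
//	ctx, cancel := context.WithCancel(WithRequireLeader(context.Background()))
//	defer cancel()
//	for wresp := range cli.Watch(ctx, "foo", WithPrefix()) {
//		if err := wresp.Err(); err != nil {
//			// e.g. ErrCompacted or ErrNoLeader; the channel closes after this
//			break
//		}
//		for _, ev := range wresp.Events {
//			fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//		}
//	}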
type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason the watch was canceled.
	cancelReason string
}
// IsCreate returns true if the event tells that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event tells that a new value is put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
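
// Illustrative sketch (an assumption, not in the original file): how a consumer
// might branch on the helpers above while draining a WatchChan "wch".
//
//	for wresp := range wch {
//		switch {
//		case wresp.IsProgressNotify():
//			// no events; Header.Revision reports watch progress
//		case wresp.Err() != nil:
//			// compaction, cancellation, or stream close; channel will close
//		default:
//			for _, ev := range wresp.Events {
//				if ev.IsCreate() {
//					// first put of this key
//				} else if ev.IsModify() {
//					// put over an existing key
//				}
//			}
//		}
//	}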
// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.Mutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
	lg      *zap.Logger
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error

	lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragmentation should be disabled by default;
	// if true, split watch events when the total exceeds the
	// "--max-request-bytes" flag value plus 512 bytes
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher stream
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}
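
// Construction sketch (assumed usage, not in the original file): a Watcher is
// normally used through a *Client, which embeds one, but it can be built
// explicitly as well. "cli" is an assumed, already-connected *Client.
//
//	w := NewWatcher(cli)
//	defer w.Close() // cancels all watch requests opened through w
//	wch := w.Watch(context.Background(), "mykey")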
// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }
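
// Why valCtx exists (explanatory sketch, not in the original file): the
// caller's context must not tear down the shared gRPC stream, but its values
// (e.g. auth tokens in outgoing metadata) must still reach the server.
// Embedding the caller's context and overriding Deadline/Done/Err detaches
// cancellation while preserving Value lookups:
//
//	inner, cancel := context.WithCancel(context.Background())
//	cancel()
//	vc := &valCtx{inner}
//	_ = vc.Err() // nil: inner's cancellation is not observed through vc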
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
		lg:         w.lg,
	}
	go wgs.run()
	return wgs
}
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	var closeCh chan WatchResponse
	for {
		// find or allocate appropriate grpc watch stream
		w.mu.Lock()
		if w.streams == nil {
			// closed
			w.mu.Unlock()
			ch := make(chan WatchResponse)
			close(ch)
			return ch
		}
		wgs := w.streams[ctxKey]
		if wgs == nil {
			wgs = w.newWatcherGrpcStream(ctx)
			w.streams[ctxKey] = wgs
		}
		donec := wgs.donec
		reqc := wgs.reqc
		w.mu.Unlock()

		// lazily create the channel used to deliver a close/error response
		if closeCh == nil {
			closeCh = make(chan WatchResponse, 1)
		}

		// submit request
		select {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
			ok = false
		case <-donec:
			ok = false
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			continue
		}

		// receive channel
		if ok {
			select {
			case ret := <-wr.retc:
				return ret
			case <-ctx.Done():
			case <-donec:
				if wgs.closeErr != nil {
					closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
					break
				}
				// retry; may have dropped stream from no ctxs
				continue
			}
		}
		break
	}

	close(closeCh)
	return closeCh
}
func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}
// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}
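
// Usage sketch (assumed, not in the original file): request a progress
// notification and observe it on an open watch channel on the same stream as
// a response with no events. "cli" is an assumed *Client, "wch" an open
// WatchChan.
//
//	if err := cli.RequestProgress(ctx); err != nil {
//		// handle error
//	}
//	wresp := <-wch
//	if wresp.IsProgressNotify() {
//		fmt.Println("progress through revision", wresp.Header.Revision)
//	}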
func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != InvalidWatchID {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}
// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	backoff := time.Millisecond
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      InvalidWatchID,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; the last fragment has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if len(w.resuming) != 0 {
					if ws := w.resuming[0]; ws != nil {
						w.addSubstream(pbresp, ws)
						w.dispatchEvent(pbresp)
						w.resuming[0] = nil
					}
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented;
				// continue to fetch the next fragment
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			backoff = w.backoffIfUnavailable(backoff, err)
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream; shutdown and skip cancellation
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != InvalidWatchID {
				// client is closing an established watch; close it on the server proactively instead of waiting
				// to close when the next message arrives
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}
// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}
// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision + 1
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = InvalidWatchID
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancellation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}
// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}
var maxBackoff = 100 * time.Millisecond

func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
	if isUnavailableErr(w.ctx, err) {
		// retry, but backoff
		if backoff < maxBackoff {
			// 25% backoff factor
			backoff = backoff + backoff/4
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
		time.Sleep(backoff)
	}
	return backoff
}
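
// Worked example (illustrative): starting from the 1ms seed used in run() and
// openWatchClient, each unavailable error grows the delay by 25% (integer
// time.Duration arithmetic), roughly
// 1ms -> 1.25ms -> ~1.56ms -> ... -> ~82ms -> ~102ms, which is then clamped
// to maxBackoff (100ms) and stays there for subsequent retries.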
// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		backoff = w.backoffIfUnavailable(backoff, err)
	}
	return ws, nil
}
// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}
func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}