ticker.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package backoff
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. )
  7. // Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
  8. //
  9. // Ticks will continue to arrive when the previous operation is still running,
  10. // so operations that take a while to fail could run in quick succession.
  11. type Ticker struct {
  12. C <-chan time.Time
  13. c chan time.Time
  14. b BackOff
  15. ctx context.Context
  16. timer Timer
  17. stop chan struct{}
  18. stopOnce sync.Once
  19. }
  20. // NewTicker returns a new Ticker containing a channel that will send
  21. // the time at times specified by the BackOff argument. Ticker is
  22. // guaranteed to tick at least once. The channel is closed when Stop
  23. // method is called or BackOff stops. It is not safe to manipulate the
  24. // provided backoff policy (notably calling NextBackOff or Reset)
  25. // while the ticker is running.
  26. func NewTicker(b BackOff) *Ticker {
  27. return NewTickerWithTimer(b, &defaultTimer{})
  28. }
  29. // NewTickerWithTimer returns a new Ticker with a custom timer.
  30. // A default timer that uses system timer is used when nil is passed.
  31. func NewTickerWithTimer(b BackOff, timer Timer) *Ticker {
  32. if timer == nil {
  33. timer = &defaultTimer{}
  34. }
  35. c := make(chan time.Time)
  36. t := &Ticker{
  37. C: c,
  38. c: c,
  39. b: b,
  40. ctx: getContext(b),
  41. timer: timer,
  42. stop: make(chan struct{}),
  43. }
  44. t.b.Reset()
  45. go t.run()
  46. return t
  47. }
  48. // Stop turns off a ticker. After Stop, no more ticks will be sent.
  49. func (t *Ticker) Stop() {
  50. t.stopOnce.Do(func() { close(t.stop) })
  51. }
  52. func (t *Ticker) run() {
  53. c := t.c
  54. defer close(c)
  55. // Ticker is guaranteed to tick at least once.
  56. afterC := t.send(time.Now())
  57. for {
  58. if afterC == nil {
  59. return
  60. }
  61. select {
  62. case tick := <-afterC:
  63. afterC = t.send(tick)
  64. case <-t.stop:
  65. t.c = nil // Prevent future ticks from being sent to the channel.
  66. return
  67. case <-t.ctx.Done():
  68. return
  69. }
  70. }
  71. }
  72. func (t *Ticker) send(tick time.Time) <-chan time.Time {
  73. select {
  74. case t.c <- tick:
  75. case <-t.stop:
  76. return nil
  77. }
  78. next := t.b.NextBackOff()
  79. if next == Stop {
  80. t.Stop()
  81. return nil
  82. }
  83. t.timer.Start(next)
  84. return t.timer.C()
  85. }