idle.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /*
  2. *
  3. * Copyright 2023 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package idle contains a component for managing idleness (entering and exiting)
  19. // based on RPC activity.
  20. package idle
  21. import (
  22. "fmt"
  23. "math"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "google.golang.org/grpc/grpclog"
  28. )
  // timeAfterFunc is the hook used by this package to schedule a callback after
  // a delay. It is a variable rather than a direct call so that unit tests can
  // override it.
  var timeAfterFunc = time.AfterFunc
  // Enforcer is the functionality provided by grpc.ClientConn that lets the
  // idleness manager move the channel into and out of idle mode.
  type Enforcer interface {
  	// EnterIdleMode asks the channel to enter idle mode.
  	EnterIdleMode() error
  	// ExitIdleMode asks the channel to exit idle mode.
  	ExitIdleMode() error
  }
  // Manager defines the functionality required to track RPC activity on a
  // channel.
  type Manager interface {
  	// OnCallBegin is invoked at the start of every RPC. A non-nil error means
  	// the RPC should fail.
  	OnCallBegin() error
  	// OnCallEnd is invoked at the end of every RPC.
  	OnCallEnd()
  	// Close shuts the manager down.
  	Close()
  }
  // noopManager is the Manager that NewManager returns when no idle timeout is
  // configured. It tracks nothing and every method is a no-op.
  type noopManager struct{}

  // OnCallBegin always allows the call.
  func (noopManager) OnCallBegin() error {
  	return nil
  }

  // OnCallEnd does nothing.
  func (noopManager) OnCallEnd() {
  }

  // Close does nothing.
  func (noopManager) Close() {
  }
  50. // manager implements the Manager interface. It uses atomic operations to
  51. // synchronize access to shared state and a mutex to guarantee mutual exclusion
  52. // in a critical section.
  53. type manager struct {
  54. // State accessed atomically.
  55. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
  56. activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
  57. activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
  58. closed int32 // Boolean; True when the manager is closed.
  59. // Can be accessed without atomics or mutex since these are set at creation
  60. // time and read-only after that.
  61. enforcer Enforcer // Functionality provided by grpc.ClientConn.
  62. timeout int64 // Idle timeout duration nanos stored as an int64.
  63. logger grpclog.LoggerV2
  64. // idleMu is used to guarantee mutual exclusion in two scenarios:
  65. // - Opposing intentions:
  66. // - a: Idle timeout has fired and handleIdleTimeout() is trying to put
  67. // the channel in idle mode because the channel has been inactive.
  68. // - b: At the same time an RPC is made on the channel, and OnCallBegin()
  69. // is trying to prevent the channel from going idle.
  70. // - Competing intentions:
  71. // - The channel is in idle mode and there are multiple RPCs starting at
  72. // the same time, all trying to move the channel out of idle. Only one
  73. // of them should succeed in doing so, while the other RPCs should
  74. // piggyback on the first one and be successfully handled.
  75. idleMu sync.RWMutex
  76. actuallyIdle bool
  77. timer *time.Timer
  78. }
  79. // ManagerOptions is a collection of options used by
  80. // NewManager.
  81. type ManagerOptions struct {
  82. Enforcer Enforcer
  83. Timeout time.Duration
  84. Logger grpclog.LoggerV2
  85. }
  86. // NewManager creates a new idleness manager implementation for the
  87. // given idle timeout.
  88. func NewManager(opts ManagerOptions) Manager {
  89. if opts.Timeout == 0 {
  90. return noopManager{}
  91. }
  92. m := &manager{
  93. enforcer: opts.Enforcer,
  94. timeout: int64(opts.Timeout),
  95. logger: opts.Logger,
  96. }
  97. m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout)
  98. return m
  99. }
  100. // resetIdleTimer resets the idle timer to the given duration. This method
  101. // should only be called from the timer callback.
  102. func (m *manager) resetIdleTimer(d time.Duration) {
  103. m.idleMu.Lock()
  104. defer m.idleMu.Unlock()
  105. if m.timer == nil {
  106. // Only close sets timer to nil. We are done.
  107. return
  108. }
  109. // It is safe to ignore the return value from Reset() because this method is
  110. // only ever called from the timer callback, which means the timer has
  111. // already fired.
  112. m.timer.Reset(d)
  113. }
  114. // handleIdleTimeout is the timer callback that is invoked upon expiry of the
  115. // configured idle timeout. The channel is considered inactive if there are no
  116. // ongoing calls and no RPC activity since the last time the timer fired.
  117. func (m *manager) handleIdleTimeout() {
  118. if m.isClosed() {
  119. return
  120. }
  121. if atomic.LoadInt32(&m.activeCallsCount) > 0 {
  122. m.resetIdleTimer(time.Duration(m.timeout))
  123. return
  124. }
  125. // There has been activity on the channel since we last got here. Reset the
  126. // timer and return.
  127. if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
  128. // Set the timer to fire after a duration of idle timeout, calculated
  129. // from the time the most recent RPC completed.
  130. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
  131. m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano()))
  132. return
  133. }
  134. // This CAS operation is extremely likely to succeed given that there has
  135. // been no activity since the last time we were here. Setting the
  136. // activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the
  137. // channel is either in idle mode or is trying to get there.
  138. if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
  139. // This CAS operation can fail if an RPC started after we checked for
  140. // activity at the top of this method, or one was ongoing from before
  141. // the last time we were here. In both case, reset the timer and return.
  142. m.resetIdleTimer(time.Duration(m.timeout))
  143. return
  144. }
  145. // Now that we've set the active calls count to -math.MaxInt32, it's time to
  146. // actually move to idle mode.
  147. if m.tryEnterIdleMode() {
  148. // Successfully entered idle mode. No timer needed until we exit idle.
  149. return
  150. }
  151. // Failed to enter idle mode due to a concurrent RPC that kept the channel
  152. // active, or because of an error from the channel. Undo the attempt to
  153. // enter idle, and reset the timer to try again later.
  154. atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
  155. m.resetIdleTimer(time.Duration(m.timeout))
  156. }
  157. // tryEnterIdleMode instructs the channel to enter idle mode. But before
  158. // that, it performs a last minute check to ensure that no new RPC has come in,
  159. // making the channel active.
  160. //
  161. // Return value indicates whether or not the channel moved to idle mode.
  162. //
  163. // Holds idleMu which ensures mutual exclusion with exitIdleMode.
  164. func (m *manager) tryEnterIdleMode() bool {
  165. m.idleMu.Lock()
  166. defer m.idleMu.Unlock()
  167. if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
  168. // We raced and lost to a new RPC. Very rare, but stop entering idle.
  169. return false
  170. }
  171. if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
  172. // An very short RPC could have come in (and also finished) after we
  173. // checked for calls count and activity in handleIdleTimeout(), but
  174. // before the CAS operation. So, we need to check for activity again.
  175. return false
  176. }
  177. // No new RPCs have come in since we last set the active calls count value
  178. // -math.MaxInt32 in the timer callback. And since we have the lock, it is
  179. // safe to enter idle mode now.
  180. if err := m.enforcer.EnterIdleMode(); err != nil {
  181. m.logger.Errorf("Failed to enter idle mode: %v", err)
  182. return false
  183. }
  184. // Successfully entered idle mode.
  185. m.actuallyIdle = true
  186. return true
  187. }
  188. // OnCallBegin is invoked at the start of every RPC.
  189. func (m *manager) OnCallBegin() error {
  190. if m.isClosed() {
  191. return nil
  192. }
  193. if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
  194. // Channel is not idle now. Set the activity bit and allow the call.
  195. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
  196. return nil
  197. }
  198. // Channel is either in idle mode or is in the process of moving to idle
  199. // mode. Attempt to exit idle mode to allow this RPC.
  200. if err := m.exitIdleMode(); err != nil {
  201. // Undo the increment to calls count, and return an error causing the
  202. // RPC to fail.
  203. atomic.AddInt32(&m.activeCallsCount, -1)
  204. return err
  205. }
  206. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
  207. return nil
  208. }
  209. // exitIdleMode instructs the channel to exit idle mode.
  210. //
  211. // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
  212. func (m *manager) exitIdleMode() error {
  213. m.idleMu.Lock()
  214. defer m.idleMu.Unlock()
  215. if !m.actuallyIdle {
  216. // This can happen in two scenarios:
  217. // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
  218. // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
  219. // came in and OnCallBegin() noticed that the calls count is negative.
  220. // - Channel is in idle mode, and multiple new RPCs come in at the same
  221. // time, all of them notice a negative calls count in OnCallBegin and get
  222. // here. The first one to get the lock would got the channel to exit idle.
  223. //
  224. // Either way, nothing to do here.
  225. return nil
  226. }
  227. if err := m.enforcer.ExitIdleMode(); err != nil {
  228. return fmt.Errorf("channel failed to exit idle mode: %v", err)
  229. }
  230. // Undo the idle entry process. This also respects any new RPC attempts.
  231. atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
  232. m.actuallyIdle = false
  233. // Start a new timer to fire after the configured idle timeout.
  234. m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)
  235. return nil
  236. }
  237. // OnCallEnd is invoked at the end of every RPC.
  238. func (m *manager) OnCallEnd() {
  239. if m.isClosed() {
  240. return
  241. }
  242. // Record the time at which the most recent call finished.
  243. atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
  244. // Decrement the active calls count. This count can temporarily go negative
  245. // when the timer callback is in the process of moving the channel to idle
  246. // mode, but one or more RPCs come in and complete before the timer callback
  247. // can get done with the process of moving to idle mode.
  248. atomic.AddInt32(&m.activeCallsCount, -1)
  249. }
  250. func (m *manager) isClosed() bool {
  251. return atomic.LoadInt32(&m.closed) == 1
  252. }
  253. func (m *manager) Close() {
  254. atomic.StoreInt32(&m.closed, 1)
  255. m.idleMu.Lock()
  256. m.timer.Stop()
  257. m.timer = nil
  258. m.idleMu.Unlock()
  259. }