with_retry.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. Copyright 2021 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rest
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "k8s.io/klog/v2"
  22. )
  23. // IsRetryableErrorFunc allows the client to provide its own function
  24. // that determines whether the specified err from the server is retryable.
  25. //
  26. // request: the original request sent to the server
  27. // err: the server sent this error to us
  28. //
  29. // The function returns true if the error is retryable and the request
  30. // can be retried, otherwise it returns false.
  31. // We have four mode of communications - 'Stream', 'Watch', 'Do' and 'DoRaw', this
  32. // function allows us to customize the retryability aspect of each.
  33. type IsRetryableErrorFunc func(request *http.Request, err error) bool
  34. func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool {
  35. return r(request, err)
  36. }
  37. var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
  38. return false
  39. })
  40. // WithRetry allows the client to retry a request up to a certain number of times
  41. // Note that WithRetry is not safe for concurrent use by multiple
  42. // goroutines without additional locking or coordination.
  43. type WithRetry interface {
  44. // IsNextRetry advances the retry counter appropriately
  45. // and returns true if the request should be retried,
  46. // otherwise it returns false, if:
  47. // - we have already reached the maximum retry threshold.
  48. // - the error does not fall into the retryable category.
  49. // - the server has not sent us a 429, or 5xx status code and the
  50. // 'Retry-After' response header is not set with a value.
  51. // - we need to seek to the beginning of the request body before we
  52. // initiate the next retry, the function should log an error and
  53. // return false if it fails to do so.
  54. //
  55. // restReq: the associated rest.Request
  56. // httpReq: the HTTP Request sent to the server
  57. // resp: the response sent from the server, it is set if err is nil
  58. // err: the server sent this error to us, if err is set then resp is nil.
  59. // f: a IsRetryableErrorFunc function provided by the client that determines
  60. // if the err sent by the server is retryable.
  61. IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
  62. // Before should be invoked prior to each attempt, including
  63. // the first one. If an error is returned, the request should
  64. // be aborted immediately.
  65. //
  66. // Before may also be additionally responsible for preparing
  67. // the request for the next retry, namely in terms of resetting
  68. // the request body in case it has been read.
  69. Before(ctx context.Context, r *Request) error
  70. // After should be invoked immediately after an attempt is made.
  71. After(ctx context.Context, r *Request, resp *http.Response, err error)
  72. // WrapPreviousError wraps the error from any previous attempt into
  73. // the final error specified in 'finalErr', so the user has more
  74. // context why the request failed.
  75. // For example, if a request times out after multiple retries then
  76. // we see a generic context.Canceled or context.DeadlineExceeded
  77. // error which is not very useful in debugging. This function can
  78. // wrap any error from previous attempt(s) to provide more context to
  79. // the user. The error returned in 'err' must satisfy the
  80. // following conditions:
  81. // a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr
  82. // implements Unwrap
  83. // b: errors.Unwrap(err) = finalErr if finalErr does not
  84. // implements Unwrap
  85. // c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr)
  86. WrapPreviousError(finalErr error) (err error)
  87. }
  88. // RetryAfter holds information associated with the next retry.
  89. type RetryAfter struct {
  90. // Wait is the duration the server has asked us to wait before
  91. // the next retry is initiated.
  92. // This is the value of the 'Retry-After' response header in seconds.
  93. Wait time.Duration
  94. // Attempt is the Nth attempt after which we have received a retryable
  95. // error or a 'Retry-After' response header from the server.
  96. Attempt int
  97. // Reason describes why we are retrying the request
  98. Reason string
  99. }
  100. type withRetry struct {
  101. maxRetries int
  102. attempts int
  103. // retry after parameters that pertain to the attempt that is to
  104. // be made soon, so as to enable 'Before' and 'After' to refer
  105. // to the retry parameters.
  106. // - for the first attempt, it will always be nil
  107. // - for consecutive attempts, it is non nil and holds the
  108. // retry after parameters for the next attempt to be made.
  109. retryAfter *RetryAfter
  110. // we keep track of two most recent errors, if the most
  111. // recent attempt is labeled as 'N' then:
  112. // - currentErr represents the error returned by attempt N, it
  113. // can be nil if attempt N did not return an error.
  114. // - previousErr represents an error from an attempt 'M' which
  115. // precedes attempt 'N' (N - M >= 1), it is non nil only when:
  116. // - for a sequence of attempt(s) 1..n (n>1), there
  117. // is an attempt k (k<n) that returned an error.
  118. previousErr, currentErr error
  119. }
  120. func (r *withRetry) trackPreviousError(err error) {
  121. // keep track of two most recent errors
  122. if r.currentErr != nil {
  123. r.previousErr = r.currentErr
  124. }
  125. r.currentErr = err
  126. }
  127. func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
  128. defer r.trackPreviousError(err)
  129. if httpReq == nil || (resp == nil && err == nil) {
  130. // bad input, we do nothing.
  131. return false
  132. }
  133. if restReq.body != nil {
  134. // we have an opaque reader, we can't safely reset it
  135. return false
  136. }
  137. r.attempts++
  138. r.retryAfter = &RetryAfter{Attempt: r.attempts}
  139. if r.attempts > r.maxRetries {
  140. return false
  141. }
  142. // if the server returned an error, it takes precedence over the http response.
  143. var errIsRetryable bool
  144. if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) {
  145. errIsRetryable = true
  146. // we have a retryable error, for which we will create an
  147. // artificial "Retry-After" response.
  148. resp = retryAfterResponse()
  149. }
  150. if err != nil && !errIsRetryable {
  151. return false
  152. }
  153. // if we are here, we have either a or b:
  154. // a: we have a retryable error, for which we already
  155. // have an artificial "Retry-After" response.
  156. // b: we have a response from the server for which we
  157. // need to check if it is retryable
  158. seconds, wait := checkWait(resp)
  159. if !wait {
  160. return false
  161. }
  162. r.retryAfter.Wait = time.Duration(seconds) * time.Second
  163. r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
  164. return true
  165. }
  166. func (r *withRetry) Before(ctx context.Context, request *Request) error {
  167. // If the request context is already canceled there
  168. // is no need to retry.
  169. if ctx.Err() != nil {
  170. r.trackPreviousError(ctx.Err())
  171. return ctx.Err()
  172. }
  173. url := request.URL()
  174. // r.retryAfter represents the retry after parameters calculated
  175. // from the (response, err) tuple from the last attempt, so 'Before'
  176. // can apply these retry after parameters prior to the next attempt.
  177. // 'r.retryAfter == nil' indicates that this is the very first attempt.
  178. if r.retryAfter == nil {
  179. // we do a backoff sleep before the first attempt is made,
  180. // (preserving current behavior).
  181. if request.backoff != nil {
  182. request.backoff.Sleep(request.backoff.CalculateBackoff(url))
  183. }
  184. return nil
  185. }
  186. // if we are here, we have made attempt(s) at least once before.
  187. if request.backoff != nil {
  188. delay := request.backoff.CalculateBackoff(url)
  189. if r.retryAfter.Wait > delay {
  190. delay = r.retryAfter.Wait
  191. }
  192. request.backoff.Sleep(delay)
  193. }
  194. // We are retrying the request that we already send to
  195. // apiserver at least once before. This request should
  196. // also be throttled with the client-internal rate limiter.
  197. if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil {
  198. r.trackPreviousError(ctx.Err())
  199. return err
  200. }
  201. klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
  202. return nil
  203. }
  204. func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) {
  205. // 'After' is invoked immediately after an attempt is made, let's label
  206. // the attempt we have just made as attempt 'N'.
  207. // the current value of r.retryAfter represents the retry after
  208. // parameters calculated from the (response, err) tuple from
  209. // attempt N-1, so r.retryAfter is outdated and should not be
  210. // referred to here.
  211. isRetry := r.retryAfter != nil
  212. r.retryAfter = nil
  213. // the client finishes a single request after N attempts (1..N)
  214. // - all attempts (1..N) are counted to the rest_client_requests_total
  215. // metric (current behavior).
  216. // - every attempt after the first (2..N) are counted to the
  217. // rest_client_request_retries_total metric.
  218. updateRequestResultMetric(ctx, request, resp, err)
  219. if isRetry {
  220. // this is attempt 2 or later
  221. updateRequestRetryMetric(ctx, request, resp, err)
  222. }
  223. if request.c.base != nil {
  224. if err != nil {
  225. request.backoff.UpdateBackoff(request.URL(), err, 0)
  226. } else {
  227. request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
  228. }
  229. }
  230. }
  231. func (r *withRetry) WrapPreviousError(currentErr error) error {
  232. if currentErr == nil || r.previousErr == nil {
  233. return currentErr
  234. }
  235. // if both previous and current error objects represent the error,
  236. // then there is no need to wrap the previous error.
  237. if currentErr.Error() == r.previousErr.Error() {
  238. return currentErr
  239. }
  240. previousErr := r.previousErr
  241. // net/http wraps the underlying error with an url.Error, if the
  242. // previous err object is an instance of url.Error, then we can
  243. // unwrap it to get to the inner error object, this is so we can
  244. // avoid error message like:
  245. // Error: Get "http://foo.bar/api/v1": context deadline exceeded - error \
  246. // from a previous attempt: Error: Get "http://foo.bar/api/v1": EOF
  247. if urlErr, ok := r.previousErr.(*url.Error); ok && urlErr != nil {
  248. if urlErr.Unwrap() != nil {
  249. previousErr = urlErr.Unwrap()
  250. }
  251. }
  252. return &wrapPreviousError{
  253. currentErr: currentErr,
  254. previousError: previousErr,
  255. }
  256. }
  257. type wrapPreviousError struct {
  258. currentErr, previousError error
  259. }
  260. func (w *wrapPreviousError) Unwrap() error { return w.currentErr }
  261. func (w *wrapPreviousError) Error() string {
  262. return fmt.Sprintf("%s - error from a previous attempt: %s", w.currentErr.Error(), w.previousError.Error())
  263. }
  264. // checkWait returns true along with a number of seconds if
  265. // the server instructed us to wait before retrying.
  266. func checkWait(resp *http.Response) (int, bool) {
  267. switch r := resp.StatusCode; {
  268. // any 500 error code and 429 can trigger a wait
  269. case r == http.StatusTooManyRequests, r >= 500:
  270. default:
  271. return 0, false
  272. }
  273. i, ok := retryAfterSeconds(resp)
  274. return i, ok
  275. }
  276. func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
  277. // priority and fairness sets the UID of the FlowSchema
  278. // associated with a request in the following response Header.
  279. const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
  280. message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)
  281. switch {
  282. case resp.StatusCode == http.StatusTooManyRequests:
  283. // it is server-side throttling from priority and fairness
  284. flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
  285. return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID)
  286. case err != nil:
  287. // it's a retryable error
  288. return fmt.Sprintf("%s - retry-reason: due to retryable error, error: %v", message, err)
  289. default:
  290. return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode)
  291. }
  292. }
  293. func readAndCloseResponseBody(resp *http.Response) {
  294. if resp == nil {
  295. return
  296. }
  297. // Ensure the response body is fully read and closed
  298. // before we reconnect, so that we reuse the same TCP
  299. // connection.
  300. const maxBodySlurpSize = 2 << 10
  301. defer resp.Body.Close()
  302. if resp.ContentLength <= maxBodySlurpSize {
  303. io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
  304. }
  305. }
  306. func retryAfterResponse() *http.Response {
  307. return retryAfterResponseWithDelay("1")
  308. }
  309. func retryAfterResponseWithDelay(delay string) *http.Response {
  310. return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
  311. }
  312. func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
  313. return &http.Response{
  314. StatusCode: code,
  315. Header: http.Header{"Retry-After": []string{delay}},
  316. }
  317. }