123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- /*
- Copyright 2021 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package rest
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "time"
- "k8s.io/klog/v2"
- )
- // IsRetryableErrorFunc allows the client to provide its own function
- // that determines whether the specified err from the server is retryable.
- //
- // request: the original request sent to the server
- // err: the server sent this error to us
- //
- // The function returns true if the error is retryable and the request
- // can be retried, otherwise it returns false.
- // We have four mode of communications - 'Stream', 'Watch', 'Do' and 'DoRaw', this
- // function allows us to customize the retryability aspect of each.
- type IsRetryableErrorFunc func(request *http.Request, err error) bool
- func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool {
- return r(request, err)
- }
- var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
- return false
- })
- // WithRetry allows the client to retry a request up to a certain number of times
- // Note that WithRetry is not safe for concurrent use by multiple
- // goroutines without additional locking or coordination.
- type WithRetry interface {
- // IsNextRetry advances the retry counter appropriately
- // and returns true if the request should be retried,
- // otherwise it returns false, if:
- // - we have already reached the maximum retry threshold.
- // - the error does not fall into the retryable category.
- // - the server has not sent us a 429, or 5xx status code and the
- // 'Retry-After' response header is not set with a value.
- // - we need to seek to the beginning of the request body before we
- // initiate the next retry, the function should log an error and
- // return false if it fails to do so.
- //
- // restReq: the associated rest.Request
- // httpReq: the HTTP Request sent to the server
- // resp: the response sent from the server, it is set if err is nil
- // err: the server sent this error to us, if err is set then resp is nil.
- // f: a IsRetryableErrorFunc function provided by the client that determines
- // if the err sent by the server is retryable.
- IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
- // Before should be invoked prior to each attempt, including
- // the first one. If an error is returned, the request should
- // be aborted immediately.
- //
- // Before may also be additionally responsible for preparing
- // the request for the next retry, namely in terms of resetting
- // the request body in case it has been read.
- Before(ctx context.Context, r *Request) error
- // After should be invoked immediately after an attempt is made.
- After(ctx context.Context, r *Request, resp *http.Response, err error)
- // WrapPreviousError wraps the error from any previous attempt into
- // the final error specified in 'finalErr', so the user has more
- // context why the request failed.
- // For example, if a request times out after multiple retries then
- // we see a generic context.Canceled or context.DeadlineExceeded
- // error which is not very useful in debugging. This function can
- // wrap any error from previous attempt(s) to provide more context to
- // the user. The error returned in 'err' must satisfy the
- // following conditions:
- // a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr
- // implements Unwrap
- // b: errors.Unwrap(err) = finalErr if finalErr does not
- // implements Unwrap
- // c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr)
- WrapPreviousError(finalErr error) (err error)
- }
- // RetryAfter holds information associated with the next retry.
- type RetryAfter struct {
- // Wait is the duration the server has asked us to wait before
- // the next retry is initiated.
- // This is the value of the 'Retry-After' response header in seconds.
- Wait time.Duration
- // Attempt is the Nth attempt after which we have received a retryable
- // error or a 'Retry-After' response header from the server.
- Attempt int
- // Reason describes why we are retrying the request
- Reason string
- }
- type withRetry struct {
- maxRetries int
- attempts int
- // retry after parameters that pertain to the attempt that is to
- // be made soon, so as to enable 'Before' and 'After' to refer
- // to the retry parameters.
- // - for the first attempt, it will always be nil
- // - for consecutive attempts, it is non nil and holds the
- // retry after parameters for the next attempt to be made.
- retryAfter *RetryAfter
- // we keep track of two most recent errors, if the most
- // recent attempt is labeled as 'N' then:
- // - currentErr represents the error returned by attempt N, it
- // can be nil if attempt N did not return an error.
- // - previousErr represents an error from an attempt 'M' which
- // precedes attempt 'N' (N - M >= 1), it is non nil only when:
- // - for a sequence of attempt(s) 1..n (n>1), there
- // is an attempt k (k<n) that returned an error.
- previousErr, currentErr error
- }
- func (r *withRetry) trackPreviousError(err error) {
- // keep track of two most recent errors
- if r.currentErr != nil {
- r.previousErr = r.currentErr
- }
- r.currentErr = err
- }
- func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
- defer r.trackPreviousError(err)
- if httpReq == nil || (resp == nil && err == nil) {
- // bad input, we do nothing.
- return false
- }
- if restReq.body != nil {
- // we have an opaque reader, we can't safely reset it
- return false
- }
- r.attempts++
- r.retryAfter = &RetryAfter{Attempt: r.attempts}
- if r.attempts > r.maxRetries {
- return false
- }
- // if the server returned an error, it takes precedence over the http response.
- var errIsRetryable bool
- if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) {
- errIsRetryable = true
- // we have a retryable error, for which we will create an
- // artificial "Retry-After" response.
- resp = retryAfterResponse()
- }
- if err != nil && !errIsRetryable {
- return false
- }
- // if we are here, we have either a or b:
- // a: we have a retryable error, for which we already
- // have an artificial "Retry-After" response.
- // b: we have a response from the server for which we
- // need to check if it is retryable
- seconds, wait := checkWait(resp)
- if !wait {
- return false
- }
- r.retryAfter.Wait = time.Duration(seconds) * time.Second
- r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
- return true
- }
- func (r *withRetry) Before(ctx context.Context, request *Request) error {
- // If the request context is already canceled there
- // is no need to retry.
- if ctx.Err() != nil {
- r.trackPreviousError(ctx.Err())
- return ctx.Err()
- }
- url := request.URL()
- // r.retryAfter represents the retry after parameters calculated
- // from the (response, err) tuple from the last attempt, so 'Before'
- // can apply these retry after parameters prior to the next attempt.
- // 'r.retryAfter == nil' indicates that this is the very first attempt.
- if r.retryAfter == nil {
- // we do a backoff sleep before the first attempt is made,
- // (preserving current behavior).
- if request.backoff != nil {
- request.backoff.Sleep(request.backoff.CalculateBackoff(url))
- }
- return nil
- }
- // if we are here, we have made attempt(s) at least once before.
- if request.backoff != nil {
- delay := request.backoff.CalculateBackoff(url)
- if r.retryAfter.Wait > delay {
- delay = r.retryAfter.Wait
- }
- request.backoff.Sleep(delay)
- }
- // We are retrying the request that we already send to
- // apiserver at least once before. This request should
- // also be throttled with the client-internal rate limiter.
- if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil {
- r.trackPreviousError(ctx.Err())
- return err
- }
- klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
- return nil
- }
- func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) {
- // 'After' is invoked immediately after an attempt is made, let's label
- // the attempt we have just made as attempt 'N'.
- // the current value of r.retryAfter represents the retry after
- // parameters calculated from the (response, err) tuple from
- // attempt N-1, so r.retryAfter is outdated and should not be
- // referred to here.
- isRetry := r.retryAfter != nil
- r.retryAfter = nil
- // the client finishes a single request after N attempts (1..N)
- // - all attempts (1..N) are counted to the rest_client_requests_total
- // metric (current behavior).
- // - every attempt after the first (2..N) are counted to the
- // rest_client_request_retries_total metric.
- updateRequestResultMetric(ctx, request, resp, err)
- if isRetry {
- // this is attempt 2 or later
- updateRequestRetryMetric(ctx, request, resp, err)
- }
- if request.c.base != nil {
- if err != nil {
- request.backoff.UpdateBackoff(request.URL(), err, 0)
- } else {
- request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
- }
- }
- }
- func (r *withRetry) WrapPreviousError(currentErr error) error {
- if currentErr == nil || r.previousErr == nil {
- return currentErr
- }
- // if both previous and current error objects represent the error,
- // then there is no need to wrap the previous error.
- if currentErr.Error() == r.previousErr.Error() {
- return currentErr
- }
- previousErr := r.previousErr
- // net/http wraps the underlying error with an url.Error, if the
- // previous err object is an instance of url.Error, then we can
- // unwrap it to get to the inner error object, this is so we can
- // avoid error message like:
- // Error: Get "http://foo.bar/api/v1": context deadline exceeded - error \
- // from a previous attempt: Error: Get "http://foo.bar/api/v1": EOF
- if urlErr, ok := r.previousErr.(*url.Error); ok && urlErr != nil {
- if urlErr.Unwrap() != nil {
- previousErr = urlErr.Unwrap()
- }
- }
- return &wrapPreviousError{
- currentErr: currentErr,
- previousError: previousErr,
- }
- }
- type wrapPreviousError struct {
- currentErr, previousError error
- }
- func (w *wrapPreviousError) Unwrap() error { return w.currentErr }
- func (w *wrapPreviousError) Error() string {
- return fmt.Sprintf("%s - error from a previous attempt: %s", w.currentErr.Error(), w.previousError.Error())
- }
- // checkWait returns true along with a number of seconds if
- // the server instructed us to wait before retrying.
- func checkWait(resp *http.Response) (int, bool) {
- switch r := resp.StatusCode; {
- // any 500 error code and 429 can trigger a wait
- case r == http.StatusTooManyRequests, r >= 500:
- default:
- return 0, false
- }
- i, ok := retryAfterSeconds(resp)
- return i, ok
- }
- func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
- // priority and fairness sets the UID of the FlowSchema
- // associated with a request in the following response Header.
- const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
- message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)
- switch {
- case resp.StatusCode == http.StatusTooManyRequests:
- // it is server-side throttling from priority and fairness
- flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
- return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID)
- case err != nil:
- // it's a retryable error
- return fmt.Sprintf("%s - retry-reason: due to retryable error, error: %v", message, err)
- default:
- return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode)
- }
- }
- func readAndCloseResponseBody(resp *http.Response) {
- if resp == nil {
- return
- }
- // Ensure the response body is fully read and closed
- // before we reconnect, so that we reuse the same TCP
- // connection.
- const maxBodySlurpSize = 2 << 10
- defer resp.Body.Close()
- if resp.ContentLength <= maxBodySlurpSize {
- io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
- }
- }
- func retryAfterResponse() *http.Response {
- return retryAfterResponseWithDelay("1")
- }
- func retryAfterResponseWithDelay(delay string) *http.Response {
- return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
- }
- func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
- return &http.Response{
- StatusCode: code,
- Header: http.Header{"Retry-After": []string{delay}},
- }
- }
|