pickfirst.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "google.golang.org/grpc/balancer"
  24. "google.golang.org/grpc/connectivity"
  25. "google.golang.org/grpc/internal/envconfig"
  26. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  27. "google.golang.org/grpc/internal/grpcrand"
  28. "google.golang.org/grpc/internal/pretty"
  29. "google.golang.org/grpc/resolver"
  30. "google.golang.org/grpc/serviceconfig"
  31. )
  32. const (
  33. // PickFirstBalancerName is the name of the pick_first balancer.
  34. PickFirstBalancerName = "pick_first"
  35. logPrefix = "[pick-first-lb %p] "
  36. )
  37. func newPickfirstBuilder() balancer.Builder {
  38. return &pickfirstBuilder{}
  39. }
// pickfirstBuilder is the field-less builder type for the pick_first
// balancer. Its methods in this file are Build, Name and ParseConfig.
type pickfirstBuilder struct{}
  41. func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  42. b := &pickfirstBalancer{cc: cc}
  43. b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
  44. return b
  45. }
// Name returns PickFirstBalancerName.
func (*pickfirstBuilder) Name() string {
	return PickFirstBalancerName
}
// pfConfig is the LB policy configuration for the pick_first balancer. It is
// produced by ParseConfig and read back in UpdateClientConnState.
type pfConfig struct {
	// Embedded so that pfConfig can be returned as a
	// serviceconfig.LoadBalancingConfig; excluded from JSON via the "-" tag.
	serviceconfig.LoadBalancingConfig `json:"-"`
	// If set to true, instructs the LB policy to shuffle the order of the list
	// of addresses received from the name resolver before attempting to
	// connect to them.
	ShuffleAddressList bool `json:"shuffleAddressList"`
}
  56. func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
  57. if !envconfig.PickFirstLBConfig {
  58. // Prior to supporting loadbalancing configuration, the pick_first LB
  59. // policy did not implement the balancer.ConfigParser interface. This
  60. // meant that if a non-empty configuration was passed to it, the service
  61. // config unmarshaling code would throw a warning log, but would
  62. // continue using the pick_first LB policy. The code below ensures the
  63. // same behavior is retained if the env var is not set.
  64. if string(js) != "{}" {
  65. logger.Warningf("Ignoring non-empty balancer configuration %q for the pick_first LB policy", string(js))
  66. }
  67. return nil, nil
  68. }
  69. var cfg pfConfig
  70. if err := json.Unmarshal(js, &cfg); err != nil {
  71. return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
  72. }
  73. return cfg, nil
  74. }
// pickfirstBalancer is the balancer created by pickfirstBuilder.Build. It
// tracks at most one SubConn at a time.
type pickfirstBalancer struct {
	// logger is the prefix logger installed by Build.
	logger *internalgrpclog.PrefixLogger
	// state is the connectivity state last recorded by this balancer; it is
	// written by ResolverError, UpdateClientConnState and updateSubConnState.
	state connectivity.State
	// cc is the ClientConn handed to Build; SubConns are created and balancer
	// state is reported through it.
	cc balancer.ClientConn
	// subConn is the SubConn currently in use, or nil when there is none.
	subConn balancer.SubConn
}
  81. func (b *pickfirstBalancer) ResolverError(err error) {
  82. if b.logger.V(2) {
  83. b.logger.Infof("Received error from the name resolver: %v", err)
  84. }
  85. if b.subConn == nil {
  86. b.state = connectivity.TransientFailure
  87. }
  88. if b.state != connectivity.TransientFailure {
  89. // The picker will not change since the balancer does not currently
  90. // report an error.
  91. return
  92. }
  93. b.cc.UpdateState(balancer.State{
  94. ConnectivityState: connectivity.TransientFailure,
  95. Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
  96. })
  97. }
  98. func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
  99. addrs := state.ResolverState.Addresses
  100. if len(addrs) == 0 {
  101. // The resolver reported an empty address list. Treat it like an error by
  102. // calling b.ResolverError.
  103. if b.subConn != nil {
  104. // Shut down the old subConn. All addresses were removed, so it is
  105. // no longer valid.
  106. b.subConn.Shutdown()
  107. b.subConn = nil
  108. }
  109. b.ResolverError(errors.New("produced zero addresses"))
  110. return balancer.ErrBadResolverState
  111. }
  112. // We don't have to guard this block with the env var because ParseConfig
  113. // already does so.
  114. cfg, ok := state.BalancerConfig.(pfConfig)
  115. if state.BalancerConfig != nil && !ok {
  116. return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
  117. }
  118. if cfg.ShuffleAddressList {
  119. addrs = append([]resolver.Address{}, addrs...)
  120. grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
  121. }
  122. if b.logger.V(2) {
  123. b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
  124. }
  125. if b.subConn != nil {
  126. b.cc.UpdateAddresses(b.subConn, addrs)
  127. return nil
  128. }
  129. var subConn balancer.SubConn
  130. subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
  131. StateListener: func(state balancer.SubConnState) {
  132. b.updateSubConnState(subConn, state)
  133. },
  134. })
  135. if err != nil {
  136. if b.logger.V(2) {
  137. b.logger.Infof("Failed to create new SubConn: %v", err)
  138. }
  139. b.state = connectivity.TransientFailure
  140. b.cc.UpdateState(balancer.State{
  141. ConnectivityState: connectivity.TransientFailure,
  142. Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
  143. })
  144. return balancer.ErrBadResolverState
  145. }
  146. b.subConn = subConn
  147. b.state = connectivity.Idle
  148. b.cc.UpdateState(balancer.State{
  149. ConnectivityState: connectivity.Connecting,
  150. Picker: &picker{err: balancer.ErrNoSubConnAvailable},
  151. })
  152. b.subConn.Connect()
  153. return nil
  154. }
// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns. If it is called anyway, it only logs an error carrying
// the SubConn and state it was given.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}
  160. func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
  161. if b.logger.V(2) {
  162. b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
  163. }
  164. if b.subConn != subConn {
  165. if b.logger.V(2) {
  166. b.logger.Infof("Ignored state change because subConn is not recognized")
  167. }
  168. return
  169. }
  170. if state.ConnectivityState == connectivity.Shutdown {
  171. b.subConn = nil
  172. return
  173. }
  174. switch state.ConnectivityState {
  175. case connectivity.Ready:
  176. b.cc.UpdateState(balancer.State{
  177. ConnectivityState: state.ConnectivityState,
  178. Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
  179. })
  180. case connectivity.Connecting:
  181. if b.state == connectivity.TransientFailure {
  182. // We stay in TransientFailure until we are Ready. See A62.
  183. return
  184. }
  185. b.cc.UpdateState(balancer.State{
  186. ConnectivityState: state.ConnectivityState,
  187. Picker: &picker{err: balancer.ErrNoSubConnAvailable},
  188. })
  189. case connectivity.Idle:
  190. if b.state == connectivity.TransientFailure {
  191. // We stay in TransientFailure until we are Ready. Also kick the
  192. // subConn out of Idle into Connecting. See A62.
  193. b.subConn.Connect()
  194. return
  195. }
  196. b.cc.UpdateState(balancer.State{
  197. ConnectivityState: state.ConnectivityState,
  198. Picker: &idlePicker{subConn: subConn},
  199. })
  200. case connectivity.TransientFailure:
  201. b.cc.UpdateState(balancer.State{
  202. ConnectivityState: state.ConnectivityState,
  203. Picker: &picker{err: state.ConnectionError},
  204. })
  205. }
  206. b.state = state.ConnectivityState
  207. }
// Close is a no-op.
func (b *pickfirstBalancer) Close() {
}
  210. func (b *pickfirstBalancer) ExitIdle() {
  211. if b.subConn != nil && b.state == connectivity.Idle {
  212. b.subConn.Connect()
  213. }
  214. }
// picker holds a fixed pick outcome: its Pick method returns result and err
// unchanged on every call.
type picker struct {
	result balancer.PickResult
	err error
}
// Pick ignores its argument and returns the result and error stored in p.
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return p.result, p.err
}
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
	// subConn is the SubConn on which Pick calls Connect.
	subConn balancer.SubConn
}
  227. func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
  228. i.subConn.Connect()
  229. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  230. }
  231. func init() {
  232. balancer.Register(newPickfirstBuilder())
  233. }