agent.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Copyright The OpenTelemetry Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strings"
  21. "time"
  22. "github.com/go-logr/logr"
  23. genAgent "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/agent"
  24. gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
  25. "go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift"
  26. )
const (
	// udpPacketMaxLength is the max size of UDP packet we want to send, synced with jaeger-agent.
	// newAgentClientUDP uses it as the packet size whenever the requested size is
	// non-positive or larger than this value.
	udpPacketMaxLength = 65000
	// emitBatchOverhead is the additional overhead bytes used for enveloping the datagram,
	// synced with jaeger-agent https://github.com/jaegertracing/jaeger-client-go/blob/master/transport_udp.go#L37
	// EmitBatch never lets its batching budget exceed udpPacketMaxLength minus this value.
	emitBatchOverhead = 70
)
// agentClientUDP is a UDP client to Jaeger agent that implements the genAgent.Agent interface.
//
// Batches are serialized into thriftBuffer and the resulting bytes are written
// to connUDP as a single datagram.
//
// NOTE(review): thriftBuffer is shared and reset by every size calculation and
// every flush, and no locking is visible in this file — presumably callers
// serialize access; confirm before using one client from several goroutines.
type agentClientUDP struct {
	// Embedded interfaces. newAgentClientUDP does not assign them; the
	// EmitBatch and Close methods defined on *agentClientUDP are the ones used.
	genAgent.Agent
	io.Closer

	connUDP        udpConn                // connection the serialized datagrams are written to
	client         *genAgent.AgentClient  // generated client that serializes batches into thriftBuffer
	maxPacketSize  int                    // max size of datagram in bytes
	thriftBuffer   *thrift.TMemoryBuffer  // buffer used to calculate byte size of a span
	thriftProtocol thrift.TProtocol       // compact protocol writing into thriftBuffer, used for size calculation
}
// udpConn is the set of connection operations the agent client relies on.
// newAgentClientUDP stores either the connection returned by net.DialUDP or
// the one returned by newReconnectingUDPConn in a value of this type.
type udpConn interface {
	// Write sends one datagram and reports the number of bytes written.
	Write([]byte) (int, error)
	// SetWriteBuffer sets the size of the connection's write buffer.
	SetWriteBuffer(int) error
	// Close releases the connection.
	Close() error
}
// agentClientUDPParams carries the settings consumed by newAgentClientUDP.
type agentClientUDPParams struct {
	// Host and Port are joined with net.JoinHostPort to form the agent address.
	Host string
	Port string
	// MaxPacketSize is the datagram size limit in bytes. A value that is
	// non-positive or larger than udpPacketMaxLength is replaced by
	// udpPacketMaxLength.
	MaxPacketSize int
	// Logger is handed to the reconnecting connection; it is not used when
	// AttemptReconnecting is false.
	Logger logr.Logger
	// AttemptReconnecting selects the reconnecting UDP connection instead of a
	// connection that is resolved and dialed once.
	AttemptReconnecting bool
	// AttemptReconnectInterval is passed to the reconnecting connection. When
	// AttemptReconnecting is set and this is not positive, 30 seconds is used.
	AttemptReconnectInterval time.Duration
}
  57. // newAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
  58. func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
  59. hostPort := net.JoinHostPort(params.Host, params.Port)
  60. // validate hostport
  61. if _, _, err := net.SplitHostPort(hostPort); err != nil {
  62. return nil, err
  63. }
  64. if params.MaxPacketSize <= 0 || params.MaxPacketSize > udpPacketMaxLength {
  65. params.MaxPacketSize = udpPacketMaxLength
  66. }
  67. if params.AttemptReconnecting && params.AttemptReconnectInterval <= 0 {
  68. params.AttemptReconnectInterval = time.Second * 30
  69. }
  70. thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
  71. protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
  72. thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
  73. client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory)
  74. var connUDP udpConn
  75. var err error
  76. if params.AttemptReconnecting {
  77. // host is hostname, setup resolver loop in case host record changes during operation
  78. connUDP, err = newReconnectingUDPConn(hostPort, params.MaxPacketSize, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
  79. if err != nil {
  80. return nil, err
  81. }
  82. } else {
  83. destAddr, err := net.ResolveUDPAddr("udp", hostPort)
  84. if err != nil {
  85. return nil, err
  86. }
  87. connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
  88. if err != nil {
  89. return nil, err
  90. }
  91. }
  92. if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
  93. return nil, err
  94. }
  95. return &agentClientUDP{
  96. connUDP: connUDP,
  97. client: client,
  98. maxPacketSize: params.MaxPacketSize,
  99. thriftBuffer: thriftBuffer,
  100. thriftProtocol: thriftProtocol,
  101. }, nil
  102. }
  103. // EmitBatch buffers batch to fit into UDP packets and sends the data to the agent.
  104. func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error {
  105. var errs []error
  106. processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process)
  107. if err != nil {
  108. // drop the batch if serialization of process fails.
  109. return err
  110. }
  111. maxPacketSize := a.maxPacketSize
  112. if maxPacketSize > udpPacketMaxLength-emitBatchOverhead {
  113. maxPacketSize = udpPacketMaxLength - emitBatchOverhead
  114. }
  115. totalSize := processSize
  116. var spans []*gen.Span
  117. for _, span := range batch.Spans {
  118. spanSize, err := a.calcSizeOfSerializedThrift(ctx, span)
  119. if err != nil {
  120. errs = append(errs, fmt.Errorf("thrift serialization failed: %v", span))
  121. continue
  122. }
  123. if spanSize+processSize >= maxPacketSize {
  124. // drop the span that exceeds the limit.
  125. errs = append(errs, fmt.Errorf("span too large to send: %v", span))
  126. continue
  127. }
  128. if totalSize+spanSize >= maxPacketSize {
  129. if err := a.flush(ctx, &gen.Batch{
  130. Process: batch.Process,
  131. Spans: spans,
  132. }); err != nil {
  133. errs = append(errs, err)
  134. }
  135. spans = spans[:0]
  136. totalSize = processSize
  137. }
  138. totalSize += spanSize
  139. spans = append(spans, span)
  140. }
  141. if len(spans) > 0 {
  142. if err := a.flush(ctx, &gen.Batch{
  143. Process: batch.Process,
  144. Spans: spans,
  145. }); err != nil {
  146. errs = append(errs, err)
  147. }
  148. }
  149. if len(errs) == 1 {
  150. return errs[0]
  151. } else if len(errs) > 1 {
  152. joined := a.makeJoinedErrorString(errs)
  153. return fmt.Errorf("multiple errors during transform: %s", joined)
  154. }
  155. return nil
  156. }
  157. // makeJoinedErrorString join all the errors to one error message.
  158. func (a *agentClientUDP) makeJoinedErrorString(errs []error) string {
  159. var errMsgs []string
  160. for _, err := range errs {
  161. errMsgs = append(errMsgs, err.Error())
  162. }
  163. return strings.Join(errMsgs, ", ")
  164. }
  165. // flush will send the batch of spans to the agent.
  166. func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error {
  167. a.thriftBuffer.Reset()
  168. if err := a.client.EmitBatch(ctx, batch); err != nil {
  169. return err
  170. }
  171. if a.thriftBuffer.Len() > a.maxPacketSize {
  172. return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
  173. a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
  174. }
  175. _, err := a.connUDP.Write(a.thriftBuffer.Bytes())
  176. return err
  177. }
  178. // calcSizeOfSerializedThrift calculate the serialized thrift packet size.
  179. func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) {
  180. a.thriftBuffer.Reset()
  181. err := thriftStruct.Write(ctx, a.thriftProtocol)
  182. return a.thriftBuffer.Len(), err
  183. }
  184. // Close implements Close() of io.Closer and closes the underlying UDP connection.
  185. func (a *agentClientUDP) Close() error {
  186. return a.connUDP.Close()
  187. }