uploader.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. // Copyright The OpenTelemetry Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "log"
  21. "net/http"
  22. "time"
  23. "github.com/go-logr/logr"
  24. "github.com/go-logr/stdr"
  25. gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
  26. "go.opentelemetry.io/otel/exporters/jaeger/internal/third_party/thrift/lib/go/thrift"
  27. )
  28. // batchUploader send a batch of spans to Jaeger.
  29. type batchUploader interface {
  30. upload(context.Context, *gen.Batch) error
  31. shutdown(context.Context) error
  32. }
  33. // EndpointOption configures a Jaeger endpoint.
  34. type EndpointOption interface {
  35. newBatchUploader() (batchUploader, error)
  36. }
  37. type endpointOptionFunc func() (batchUploader, error)
  38. func (fn endpointOptionFunc) newBatchUploader() (batchUploader, error) {
  39. return fn()
  40. }
  41. // WithAgentEndpoint configures the Jaeger exporter to send spans to a Jaeger agent
  42. // over compact thrift protocol. This will use the following environment variables for
  43. // configuration if no explicit option is provided:
  44. //
  45. // - OTEL_EXPORTER_JAEGER_AGENT_HOST is used for the agent address host
  46. // - OTEL_EXPORTER_JAEGER_AGENT_PORT is used for the agent address port
  47. //
  48. // The passed options will take precedence over any environment variables and default values
  49. // will be used if neither are provided.
  50. func WithAgentEndpoint(options ...AgentEndpointOption) EndpointOption {
  51. return endpointOptionFunc(func() (batchUploader, error) {
  52. cfg := agentEndpointConfig{
  53. agentClientUDPParams{
  54. AttemptReconnecting: true,
  55. Host: envOr(envAgentHost, "localhost"),
  56. Port: envOr(envAgentPort, "6831"),
  57. },
  58. }
  59. for _, opt := range options {
  60. cfg = opt.apply(cfg)
  61. }
  62. client, err := newAgentClientUDP(cfg.agentClientUDPParams)
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &agentUploader{client: client}, nil
  67. })
  68. }
  69. // AgentEndpointOption configures a Jaeger agent endpoint.
  70. type AgentEndpointOption interface {
  71. apply(agentEndpointConfig) agentEndpointConfig
  72. }
  73. type agentEndpointConfig struct {
  74. agentClientUDPParams
  75. }
  76. type agentEndpointOptionFunc func(agentEndpointConfig) agentEndpointConfig
  77. func (fn agentEndpointOptionFunc) apply(cfg agentEndpointConfig) agentEndpointConfig {
  78. return fn(cfg)
  79. }
  80. // WithAgentHost sets a host to be used in the agent client endpoint.
  81. // This option overrides any value set for the
  82. // OTEL_EXPORTER_JAEGER_AGENT_HOST environment variable.
  83. // If this option is not passed and the env var is not set, "localhost" will be used by default.
  84. func WithAgentHost(host string) AgentEndpointOption {
  85. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  86. o.Host = host
  87. return o
  88. })
  89. }
  90. // WithAgentPort sets a port to be used in the agent client endpoint.
  91. // This option overrides any value set for the
  92. // OTEL_EXPORTER_JAEGER_AGENT_PORT environment variable.
  93. // If this option is not passed and the env var is not set, "6831" will be used by default.
  94. func WithAgentPort(port string) AgentEndpointOption {
  95. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  96. o.Port = port
  97. return o
  98. })
  99. }
  100. var emptyLogger = logr.Logger{}
  101. // WithLogger sets a logger to be used by agent client.
  102. // WithLogger and WithLogr will overwrite each other.
  103. func WithLogger(logger *log.Logger) AgentEndpointOption {
  104. return WithLogr(stdr.New(logger))
  105. }
  106. // WithLogr sets a logr.Logger to be used by agent client.
  107. // WithLogr and WithLogger will overwrite each other.
  108. func WithLogr(logger logr.Logger) AgentEndpointOption {
  109. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  110. o.Logger = logger
  111. return o
  112. })
  113. }
  114. // WithDisableAttemptReconnecting sets option to disable reconnecting udp client.
  115. func WithDisableAttemptReconnecting() AgentEndpointOption {
  116. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  117. o.AttemptReconnecting = false
  118. return o
  119. })
  120. }
  121. // WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint.
  122. func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption {
  123. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  124. o.AttemptReconnectInterval = interval
  125. return o
  126. })
  127. }
  128. // WithMaxPacketSize sets the maximum UDP packet size for transport to the Jaeger agent.
  129. func WithMaxPacketSize(size int) AgentEndpointOption {
  130. return agentEndpointOptionFunc(func(o agentEndpointConfig) agentEndpointConfig {
  131. o.MaxPacketSize = size
  132. return o
  133. })
  134. }
  135. // WithCollectorEndpoint defines the full URL to the Jaeger HTTP Thrift collector. This will
  136. // use the following environment variables for configuration if no explicit option is provided:
  137. //
  138. // - OTEL_EXPORTER_JAEGER_ENDPOINT is the HTTP endpoint for sending spans directly to a collector.
  139. // - OTEL_EXPORTER_JAEGER_USER is the username to be sent as authentication to the collector endpoint.
  140. // - OTEL_EXPORTER_JAEGER_PASSWORD is the password to be sent as authentication to the collector endpoint.
  141. //
  142. // The passed options will take precedence over any environment variables.
  143. // If neither values are provided for the endpoint, the default value of "http://localhost:14268/api/traces" will be used.
  144. // If neither values are provided for the username or the password, they will not be set since there is no default.
  145. func WithCollectorEndpoint(options ...CollectorEndpointOption) EndpointOption {
  146. return endpointOptionFunc(func() (batchUploader, error) {
  147. cfg := collectorEndpointConfig{
  148. endpoint: envOr(envEndpoint, "http://localhost:14268/api/traces"),
  149. username: envOr(envUser, ""),
  150. password: envOr(envPassword, ""),
  151. httpClient: http.DefaultClient,
  152. }
  153. for _, opt := range options {
  154. cfg = opt.apply(cfg)
  155. }
  156. return &collectorUploader{
  157. endpoint: cfg.endpoint,
  158. username: cfg.username,
  159. password: cfg.password,
  160. httpClient: cfg.httpClient,
  161. }, nil
  162. })
  163. }
  164. // CollectorEndpointOption configures a Jaeger collector endpoint.
  165. type CollectorEndpointOption interface {
  166. apply(collectorEndpointConfig) collectorEndpointConfig
  167. }
  168. type collectorEndpointConfig struct {
  169. // endpoint for sending spans directly to a collector.
  170. endpoint string
  171. // username to be used for authentication with the collector endpoint.
  172. username string
  173. // password to be used for authentication with the collector endpoint.
  174. password string
  175. // httpClient to be used to make requests to the collector endpoint.
  176. httpClient *http.Client
  177. }
  178. type collectorEndpointOptionFunc func(collectorEndpointConfig) collectorEndpointConfig
  179. func (fn collectorEndpointOptionFunc) apply(cfg collectorEndpointConfig) collectorEndpointConfig {
  180. return fn(cfg)
  181. }
  182. // WithEndpoint is the URL for the Jaeger collector that spans are sent to.
  183. // This option overrides any value set for the
  184. // OTEL_EXPORTER_JAEGER_ENDPOINT environment variable.
  185. // If this option is not passed and the environment variable is not set,
  186. // "http://localhost:14268/api/traces" will be used by default.
  187. func WithEndpoint(endpoint string) CollectorEndpointOption {
  188. return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
  189. o.endpoint = endpoint
  190. return o
  191. })
  192. }
  193. // WithUsername sets the username to be used in the authorization header sent for all requests to the collector.
  194. // This option overrides any value set for the
  195. // OTEL_EXPORTER_JAEGER_USER environment variable.
  196. // If this option is not passed and the environment variable is not set, no username will be set.
  197. func WithUsername(username string) CollectorEndpointOption {
  198. return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
  199. o.username = username
  200. return o
  201. })
  202. }
  203. // WithPassword sets the password to be used in the authorization header sent for all requests to the collector.
  204. // This option overrides any value set for the
  205. // OTEL_EXPORTER_JAEGER_PASSWORD environment variable.
  206. // If this option is not passed and the environment variable is not set, no password will be set.
  207. func WithPassword(password string) CollectorEndpointOption {
  208. return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
  209. o.password = password
  210. return o
  211. })
  212. }
  213. // WithHTTPClient sets the http client to be used to make request to the collector endpoint.
  214. func WithHTTPClient(client *http.Client) CollectorEndpointOption {
  215. return collectorEndpointOptionFunc(func(o collectorEndpointConfig) collectorEndpointConfig {
  216. o.httpClient = client
  217. return o
  218. })
  219. }
  220. // agentUploader implements batchUploader interface sending batches to
  221. // Jaeger through the UDP agent.
  222. type agentUploader struct {
  223. client *agentClientUDP
  224. }
  225. var _ batchUploader = (*agentUploader)(nil)
  226. func (a *agentUploader) shutdown(ctx context.Context) error {
  227. done := make(chan error, 1)
  228. go func() {
  229. done <- a.client.Close()
  230. }()
  231. select {
  232. case <-ctx.Done():
  233. // Prioritize not blocking the calling thread and just leak the
  234. // spawned goroutine to close the client.
  235. return ctx.Err()
  236. case err := <-done:
  237. return err
  238. }
  239. }
  240. func (a *agentUploader) upload(ctx context.Context, batch *gen.Batch) error {
  241. return a.client.EmitBatch(ctx, batch)
  242. }
  243. // collectorUploader implements batchUploader interface sending batches to
  244. // Jaeger through the collector http endpoint.
  245. type collectorUploader struct {
  246. endpoint string
  247. username string
  248. password string
  249. httpClient *http.Client
  250. }
  251. var _ batchUploader = (*collectorUploader)(nil)
  252. func (c *collectorUploader) shutdown(ctx context.Context) error {
  253. // The Exporter will cancel any active exports and will prevent all
  254. // subsequent exports, so nothing to do here.
  255. return nil
  256. }
  257. func (c *collectorUploader) upload(ctx context.Context, batch *gen.Batch) error {
  258. body, err := serialize(batch)
  259. if err != nil {
  260. return err
  261. }
  262. req, err := http.NewRequestWithContext(ctx, "POST", c.endpoint, body)
  263. if err != nil {
  264. return err
  265. }
  266. if c.username != "" && c.password != "" {
  267. req.SetBasicAuth(c.username, c.password)
  268. }
  269. req.Header.Set("Content-Type", "application/x-thrift")
  270. resp, err := c.httpClient.Do(req)
  271. if err != nil {
  272. return err
  273. }
  274. _, _ = io.Copy(io.Discard, resp.Body)
  275. if err = resp.Body.Close(); err != nil {
  276. return err
  277. }
  278. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  279. return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
  280. }
  281. return nil
  282. }
  283. func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
  284. buf := thrift.NewTMemoryBuffer()
  285. if err := obj.Write(context.Background(), thrift.NewTBinaryProtocolConf(buf, &thrift.TConfiguration{})); err != nil {
  286. return nil, err
  287. }
  288. return buf.Buffer, nil
  289. }