jaeger.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Copyright The OpenTelemetry Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package jaeger // import "go.opentelemetry.io/otel/exporters/jaeger"
  15. import (
  16. "context"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "sync"
  21. "go.opentelemetry.io/otel/attribute"
  22. "go.opentelemetry.io/otel/codes"
  23. gen "go.opentelemetry.io/otel/exporters/jaeger/internal/gen-go/jaeger"
  24. "go.opentelemetry.io/otel/sdk/resource"
  25. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  26. semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
  27. "go.opentelemetry.io/otel/trace"
  28. )
  29. const (
  30. keyInstrumentationLibraryName = "otel.library.name"
  31. keyInstrumentationLibraryVersion = "otel.library.version"
  32. keyError = "error"
  33. keySpanKind = "span.kind"
  34. keyStatusCode = "otel.status_code"
  35. keyStatusMessage = "otel.status_description"
  36. keyDroppedAttributeCount = "otel.event.dropped_attributes_count"
  37. keyEventName = "event"
  38. )
  39. // New returns an OTel Exporter implementation that exports the collected
  40. // spans to Jaeger.
  41. func New(endpointOption EndpointOption) (*Exporter, error) {
  42. uploader, err := endpointOption.newBatchUploader()
  43. if err != nil {
  44. return nil, err
  45. }
  46. // Fetch default service.name from default resource for backup
  47. var defaultServiceName string
  48. defaultResource := resource.Default()
  49. if value, exists := defaultResource.Set().Value(semconv.ServiceNameKey); exists {
  50. defaultServiceName = value.AsString()
  51. }
  52. if defaultServiceName == "" {
  53. return nil, fmt.Errorf("failed to get service name from default resource")
  54. }
  55. stopCh := make(chan struct{})
  56. e := &Exporter{
  57. uploader: uploader,
  58. stopCh: stopCh,
  59. defaultServiceName: defaultServiceName,
  60. }
  61. return e, nil
  62. }
  63. // Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
  64. type Exporter struct {
  65. uploader batchUploader
  66. stopOnce sync.Once
  67. stopCh chan struct{}
  68. defaultServiceName string
  69. }
  70. var _ sdktrace.SpanExporter = (*Exporter)(nil)
  71. // ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
  72. func (e *Exporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
  73. // Return fast if context is already canceled or Exporter shutdown.
  74. select {
  75. case <-ctx.Done():
  76. return ctx.Err()
  77. case <-e.stopCh:
  78. return nil
  79. default:
  80. }
  81. // Cancel export if Exporter is shutdown.
  82. var cancel context.CancelFunc
  83. ctx, cancel = context.WithCancel(ctx)
  84. defer cancel()
  85. go func(ctx context.Context, cancel context.CancelFunc) {
  86. select {
  87. case <-ctx.Done():
  88. case <-e.stopCh:
  89. cancel()
  90. }
  91. }(ctx, cancel)
  92. for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
  93. if err := e.uploader.upload(ctx, batch); err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }
  99. // Shutdown stops the Exporter. This will close all connections and release
  100. // all resources held by the Exporter.
  101. func (e *Exporter) Shutdown(ctx context.Context) error {
  102. // Stop any active and subsequent exports.
  103. e.stopOnce.Do(func() { close(e.stopCh) })
  104. select {
  105. case <-ctx.Done():
  106. return ctx.Err()
  107. default:
  108. }
  109. return e.uploader.shutdown(ctx)
  110. }
  111. // MarshalLog is the marshaling function used by the logging system to represent this exporter.
  112. func (e *Exporter) MarshalLog() interface{} {
  113. return struct {
  114. Type string
  115. }{
  116. Type: "jaeger",
  117. }
  118. }
  119. func spanToThrift(ss sdktrace.ReadOnlySpan) *gen.Span {
  120. attr := ss.Attributes()
  121. tags := make([]*gen.Tag, 0, len(attr))
  122. for _, kv := range attr {
  123. tag := keyValueToTag(kv)
  124. if tag != nil {
  125. tags = append(tags, tag)
  126. }
  127. }
  128. if is := ss.InstrumentationScope(); is.Name != "" {
  129. tags = append(tags, getStringTag(keyInstrumentationLibraryName, is.Name))
  130. if is.Version != "" {
  131. tags = append(tags, getStringTag(keyInstrumentationLibraryVersion, is.Version))
  132. }
  133. }
  134. if ss.SpanKind() != trace.SpanKindInternal {
  135. tags = append(tags,
  136. getStringTag(keySpanKind, ss.SpanKind().String()),
  137. )
  138. }
  139. if ss.Status().Code != codes.Unset {
  140. switch ss.Status().Code {
  141. case codes.Ok:
  142. tags = append(tags, getStringTag(keyStatusCode, "OK"))
  143. case codes.Error:
  144. tags = append(tags, getBoolTag(keyError, true))
  145. tags = append(tags, getStringTag(keyStatusCode, "ERROR"))
  146. }
  147. if ss.Status().Description != "" {
  148. tags = append(tags, getStringTag(keyStatusMessage, ss.Status().Description))
  149. }
  150. }
  151. var logs []*gen.Log
  152. for _, a := range ss.Events() {
  153. nTags := len(a.Attributes)
  154. if a.Name != "" {
  155. nTags++
  156. }
  157. if a.DroppedAttributeCount != 0 {
  158. nTags++
  159. }
  160. fields := make([]*gen.Tag, 0, nTags)
  161. if a.Name != "" {
  162. // If an event contains an attribute with the same key, it needs
  163. // to be given precedence and overwrite this.
  164. fields = append(fields, getStringTag(keyEventName, a.Name))
  165. }
  166. for _, kv := range a.Attributes {
  167. tag := keyValueToTag(kv)
  168. if tag != nil {
  169. fields = append(fields, tag)
  170. }
  171. }
  172. if a.DroppedAttributeCount != 0 {
  173. fields = append(fields, getInt64Tag(keyDroppedAttributeCount, int64(a.DroppedAttributeCount)))
  174. }
  175. logs = append(logs, &gen.Log{
  176. Timestamp: a.Time.UnixNano() / 1000,
  177. Fields: fields,
  178. })
  179. }
  180. var refs []*gen.SpanRef
  181. for _, link := range ss.Links() {
  182. tid := link.SpanContext.TraceID()
  183. sid := link.SpanContext.SpanID()
  184. refs = append(refs, &gen.SpanRef{
  185. TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])),
  186. TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])),
  187. SpanId: int64(binary.BigEndian.Uint64(sid[:])),
  188. RefType: gen.SpanRefType_FOLLOWS_FROM,
  189. })
  190. }
  191. tid := ss.SpanContext().TraceID()
  192. sid := ss.SpanContext().SpanID()
  193. psid := ss.Parent().SpanID()
  194. return &gen.Span{
  195. TraceIdHigh: int64(binary.BigEndian.Uint64(tid[0:8])),
  196. TraceIdLow: int64(binary.BigEndian.Uint64(tid[8:16])),
  197. SpanId: int64(binary.BigEndian.Uint64(sid[:])),
  198. ParentSpanId: int64(binary.BigEndian.Uint64(psid[:])),
  199. OperationName: ss.Name(), // TODO: if span kind is added then add prefix "Sent"/"Recv"
  200. Flags: int32(ss.SpanContext().TraceFlags()),
  201. StartTime: ss.StartTime().UnixNano() / 1000,
  202. Duration: ss.EndTime().Sub(ss.StartTime()).Nanoseconds() / 1000,
  203. Tags: tags,
  204. Logs: logs,
  205. References: refs,
  206. }
  207. }
  208. func keyValueToTag(keyValue attribute.KeyValue) *gen.Tag {
  209. var tag *gen.Tag
  210. switch keyValue.Value.Type() {
  211. case attribute.STRING:
  212. s := keyValue.Value.AsString()
  213. tag = &gen.Tag{
  214. Key: string(keyValue.Key),
  215. VStr: &s,
  216. VType: gen.TagType_STRING,
  217. }
  218. case attribute.BOOL:
  219. b := keyValue.Value.AsBool()
  220. tag = &gen.Tag{
  221. Key: string(keyValue.Key),
  222. VBool: &b,
  223. VType: gen.TagType_BOOL,
  224. }
  225. case attribute.INT64:
  226. i := keyValue.Value.AsInt64()
  227. tag = &gen.Tag{
  228. Key: string(keyValue.Key),
  229. VLong: &i,
  230. VType: gen.TagType_LONG,
  231. }
  232. case attribute.FLOAT64:
  233. f := keyValue.Value.AsFloat64()
  234. tag = &gen.Tag{
  235. Key: string(keyValue.Key),
  236. VDouble: &f,
  237. VType: gen.TagType_DOUBLE,
  238. }
  239. case attribute.BOOLSLICE,
  240. attribute.INT64SLICE,
  241. attribute.FLOAT64SLICE,
  242. attribute.STRINGSLICE:
  243. data, _ := json.Marshal(keyValue.Value.AsInterface())
  244. a := (string)(data)
  245. tag = &gen.Tag{
  246. Key: string(keyValue.Key),
  247. VStr: &a,
  248. VType: gen.TagType_STRING,
  249. }
  250. }
  251. return tag
  252. }
  253. func getInt64Tag(k string, i int64) *gen.Tag {
  254. return &gen.Tag{
  255. Key: k,
  256. VLong: &i,
  257. VType: gen.TagType_LONG,
  258. }
  259. }
  260. func getStringTag(k, s string) *gen.Tag {
  261. return &gen.Tag{
  262. Key: k,
  263. VStr: &s,
  264. VType: gen.TagType_STRING,
  265. }
  266. }
  267. func getBoolTag(k string, b bool) *gen.Tag {
  268. return &gen.Tag{
  269. Key: k,
  270. VBool: &b,
  271. VType: gen.TagType_BOOL,
  272. }
  273. }
  274. // jaegerBatchList transforms a slice of spans into a slice of jaeger Batch.
  275. func jaegerBatchList(ssl []sdktrace.ReadOnlySpan, defaultServiceName string) []*gen.Batch {
  276. if len(ssl) == 0 {
  277. return nil
  278. }
  279. batchDict := make(map[attribute.Distinct]*gen.Batch)
  280. for _, ss := range ssl {
  281. if ss == nil {
  282. continue
  283. }
  284. resourceKey := ss.Resource().Equivalent()
  285. batch, bOK := batchDict[resourceKey]
  286. if !bOK {
  287. batch = &gen.Batch{
  288. Process: process(ss.Resource(), defaultServiceName),
  289. Spans: []*gen.Span{},
  290. }
  291. }
  292. batch.Spans = append(batch.Spans, spanToThrift(ss))
  293. batchDict[resourceKey] = batch
  294. }
  295. // Transform the categorized map into a slice
  296. batchList := make([]*gen.Batch, 0, len(batchDict))
  297. for _, batch := range batchDict {
  298. batchList = append(batchList, batch)
  299. }
  300. return batchList
  301. }
  302. // process transforms an OTel Resource into a jaeger Process.
  303. func process(res *resource.Resource, defaultServiceName string) *gen.Process {
  304. var process gen.Process
  305. var serviceName attribute.KeyValue
  306. if res != nil {
  307. for iter := res.Iter(); iter.Next(); {
  308. if iter.Attribute().Key == semconv.ServiceNameKey {
  309. serviceName = iter.Attribute()
  310. // Don't convert service.name into tag.
  311. continue
  312. }
  313. if tag := keyValueToTag(iter.Attribute()); tag != nil {
  314. process.Tags = append(process.Tags, tag)
  315. }
  316. }
  317. }
  318. // If no service.name is contained in a Span's Resource,
  319. // that field MUST be populated from the default Resource.
  320. if serviceName.Value.AsString() == "" {
  321. serviceName = semconv.ServiceName(defaultServiceName)
  322. }
  323. process.ServiceName = serviceName.Value.AsString()
  324. return &process
  325. }