meter.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. // Copyright The OpenTelemetry Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package global // import "go.opentelemetry.io/otel/internal/global"
  15. import (
  16. "container/list"
  17. "sync"
  18. "sync/atomic"
  19. "go.opentelemetry.io/otel/metric"
  20. "go.opentelemetry.io/otel/metric/embedded"
  21. )
  22. // meterProvider is a placeholder for a configured SDK MeterProvider.
  23. //
  24. // All MeterProvider functionality is forwarded to a delegate once
  25. // configured.
  26. type meterProvider struct {
  27. embedded.MeterProvider
  28. mtx sync.Mutex
  29. meters map[il]*meter
  30. delegate metric.MeterProvider
  31. }
  32. // setDelegate configures p to delegate all MeterProvider functionality to
  33. // provider.
  34. //
  35. // All Meters provided prior to this function call are switched out to be
  36. // Meters provided by provider. All instruments and callbacks are recreated and
  37. // delegated.
  38. //
  39. // It is guaranteed by the caller that this happens only once.
  40. func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
  41. p.mtx.Lock()
  42. defer p.mtx.Unlock()
  43. p.delegate = provider
  44. if len(p.meters) == 0 {
  45. return
  46. }
  47. for _, meter := range p.meters {
  48. meter.setDelegate(provider)
  49. }
  50. p.meters = nil
  51. }
  52. // Meter implements MeterProvider.
  53. func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
  54. p.mtx.Lock()
  55. defer p.mtx.Unlock()
  56. if p.delegate != nil {
  57. return p.delegate.Meter(name, opts...)
  58. }
  59. // At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
  60. c := metric.NewMeterConfig(opts...)
  61. key := il{
  62. name: name,
  63. version: c.InstrumentationVersion(),
  64. }
  65. if p.meters == nil {
  66. p.meters = make(map[il]*meter)
  67. }
  68. if val, ok := p.meters[key]; ok {
  69. return val
  70. }
  71. t := &meter{name: name, opts: opts}
  72. p.meters[key] = t
  73. return t
  74. }
  75. // meter is a placeholder for a metric.Meter.
  76. //
  77. // All Meter functionality is forwarded to a delegate once configured.
  78. // Otherwise, all functionality is forwarded to a NoopMeter.
  79. type meter struct {
  80. embedded.Meter
  81. name string
  82. opts []metric.MeterOption
  83. mtx sync.Mutex
  84. instruments []delegatedInstrument
  85. registry list.List
  86. delegate atomic.Value // metric.Meter
  87. }
  88. type delegatedInstrument interface {
  89. setDelegate(metric.Meter)
  90. }
  91. // setDelegate configures m to delegate all Meter functionality to Meters
  92. // created by provider.
  93. //
  94. // All subsequent calls to the Meter methods will be passed to the delegate.
  95. //
  96. // It is guaranteed by the caller that this happens only once.
  97. func (m *meter) setDelegate(provider metric.MeterProvider) {
  98. meter := provider.Meter(m.name, m.opts...)
  99. m.delegate.Store(meter)
  100. m.mtx.Lock()
  101. defer m.mtx.Unlock()
  102. for _, inst := range m.instruments {
  103. inst.setDelegate(meter)
  104. }
  105. for e := m.registry.Front(); e != nil; e = e.Next() {
  106. r := e.Value.(*registration)
  107. r.setDelegate(meter)
  108. m.registry.Remove(e)
  109. }
  110. m.instruments = nil
  111. m.registry.Init()
  112. }
  113. func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
  114. if del, ok := m.delegate.Load().(metric.Meter); ok {
  115. return del.Int64Counter(name, options...)
  116. }
  117. m.mtx.Lock()
  118. defer m.mtx.Unlock()
  119. i := &siCounter{name: name, opts: options}
  120. m.instruments = append(m.instruments, i)
  121. return i, nil
  122. }
  123. func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
  124. if del, ok := m.delegate.Load().(metric.Meter); ok {
  125. return del.Int64UpDownCounter(name, options...)
  126. }
  127. m.mtx.Lock()
  128. defer m.mtx.Unlock()
  129. i := &siUpDownCounter{name: name, opts: options}
  130. m.instruments = append(m.instruments, i)
  131. return i, nil
  132. }
  133. func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
  134. if del, ok := m.delegate.Load().(metric.Meter); ok {
  135. return del.Int64Histogram(name, options...)
  136. }
  137. m.mtx.Lock()
  138. defer m.mtx.Unlock()
  139. i := &siHistogram{name: name, opts: options}
  140. m.instruments = append(m.instruments, i)
  141. return i, nil
  142. }
  143. func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
  144. if del, ok := m.delegate.Load().(metric.Meter); ok {
  145. return del.Int64ObservableCounter(name, options...)
  146. }
  147. m.mtx.Lock()
  148. defer m.mtx.Unlock()
  149. i := &aiCounter{name: name, opts: options}
  150. m.instruments = append(m.instruments, i)
  151. return i, nil
  152. }
  153. func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
  154. if del, ok := m.delegate.Load().(metric.Meter); ok {
  155. return del.Int64ObservableUpDownCounter(name, options...)
  156. }
  157. m.mtx.Lock()
  158. defer m.mtx.Unlock()
  159. i := &aiUpDownCounter{name: name, opts: options}
  160. m.instruments = append(m.instruments, i)
  161. return i, nil
  162. }
  163. func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
  164. if del, ok := m.delegate.Load().(metric.Meter); ok {
  165. return del.Int64ObservableGauge(name, options...)
  166. }
  167. m.mtx.Lock()
  168. defer m.mtx.Unlock()
  169. i := &aiGauge{name: name, opts: options}
  170. m.instruments = append(m.instruments, i)
  171. return i, nil
  172. }
  173. func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
  174. if del, ok := m.delegate.Load().(metric.Meter); ok {
  175. return del.Float64Counter(name, options...)
  176. }
  177. m.mtx.Lock()
  178. defer m.mtx.Unlock()
  179. i := &sfCounter{name: name, opts: options}
  180. m.instruments = append(m.instruments, i)
  181. return i, nil
  182. }
  183. func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
  184. if del, ok := m.delegate.Load().(metric.Meter); ok {
  185. return del.Float64UpDownCounter(name, options...)
  186. }
  187. m.mtx.Lock()
  188. defer m.mtx.Unlock()
  189. i := &sfUpDownCounter{name: name, opts: options}
  190. m.instruments = append(m.instruments, i)
  191. return i, nil
  192. }
  193. func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
  194. if del, ok := m.delegate.Load().(metric.Meter); ok {
  195. return del.Float64Histogram(name, options...)
  196. }
  197. m.mtx.Lock()
  198. defer m.mtx.Unlock()
  199. i := &sfHistogram{name: name, opts: options}
  200. m.instruments = append(m.instruments, i)
  201. return i, nil
  202. }
  203. func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
  204. if del, ok := m.delegate.Load().(metric.Meter); ok {
  205. return del.Float64ObservableCounter(name, options...)
  206. }
  207. m.mtx.Lock()
  208. defer m.mtx.Unlock()
  209. i := &afCounter{name: name, opts: options}
  210. m.instruments = append(m.instruments, i)
  211. return i, nil
  212. }
  213. func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
  214. if del, ok := m.delegate.Load().(metric.Meter); ok {
  215. return del.Float64ObservableUpDownCounter(name, options...)
  216. }
  217. m.mtx.Lock()
  218. defer m.mtx.Unlock()
  219. i := &afUpDownCounter{name: name, opts: options}
  220. m.instruments = append(m.instruments, i)
  221. return i, nil
  222. }
  223. func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
  224. if del, ok := m.delegate.Load().(metric.Meter); ok {
  225. return del.Float64ObservableGauge(name, options...)
  226. }
  227. m.mtx.Lock()
  228. defer m.mtx.Unlock()
  229. i := &afGauge{name: name, opts: options}
  230. m.instruments = append(m.instruments, i)
  231. return i, nil
  232. }
  233. // RegisterCallback captures the function that will be called during Collect.
  234. func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
  235. if del, ok := m.delegate.Load().(metric.Meter); ok {
  236. insts = unwrapInstruments(insts)
  237. return del.RegisterCallback(f, insts...)
  238. }
  239. m.mtx.Lock()
  240. defer m.mtx.Unlock()
  241. reg := &registration{instruments: insts, function: f}
  242. e := m.registry.PushBack(reg)
  243. reg.unreg = func() error {
  244. m.mtx.Lock()
  245. _ = m.registry.Remove(e)
  246. m.mtx.Unlock()
  247. return nil
  248. }
  249. return reg, nil
  250. }
  251. type wrapped interface {
  252. unwrap() metric.Observable
  253. }
  254. func unwrapInstruments(instruments []metric.Observable) []metric.Observable {
  255. out := make([]metric.Observable, 0, len(instruments))
  256. for _, inst := range instruments {
  257. if in, ok := inst.(wrapped); ok {
  258. out = append(out, in.unwrap())
  259. } else {
  260. out = append(out, inst)
  261. }
  262. }
  263. return out
  264. }
  265. type registration struct {
  266. embedded.Registration
  267. instruments []metric.Observable
  268. function metric.Callback
  269. unreg func() error
  270. unregMu sync.Mutex
  271. }
  272. func (c *registration) setDelegate(m metric.Meter) {
  273. insts := unwrapInstruments(c.instruments)
  274. c.unregMu.Lock()
  275. defer c.unregMu.Unlock()
  276. if c.unreg == nil {
  277. // Unregister already called.
  278. return
  279. }
  280. reg, err := m.RegisterCallback(c.function, insts...)
  281. if err != nil {
  282. GetErrorHandler().Handle(err)
  283. }
  284. c.unreg = reg.Unregister
  285. }
  286. func (c *registration) Unregister() error {
  287. c.unregMu.Lock()
  288. defer c.unregMu.Unlock()
  289. if c.unreg == nil {
  290. // Unregister already called.
  291. return nil
  292. }
  293. var err error
  294. err, c.unreg = c.unreg(), nil
  295. return err
  296. }