feat(telemetry): instrument inference duration and frame age

Date:   2021-12-02 09:16:50 +01:00
Parent: 85fe69dcaa
Commit: bb99d3b7c9
450 changed files with 201257 additions and 2 deletions

vendor/go.opentelemetry.io/otel/sdk/metric/LICENSE generated vendored Normal file

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator"
import (
"fmt"
"math"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)
// NewInconsistentAggregatorError formats an error describing an attempt to
// Checkpoint or Merge different-type aggregators. The result can be unwrapped as
// an ErrInconsistentType.
func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error {
return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2)
}
// RangeTest is a common routine for validating input values. It
// rejects NaN values, and it rejects negative values when the metric
// instrument does not support them, such as monotonic counter
// instruments.
func RangeTest(num number.Number, descriptor *sdkapi.Descriptor) error {
numberKind := descriptor.NumberKind()
if numberKind == number.Float64Kind && math.IsNaN(num.AsFloat64()) {
return aggregation.ErrNaNInput
}
switch descriptor.InstrumentKind() {
case sdkapi.CounterInstrumentKind, sdkapi.CounterObserverInstrumentKind:
if num.IsNegative(numberKind) {
return aggregation.ErrNegativeInput
}
}
return nil
}
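
A minimal usage sketch of RangeTest, assuming the sdkapi.NewDescriptor constructor and the metric/unit package from this SDK version (names and values are illustrative):

	// Validate a NaN measurement against a hypothetical float64 histogram.
	desc := sdkapi.NewDescriptor("inference.duration", sdkapi.HistogramInstrumentKind,
		number.Float64Kind, "model inference time", unit.Milliseconds)
	err := aggregator.RangeTest(number.NewFloat64Number(math.NaN()), &desc)
	// err == aggregation.ErrNaNInput, so the caller drops the measurement.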


@@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exact // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exact"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
type (
// Aggregator aggregates events that form a distribution, keeping
// an array with the exact set of values.
Aggregator struct {
lock sync.Mutex
samples []aggregation.Point
}
)
var _ export.Aggregator = &Aggregator{}
var _ aggregation.Points = &Aggregator{}
var _ aggregation.Count = &Aggregator{}
// New returns cnt new exact aggregators, which aggregate recorded
// measurements by storing them in an array. This type uses a mutex
// for Update() and SynchronizedMove() concurrency.
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
// Aggregation returns an interface for reading the state of this aggregator.
func (c *Aggregator) Aggregation() aggregation.Aggregation {
return c
}
// Kind returns aggregation.ExactKind.
func (c *Aggregator) Kind() aggregation.Kind {
return aggregation.ExactKind
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (uint64, error) {
return uint64(len(c.samples)), nil
}
// Points returns access to the raw data set.
func (c *Aggregator) Points() ([]aggregation.Point, error) {
return c.samples, nil
}
// SynchronizedMove saves the current state to oa and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.lock.Lock()
defer c.lock.Unlock()
if o != nil {
o.samples = c.samples
}
c.samples = nil
return nil
}
// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and SynchronizedMove()
// calls.
func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
now := time.Now()
c.lock.Lock()
defer c.lock.Unlock()
c.samples = append(c.samples, aggregation.Point{
Number: number,
Time: now,
})
return nil
}
// Merge combines two data sets into one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.samples = combine(c.samples, o.samples)
return nil
}
func combine(a, b []aggregation.Point) []aggregation.Point {
result := make([]aggregation.Point, 0, len(a)+len(b))
for len(a) != 0 && len(b) != 0 {
if a[0].Time.Before(b[0].Time) {
result = append(result, a[0])
a = a[1:]
} else {
result = append(result, b[0])
b = b[1:]
}
}
result = append(result, a...)
result = append(result, b...)
return result
}
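
The combine helper is a standard two-way merge of time-ordered point slices. A sketch of the ordering it preserves, usable only within this package (t0 < t1 < t2 are hypothetical timestamps):

	// a holds points recorded at t0 and t2; b holds a point at t1.
	a := []aggregation.Point{{Number: 1, Time: t0}, {Number: 3, Time: t2}}
	b := []aggregation.Point{{Number: 2, Time: t1}}
	merged := combine(a, b)
	// merged is ordered t0, t1, t2; on equal timestamps the point from
	// b is appended first, because Time.Before() is false for ties.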


@@ -0,0 +1,270 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package histogram // import "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
import (
"context"
"sort"
"sync"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
// Note: This code uses a Mutex to govern access to the exclusive
// aggregator state. This is in contrast to a lock-free approach
// (as in the Go prometheus client) that was reverted here:
// https://github.com/open-telemetry/opentelemetry-go/pull/669
type (
// Aggregator observes events and counts them in pre-determined buckets.
// It also calculates the sum and count of all events.
Aggregator struct {
lock sync.Mutex
boundaries []float64
kind number.Kind
state *state
}
// config describes how the histogram is aggregated.
config struct {
// explicitBoundaries support arbitrary bucketing schemes. This
// is the general case.
explicitBoundaries []float64
}
// Option configures a histogram config.
Option interface {
// apply sets one or more config fields.
apply(*config)
}
// state represents the state of a histogram, consisting of
// the sum and counts for all observed values and
// the less-than-or-equal bucket counts for the pre-determined boundaries.
state struct {
bucketCounts []uint64
sum number.Number
count uint64
}
)
// WithExplicitBoundaries sets the ExplicitBoundaries configuration option of a config.
func WithExplicitBoundaries(explicitBoundaries []float64) Option {
return explicitBoundariesOption{explicitBoundaries}
}
type explicitBoundariesOption struct {
boundaries []float64
}
func (o explicitBoundariesOption) apply(config *config) {
config.explicitBoundaries = o.boundaries
}
// defaultExplicitBoundaries have been copied from prometheus.DefBuckets.
//
// Note we anticipate the use of a high-precision histogram sketch as
// the standard histogram aggregator for OTLP export.
// (https://github.com/open-telemetry/opentelemetry-specification/issues/982).
var defaultFloat64ExplicitBoundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
// defaultInt64ExplicitBoundaryMultiplier determines the default
// integer histogram boundaries.
const defaultInt64ExplicitBoundaryMultiplier = 1e6
// defaultInt64ExplicitBoundaries applies a multiplier to the default
// float64 boundaries: [ 5K, 10K, 25K, ..., 2.5M, 5M, 10M ]
var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
for _, f := range bounds {
asint = append(asint, defaultInt64ExplicitBoundaryMultiplier*f)
}
return
}(defaultFloat64ExplicitBoundaries)
var _ export.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}
var _ aggregation.Count = &Aggregator{}
var _ aggregation.Histogram = &Aggregator{}
// New returns cnt new aggregators for computing Histograms.
//
// A Histogram observes events and counts them in pre-defined buckets,
// and also provides the total sum and count of all observations.
//
// Note that this aggregator guards its state with a mutex (see the
// note above), so a checkpoint taken via SynchronizedMove observes a
// consistent snapshot.
func New(cnt int, desc *sdkapi.Descriptor, opts ...Option) []Aggregator {
var cfg config
if desc.NumberKind() == number.Int64Kind {
cfg.explicitBoundaries = defaultInt64ExplicitBoundaries
} else {
cfg.explicitBoundaries = defaultFloat64ExplicitBoundaries
}
for _, opt := range opts {
opt.apply(&cfg)
}
aggs := make([]Aggregator, cnt)
// Boundaries MUST be sorted; otherwise the histogram cannot
// be properly computed.
sortedBoundaries := make([]float64, len(cfg.explicitBoundaries))
copy(sortedBoundaries, cfg.explicitBoundaries)
sort.Float64s(sortedBoundaries)
for i := range aggs {
aggs[i] = Aggregator{
kind: desc.NumberKind(),
boundaries: sortedBoundaries,
}
aggs[i].state = aggs[i].newState()
}
return aggs
}
// Aggregation returns an interface for reading the state of this aggregator.
func (c *Aggregator) Aggregation() aggregation.Aggregation {
return c
}
// Kind returns aggregation.HistogramKind.
func (c *Aggregator) Kind() aggregation.Kind {
return aggregation.HistogramKind
}
// Sum returns the sum of all values in the checkpoint.
func (c *Aggregator) Sum() (number.Number, error) {
return c.state.sum, nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (uint64, error) {
return c.state.count, nil
}
// Histogram returns the count of events in pre-determined buckets.
func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
return aggregation.Buckets{
Boundaries: c.boundaries,
Counts: c.state.bucketCounts,
}, nil
}
// SynchronizedMove saves the current state into oa and resets the
// current state to the empty set. The state swap happens under the
// aggregator's lock, so the Sum, Count, and bucket counts move
// together as a consistent snapshot.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
if o != nil {
// Swap case: This is the ordinary case for a
// synchronous instrument, where the SDK allocates two
// Aggregators and lock contention is anticipated.
// Reset the target state before swapping it under the
// lock below.
o.clearState()
}
c.lock.Lock()
if o != nil {
c.state, o.state = o.state, c.state
} else {
// No swap case: This is the ordinary case for an
// asynchronous instrument, where the SDK allocates a
// single Aggregator and there is no anticipated lock
// contention.
c.clearState()
}
c.lock.Unlock()
return nil
}
func (c *Aggregator) newState() *state {
return &state{
bucketCounts: make([]uint64, len(c.boundaries)+1),
}
}
func (c *Aggregator) clearState() {
for i := range c.state.bucketCounts {
c.state.bucketCounts[i] = 0
}
c.state.sum = 0
c.state.count = 0
}
// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
kind := desc.NumberKind()
asFloat := number.CoerceToFloat64(kind)
bucketID := len(c.boundaries)
for i, boundary := range c.boundaries {
if asFloat < boundary {
bucketID = i
break
}
}
// Note: binary search was compared against the linear search above
// using benchmarks. The following code is equivalent:
//
// bucketID := sort.Search(len(c.boundaries), func(i int) bool {
//     return asFloat < c.boundaries[i]
// })
//
// Binary search wins for very large boundary sets, but linear search
// performs better for arrays of up to roughly 256 to 512 elements,
// which is already a relatively large histogram, so linear search
// remains the preferred implementation here.
c.lock.Lock()
defer c.lock.Unlock()
c.state.count++
c.state.sum.AddNumber(kind, number)
c.state.bucketCounts[bucketID]++
return nil
}
// Merge combines two histograms that have the same buckets into a single one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.state.sum.AddNumber(desc.NumberKind(), o.state.sum)
c.state.count += o.state.count
for i := 0; i < len(c.state.bucketCounts); i++ {
c.state.bucketCounts[i] += o.state.bucketCounts[i]
}
return nil
}
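
The linear search in Update assigns a measurement to the first bucket whose boundary is strictly greater than the value; anything not below a boundary lands in the overflow bucket. A self-contained sketch of that rule (boundaries are illustrative, not the defaults above):

	boundaries := []float64{5, 10, 25}
	bucketFor := func(v float64) int {
		for i, b := range boundaries {
			if v < b {
				return i // first boundary strictly greater than v
			}
		}
		return len(boundaries) // overflow bucket
	}
	// bucketFor(3) == 0, bucketFor(10) == 2 (10 is not < 10),
	// bucketFor(40) == 3 (the overflow bucket).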


@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lastvalue // import "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
import (
"context"
"sync/atomic"
"time"
"unsafe"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
type (
// Aggregator aggregates lastValue events.
Aggregator struct {
// value is an atomic pointer to *lastValueData. It is never nil.
value unsafe.Pointer
}
// lastValueData stores the current value of a lastValue along with
// a timestamp to determine the winner of a race.
lastValueData struct {
// value is the int64- or float64-encoded Set() data
//
// value needs to be aligned for 64-bit atomic operations.
value number.Number
// timestamp indicates when this record was submitted.
// This can be used to pick a winner when multiple
// records contain lastValue data for the same labels due
// to races.
timestamp time.Time
}
)
var _ export.Aggregator = &Aggregator{}
var _ aggregation.LastValue = &Aggregator{}
// An unset lastValue has zero timestamp and zero value.
var unsetLastValue = &lastValueData{}
// New returns cnt new lastValue aggregators. Each aggregator retains
// the last value and timestamp that were recorded.
func New(cnt int) []Aggregator {
aggs := make([]Aggregator, cnt)
for i := range aggs {
aggs[i] = Aggregator{
value: unsafe.Pointer(unsetLastValue),
}
}
return aggs
}
// Aggregation returns an interface for reading the state of this aggregator.
func (g *Aggregator) Aggregation() aggregation.Aggregation {
return g
}
// Kind returns aggregation.LastValueKind.
func (g *Aggregator) Kind() aggregation.Kind {
return aggregation.LastValueKind
}
// LastValue returns the last-recorded lastValue value and the
// corresponding timestamp. The error value aggregation.ErrNoData
// will be returned if (due to a race condition) the checkpoint was
// computed before the first value was set.
func (g *Aggregator) LastValue() (number.Number, time.Time, error) {
gd := (*lastValueData)(g.value)
if gd == unsetLastValue {
return 0, time.Time{}, aggregation.ErrNoData
}
return gd.value.AsNumber(), gd.timestamp, nil
}
// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
}
o.value = atomic.SwapPointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
}
// Update atomically sets the current "last" value.
func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
ngd := &lastValueData{
value: number,
timestamp: time.Now(),
}
atomic.StorePointer(&g.value, unsafe.Pointer(ngd))
return nil
}
// Merge combines state from two aggregators. The most-recently set
// value is chosen.
func (g *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
}
ggd := (*lastValueData)(atomic.LoadPointer(&g.value))
ogd := (*lastValueData)(atomic.LoadPointer(&o.value))
if ggd.timestamp.After(ogd.timestamp) {
return nil
}
g.value = unsafe.Pointer(ogd)
return nil
}
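
Merge resolves racing writers by timestamp: the later Update wins. A hedged sketch, where ctx and desc are hypothetical placeholders for a context and an int64 instrument descriptor:

	aggs := lastvalue.New(2)
	_ = aggs[0].Update(ctx, number.NewInt64Number(1), &desc) // recorded first
	_ = aggs[1].Update(ctx, number.NewInt64Number(2), &desc) // recorded later
	_ = aggs[0].Merge(&aggs[1], &desc)
	v, ts, _ := aggs[0].LastValue()
	// v.AsInt64() == 2 and ts is aggs[1]'s timestamp: the later record won.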


@@ -0,0 +1,165 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package minmaxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
import (
"context"
"sync"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
type (
// Aggregator aggregates events that form a distribution,
// keeping only the min, max, sum, and count.
Aggregator struct {
lock sync.Mutex
kind number.Kind
state
}
state struct {
sum number.Number
min number.Number
max number.Number
count uint64
}
)
var _ export.Aggregator = &Aggregator{}
var _ aggregation.MinMaxSumCount = &Aggregator{}
// New returns cnt new aggregators for computing the min, max, sum,
// and count.
//
// This type uses a mutex for Update() and SynchronizedMove() concurrency.
func New(cnt int, desc *sdkapi.Descriptor) []Aggregator {
kind := desc.NumberKind()
aggs := make([]Aggregator, cnt)
for i := range aggs {
aggs[i] = Aggregator{
kind: kind,
state: emptyState(kind),
}
}
return aggs
}
// Aggregation returns an interface for reading the state of this aggregator.
func (c *Aggregator) Aggregation() aggregation.Aggregation {
return c
}
// Kind returns aggregation.MinMaxSumCountKind.
func (c *Aggregator) Kind() aggregation.Kind {
return aggregation.MinMaxSumCountKind
}
// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (number.Number, error) {
return c.sum, nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (uint64, error) {
return c.count, nil
}
// Min returns the minimum value in the checkpoint.
// The error value aggregation.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Min() (number.Number, error) {
if c.count == 0 {
return 0, aggregation.ErrNoData
}
return c.min, nil
}
// Max returns the maximum value in the checkpoint.
// The error value aggregation.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (number.Number, error) {
if c.count == 0 {
return 0, aggregation.ErrNoData
}
return c.max, nil
}
// SynchronizedMove saves the current state into oa and resets the current state to
// the empty set.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if oa != nil && o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.lock.Lock()
if o != nil {
o.state = c.state
}
c.state = emptyState(c.kind)
c.lock.Unlock()
return nil
}
func emptyState(kind number.Kind) state {
return state{
count: 0,
sum: 0,
min: kind.Maximum(),
max: kind.Minimum(),
}
}
// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
kind := desc.NumberKind()
c.lock.Lock()
defer c.lock.Unlock()
c.count++
c.sum.AddNumber(kind, number)
if number.CompareNumber(kind, c.min) < 0 {
c.min = number
}
if number.CompareNumber(kind, c.max) > 0 {
c.max = number
}
return nil
}
// Merge combines two data sets into one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.count += o.count
c.sum.AddNumber(desc.NumberKind(), o.sum)
if c.min.CompareNumber(desc.NumberKind(), o.min) > 0 {
c.min.SetNumber(o.min)
}
if c.max.CompareNumber(desc.NumberKind(), o.max) < 0 {
c.max.SetNumber(o.max)
}
return nil
}
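
The sentinel values in emptyState are what keep Update simple: min starts at the kind's maximum and max at the kind's minimum, so the first real measurement always replaces both. A sketch, usable only within this package:

	s := emptyState(number.Int64Kind)
	// s.min is the largest int64 and s.max the smallest, so the first
	// Update always wins both comparisons:
	//   after Update(5): min == 5, max == 5, sum == 5, count == 1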


@@ -0,0 +1,106 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sum // import "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
import (
"context"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
// Aggregator aggregates counter events.
type Aggregator struct {
// value holds the current increments to this counter record.
// value needs to be aligned for 64-bit atomic operations.
value number.Number
}
var _ export.Aggregator = &Aggregator{}
var _ export.Subtractor = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}
// New returns cnt new counter aggregators implemented by atomic
// operations. Each aggregator implements the aggregation.Sum
// export interface.
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
// Aggregation returns an interface for reading the state of this aggregator.
func (c *Aggregator) Aggregation() aggregation.Aggregation {
return c
}
// Kind returns aggregation.SumKind.
func (c *Aggregator) Kind() aggregation.Kind {
return aggregation.SumKind
}
// Sum returns the last-checkpointed sum. This will never return an
// error.
func (c *Aggregator) Sum() (number.Number, error) {
return c.value, nil
}
// SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
c.value.SetRawAtomic(0)
return nil
}
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
o.value = c.value.SwapNumberAtomic(number.Number(0))
return nil
}
// Update atomically adds to the current value.
func (c *Aggregator) Update(_ context.Context, num number.Number, desc *sdkapi.Descriptor) error {
c.value.AddNumberAtomic(desc.NumberKind(), num)
return nil
}
// Merge combines two counters by adding their sums.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.value.AddNumber(desc.NumberKind(), o.value)
return nil
}
func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *sdkapi.Descriptor) error {
op, _ := opAgg.(*Aggregator)
if op == nil {
return aggregator.NewInconsistentAggregatorError(c, opAgg)
}
res, _ := resAgg.(*Aggregator)
if res == nil {
return aggregator.NewInconsistentAggregatorError(c, resAgg)
}
res.value = c.value
res.value.AddNumber(descriptor.NumberKind(), number.NewNumberSignChange(descriptor.NumberKind(), op.value))
return nil
}
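
Subtract is what allows cumulative sums to be converted into deltas: the result is the receiver's value minus the operand's. A hedged sketch, with cur, prev, and desc as hypothetical placeholders:

	var cur, prev, out sum.Aggregator
	// Suppose cur has accumulated a cumulative total of 10 and prev
	// holds the previous checkpoint of 7 (Update calls elided).
	_ = cur.Subtract(&prev, &out, &desc)
	// out.Sum() now reports 3, the change since the previous checkpoint.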


@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import "unsafe"
func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"record.refMapped.value": unsafe.Offsetof(record{}.refMapped.value),
"record.updateCount": unsafe.Offsetof(record{}.updateCount),
}
}


@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package basic // import "go.opentelemetry.io/otel/sdk/metric/controller/basic"
import (
"time"
"go.opentelemetry.io/otel"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// config contains configuration for a basic Controller.
type config struct {
// Resource is the OpenTelemetry resource associated with all Meters
// created by the Controller.
Resource *resource.Resource
// CollectPeriod is the interval between calls to Collect a
// checkpoint.
//
// When pulling metrics and not exporting, this is the minimum
// time between calls to Collect. In a pull-only
// configuration, collection is performed on demand; set
// CollectPeriod to 0 to always recompute the export record set.
//
// When exporting metrics, this must be > 0.
//
// Default value is 10s.
CollectPeriod time.Duration
// CollectTimeout is the timeout of the Context passed to
// Collect() and subsequently to Observer instrument callbacks.
//
// Default value is 10s. If zero, no Collect timeout is applied.
CollectTimeout time.Duration
// Exporter is used for exporting metric data.
//
// Note: Exporters such as Prometheus that pull data do not implement
// export.Exporter. These will directly call Collect() and ForEach().
Exporter export.Exporter
// PushTimeout is the timeout of the Context when an exporter is configured.
//
// Default value is 10s. If zero, no Export timeout is applied.
PushTimeout time.Duration
}
// Option is the interface that applies the value to a configuration option.
type Option interface {
// apply sets the Option value of a Config.
apply(*config)
}
// WithResource sets the Resource configuration option of a Config by merging it
// with the Resource configuration in the environment.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}
type resourceOption struct{ *resource.Resource }
func (o resourceOption) apply(cfg *config) {
res, err := resource.Merge(cfg.Resource, o.Resource)
if err != nil {
otel.Handle(err)
}
cfg.Resource = res
}
// WithCollectPeriod sets the CollectPeriod configuration option of a Config.
func WithCollectPeriod(period time.Duration) Option {
return collectPeriodOption(period)
}
type collectPeriodOption time.Duration
func (o collectPeriodOption) apply(cfg *config) {
cfg.CollectPeriod = time.Duration(o)
}
// WithCollectTimeout sets the CollectTimeout configuration option of a Config.
func WithCollectTimeout(timeout time.Duration) Option {
return collectTimeoutOption(timeout)
}
type collectTimeoutOption time.Duration
func (o collectTimeoutOption) apply(cfg *config) {
cfg.CollectTimeout = time.Duration(o)
}
// WithExporter sets the exporter configuration option of a Config.
func WithExporter(exporter export.Exporter) Option {
return exporterOption{exporter}
}
type exporterOption struct{ exporter export.Exporter }
func (o exporterOption) apply(cfg *config) {
cfg.Exporter = o.exporter
}
// WithPushTimeout sets the PushTimeout configuration option of a Config.
func WithPushTimeout(timeout time.Duration) Option {
return pushTimeoutOption(timeout)
}
type pushTimeoutOption time.Duration
func (o pushTimeoutOption) apply(cfg *config) {
cfg.PushTimeout = time.Duration(o)
}
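
These options are consumed by New in the controller file that follows; a construction sketch, where factory and exp are hypothetical placeholders for an export.CheckpointerFactory and an export.Exporter:

	ctrl := basic.New(
		factory,
		basic.WithExporter(exp),                 // push mode: Start() exports
		basic.WithCollectPeriod(30*time.Second), // overrides the 10s default
		basic.WithPushTimeout(5*time.Second),
	)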


@@ -0,0 +1,388 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package basic // import "go.opentelemetry.io/otel/sdk/metric/controller/basic"
import (
"context"
"fmt"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/metric/registry"
"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdk "go.opentelemetry.io/otel/sdk/metric"
controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time"
"go.opentelemetry.io/otel/sdk/resource"
)
// DefaultPeriod is used for:
//
// - the minimum time between calls to Collect()
// - the timeout for Export()
// - the timeout for Collect().
const DefaultPeriod = 10 * time.Second
// ErrControllerStarted indicates that a controller was started more
// than once.
var ErrControllerStarted = fmt.Errorf("controller already started")
// Controller organizes and synchronizes collection of metric data in
// both "pull" and "push" configurations. This supports two distinct
// modes:
//
// - Push and Pull: Start() must be called to begin calling the exporter;
// Collect() is called periodically by a background thread after starting
// the controller.
// - Pull-Only: Start() is optional in this case, to call Collect periodically.
// If Start() is not called, Collect() can be called manually to initiate
// collection.
//
// The controller supports mixing push and pull access to metric data
// using the export.Reader RWLock interface. Collection will
// be blocked by a pull request in the basic controller.
type Controller struct {
// lock protects libraries and synchronizes Start() and Stop().
lock sync.Mutex
// TODO: libraries is synchronized by lock, but could be
// accomplished using a sync.Map. The SDK specification will
// probably require this, as the draft already states that
// Stop() and MeterProvider.Meter() should not block each
// other.
libraries map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl
checkpointerFactory export.CheckpointerFactory
resource *resource.Resource
exporter export.Exporter
wg sync.WaitGroup
stopCh chan struct{}
clock controllerTime.Clock
ticker controllerTime.Ticker
collectPeriod time.Duration
collectTimeout time.Duration
pushTimeout time.Duration
// collectedTime is used only in configurations with no
// exporter, when ticker != nil.
collectedTime time.Time
}
var _ export.InstrumentationLibraryReader = &Controller{}
var _ metric.MeterProvider = &Controller{}
func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOption) metric.Meter {
cfg := metric.NewMeterConfig(opts...)
library := instrumentation.Library{
Name: instrumentationName,
Version: cfg.InstrumentationVersion(),
SchemaURL: cfg.SchemaURL(),
}
c.lock.Lock()
defer c.lock.Unlock()
m, ok := c.libraries[library]
if !ok {
checkpointer := c.checkpointerFactory.NewCheckpointer()
accumulator := sdk.NewAccumulator(checkpointer)
m = registry.NewUniqueInstrumentMeterImpl(&accumulatorCheckpointer{
Accumulator: accumulator,
checkpointer: checkpointer,
library: library,
})
c.libraries[library] = m
}
return metric.WrapMeterImpl(m)
}
type accumulatorCheckpointer struct {
*sdk.Accumulator
checkpointer export.Checkpointer
library instrumentation.Library
}
// New constructs a Controller using the provided checkpointer factory
// and options (including optional exporter) to configure a metric
// export pipeline.
func New(checkpointerFactory export.CheckpointerFactory, opts ...Option) *Controller {
c := &config{
CollectPeriod: DefaultPeriod,
CollectTimeout: DefaultPeriod,
PushTimeout: DefaultPeriod,
}
for _, opt := range opts {
opt.apply(c)
}
if c.Resource == nil {
c.Resource = resource.Default()
} else {
var err error
c.Resource, err = resource.Merge(resource.Environment(), c.Resource)
if err != nil {
otel.Handle(err)
}
}
return &Controller{
libraries: map[instrumentation.Library]*registry.UniqueInstrumentMeterImpl{},
checkpointerFactory: checkpointerFactory,
exporter: c.Exporter,
resource: c.Resource,
stopCh: nil,
clock: controllerTime.RealClock{},
collectPeriod: c.CollectPeriod,
collectTimeout: c.CollectTimeout,
pushTimeout: c.PushTimeout,
}
}
// SetClock supports setting a mock clock for testing. This must be
// called before Start().
func (c *Controller) SetClock(clock controllerTime.Clock) {
c.lock.Lock()
defer c.lock.Unlock()
c.clock = clock
}
// Resource returns the *resource.Resource associated with this
// controller.
func (c *Controller) Resource() *resource.Resource {
return c.resource
}
// Start begins a ticker that periodically collects and exports
// metrics with the configured interval. This is required for calling
// a configured Exporter (see WithExporter) and is otherwise optional
// when only pulling metric data.
//
// The passed context is passed to Collect() and subsequently to
// asynchronous instrument callbacks. Returns an error when the
// controller was already started.
//
// Note that it is not necessary to Start a controller when only
// pulling data; use the Collect() and ForEach() methods directly in
// this case.
func (c *Controller) Start(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()
if c.stopCh != nil {
return ErrControllerStarted
}
c.wg.Add(1)
c.stopCh = make(chan struct{})
c.ticker = c.clock.Ticker(c.collectPeriod)
go c.runTicker(ctx, c.stopCh)
return nil
}
// Stop waits for the background goroutine to return and then collects
// and exports metrics one last time before returning. The passed
// context is passed to the final Collect() and subsequently to the
// final asynchronous instruments.
//
// Note that Stop() will not cancel an ongoing collection or export.
func (c *Controller) Stop(ctx context.Context) error {
if lastCollection := func() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.stopCh == nil {
return false
}
close(c.stopCh)
c.stopCh = nil
c.wg.Wait()
c.ticker.Stop()
c.ticker = nil
return true
}(); !lastCollection {
return nil
}
return c.collect(ctx)
}
// runTicker runs collection on ticker events until the stop channel is closed.
func (c *Controller) runTicker(ctx context.Context, stopCh chan struct{}) {
defer c.wg.Done()
for {
select {
case <-stopCh:
return
case <-c.ticker.C():
if err := c.collect(ctx); err != nil {
otel.Handle(err)
}
}
}
}
// collect computes a checkpoint and optionally exports it.
func (c *Controller) collect(ctx context.Context) error {
if err := c.checkpoint(ctx); err != nil {
return err
}
if c.exporter == nil {
return nil
}
// Note: this is not subject to collectTimeout. This blocks the next
// collection despite collectTimeout because it holds a lock.
return c.export(ctx)
}
// accumulatorList returns a snapshot of current accumulators
// registered to this controller. This briefly locks the controller.
func (c *Controller) accumulatorList() []*accumulatorCheckpointer {
c.lock.Lock()
defer c.lock.Unlock()
var r []*accumulatorCheckpointer
for _, entry := range c.libraries {
acc, ok := entry.MeterImpl().(*accumulatorCheckpointer)
if ok {
r = append(r, acc)
}
}
return r
}
// checkpoint calls the Accumulator and Checkpointer interfaces to
// compute the Reader. This applies the configured collection
// timeout. Note that this does not try to cancel a Collect or Export
// when Stop() is called.
func (c *Controller) checkpoint(ctx context.Context) error {
for _, impl := range c.accumulatorList() {
if err := c.checkpointSingleAccumulator(ctx, impl); err != nil {
return err
}
}
return nil
}
// checkpointSingleAccumulator checkpoints a single instrumentation
// library's accumulator, which involves calling
// checkpointer.StartCollection, accumulator.Collect, and
// checkpointer.FinishCollection in sequence.
func (c *Controller) checkpointSingleAccumulator(ctx context.Context, ac *accumulatorCheckpointer) error {
ckpt := ac.checkpointer.Reader()
ckpt.Lock()
defer ckpt.Unlock()
ac.checkpointer.StartCollection()
if c.collectTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.collectTimeout)
defer cancel()
}
_ = ac.Accumulator.Collect(ctx)
var err error
select {
case <-ctx.Done():
err = ctx.Err()
default:
// The context wasn't done, ok.
}
// Finish the checkpoint whether the accumulator timed out or not.
if cerr := ac.checkpointer.FinishCollection(); cerr != nil {
if err == nil {
err = cerr
} else {
err = fmt.Errorf("%s: %w", cerr.Error(), err)
}
}
return err
}
// export calls the exporter with a read lock on the Reader,
// applying the configured export timeout.
func (c *Controller) export(ctx context.Context) error {
if c.pushTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.pushTimeout)
defer cancel()
}
return c.exporter.Export(ctx, c.resource, c)
}
// ForEach implements export.InstrumentationLibraryReader.
func (c *Controller) ForEach(readerFunc func(l instrumentation.Library, r export.Reader) error) error {
for _, acPair := range c.accumulatorList() {
reader := acPair.checkpointer.Reader()
// TODO: We should not fail fast; instead accumulate errors.
if err := func() error {
reader.RLock()
defer reader.RUnlock()
return readerFunc(acPair.library, reader)
}(); err != nil {
return err
}
}
return nil
}
// IsRunning returns true if the controller was started via Start(),
// indicating that the current export.Reader is being kept
// up-to-date.
func (c *Controller) IsRunning() bool {
c.lock.Lock()
defer c.lock.Unlock()
return c.ticker != nil
}
// Collect requests a collection. The collection will be skipped if
// the last collection occurred less than the configured collection
// period ago.
func (c *Controller) Collect(ctx context.Context) error {
if c.IsRunning() {
// When there's a non-nil ticker, there's a goroutine
// computing checkpoints with the collection period.
return ErrControllerStarted
}
if !c.shouldCollect() {
return nil
}
return c.checkpoint(ctx)
}
// shouldCollect returns true if the collector should collect now,
// based on the timestamp, the last collection time, and the
// configured period.
func (c *Controller) shouldCollect() bool {
c.lock.Lock()
defer c.lock.Unlock()
if c.collectPeriod == 0 {
return true
}
now := c.clock.Now()
if now.Sub(c.collectedTime) < c.collectPeriod {
return false
}
c.collectedTime = now
return true
}
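
A hedged lifecycle sketch covering both modes, with ctrl constructed as in the options sketch above:

	// Push mode: a background goroutine collects and exports every
	// CollectPeriod until Stop() performs a final collect-and-export.
	_ = ctrl.Start(context.Background())
	meter := ctrl.Meter("example.io/telemetry")
	// ... create instruments from meter and record measurements ...
	_ = ctrl.Stop(context.Background())

	// Pull-only mode (Start() never called): collect on demand, then
	// read the checkpoint through the reader interface.
	_ = ctrl.Collect(context.Background())
	_ = ctrl.ForEach(func(l instrumentation.Library, r export.Reader) error {
		return nil // iterate the reader's records here
	})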


@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package time // import "go.opentelemetry.io/otel/sdk/metric/controller/time"
import (
"time"
lib "time"
)
// Several types below are created to match "github.com/benbjohnson/clock"
// so that it remains a test-only dependency.
type Clock interface {
Now() lib.Time
Ticker(duration lib.Duration) Ticker
}
type Ticker interface {
Stop()
C() <-chan lib.Time
}
type RealClock struct {
}
type RealTicker struct {
ticker *lib.Ticker
}
var _ Clock = RealClock{}
var _ Ticker = RealTicker{}
func (RealClock) Now() time.Time {
return time.Now()
}
func (RealClock) Ticker(period time.Duration) Ticker {
return RealTicker{time.NewTicker(period)}
}
func (t RealTicker) Stop() {
t.ticker.Stop()
}
func (t RealTicker) C() <-chan time.Time {
return t.ticker.C
}
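
Because Clock and Ticker are small interfaces, tests can drive a controller through SetClock with a hand-rolled implementation instead of importing a clock library. A minimal sketch (all names hypothetical):

	type testClock struct{ now time.Time }
	type testTicker struct{ ch chan time.Time }

	func (c *testClock) Now() time.Time { return c.now }
	func (c *testClock) Ticker(time.Duration) Ticker {
		return &testTicker{ch: make(chan time.Time, 1)}
	}
	func (t *testTicker) Stop()               {}
	func (t *testTicker) C() <-chan time.Time { return t.ch }
	// A test advances c.now manually and sends on t.ch to trigger a
	// collection pass.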

vendor/go.opentelemetry.io/otel/sdk/metric/doc.go generated vendored Normal file

@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package metric implements the OpenTelemetry metric API.
This package is currently in a pre-GA phase. Backwards incompatible changes
may be introduced in subsequent minor version releases as we work to track the
evolving OpenTelemetry specification and user feedback.
The Accumulator type supports configurable metrics export behavior through a
collection of export interfaces that support various export strategies,
described below.
The OpenTelemetry metric API consists of methods for constructing synchronous
and asynchronous instruments. There are two constructors per instrument for
the two kinds of number (int64, float64).
Synchronous instruments are managed by a sync.Map containing a *record
with the current state for each synchronous instrument. A bound
instrument encapsulates a direct pointer to the record, allowing
bound metric events to bypass a sync.Map lookup. A lock-free
algorithm is used to protect against races when adding and removing
items from the sync.Map.
Asynchronous instruments are managed by an internal
AsyncInstrumentState, which coordinates calling batch and single
instrument callbacks.
Internal Structure
Each observer also has its own kind of record stored in the SDK. This
record contains a set of recorders for every specific label set used in the
callback.
A sync.Map maintains the mapping of current instruments and label sets to
internal records. To create a new bound instrument, the SDK consults the Map to
locate an existing record, otherwise it constructs a new record. The SDK
maintains a count of the number of references to each record, ensuring
that records are not reclaimed from the Map while they are still active
from the user's perspective.
Metric collection is performed via a single-threaded call to Collect that
sweeps through all records in the SDK, checkpointing their state. When a
record is discovered that has no references and has not been updated since
the prior collection pass, it is removed from the Map.
Both synchronous and asynchronous instruments have an associated
aggregator, which maintains the current state resulting from all metric
events since its last checkpoint. Aggregators may be lock-free or they may
use locking, but they should expect to be called concurrently. Aggregators
must be capable of merging with another aggregator of the same type.
Export Pipeline
While the SDK serves to maintain a current set of records and
coordinate collection, the behavior of a metrics export pipeline is
configured through the export types in
go.opentelemetry.io/otel/sdk/export/metric. It is important to keep
in mind the context these interfaces are called from. There are two
contexts: instrumentation context, where a user-level goroutine enters
the SDK and produces a new record, and collection context, where a
system-level thread performs a collection pass through the SDK.
Descriptor is a struct that describes the metric instrument to the
export pipeline, containing the name, units, description, metric kind,
and number kind (int64 or float64). A Descriptor accompanies metric data
as it passes through the export pipeline.
The AggregatorSelector interface supports choosing the method of
aggregation to apply to a particular instrument, by delegating the
construction of an Aggregator to this interface. Given the Descriptor,
the AggregatorFor method returns an implementation of Aggregator. If this
interface returns nil, the metric will be disabled. The aggregator should
be matched to the capabilities of the exporter. Selecting the aggregator
for Adding instruments is relatively straightforward, but many options
are available for aggregating distributions from Grouping instruments.
Aggregator is an interface which implements a concrete strategy for
aggregating metric updates. Several Aggregator implementations are
provided by the SDK. Aggregators may be lock-free or use locking,
depending on their structure and semantics. Aggregators implement an
Update method, called in instrumentation context, to receive a single
metric event. Aggregators implement a Checkpoint method, called in
collection context, to save a checkpoint of the current state.
Aggregators implement a Merge method, also called in collection
context, that combines state from two aggregators into one. Each SDK
record has an associated aggregator.
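For illustration, a minimal sketch of this call pattern as it appears in
this SDK, where checkpointing is performed by SynchronizedMove (the names
agg, checkpoint, other, ctx, num, and desc are assumed):

	_ = agg.Update(ctx, num, desc)             // instrumentation context
	_ = agg.SynchronizedMove(checkpoint, desc) // collection: checkpoint state
	_ = checkpoint.Merge(other, desc)          // collection: combine state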
Processor is an interface which sits between the SDK and an exporter.
The Processor embeds an AggregatorSelector, used by the SDK to assign
new Aggregators. The Processor supports a Process() API for submitting
checkpointed aggregators to the processor, and a Reader() API
for producing a complete checkpoint for the exporter. Two default
Processor implementations are provided: the "defaultkeys" Processor
groups aggregated metrics by their recommended Descriptor.Keys(), while
the "simple" Processor aggregates metrics at full dimensionality.
LabelEncoder is an optional optimization that allows an exporter to
provide the serialization logic for labels. This allows avoiding
duplicate serialization of labels, once as a unique key in the SDK (or
Processor) and once in the exporter.
Reader is an interface between the Processor and the Exporter.
After completing a collection pass, the Processor.Reader() method
returns a Reader, which the Exporter uses to iterate over all
the updated metrics.
Record is a struct containing the state of an individual exported
metric. This is the result of one collection interval for one
instrument and one label set.
Labels is a struct containing an ordered set of labels, the
corresponding unique encoding, and the encoder that produced it.
Exporter is the final stage of an export pipeline. It is called with
a Reader capable of enumerating all the updated metrics.
Controller is not an export interface per se, but it orchestrates the
export pipeline. For example, a "push" controller will establish a
periodic timer to regularly collect and export metrics. A "pull"
controller will await a pull request before initiating metric
collection. Either way, the job of the controller is to call the SDK
Collect() method, then read the checkpoint, then invoke the exporter.
Controllers are expected to implement the public metric.MeterProvider
API, meaning they can be installed as the global Meter provider.
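A minimal sketch of one manually driven round of this pipeline (the names
acc, proc, exp, and ctx are assumed: an Accumulator, a Processor, and an
exporter that also provides the TemporalitySelector expected by ForEach):

	proc.StartCollection()
	acc.Collect(ctx)            // sweep SDK records into the Processor
	_ = proc.FinishCollection()
	reader := proc.Reader()
	reader.Lock()
	_ = reader.ForEach(exp, func(r export.Record) error {
		return nil // hand each updated record to the exporter
	})
	reader.Unlock()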
*/
package metric // import "go.opentelemetry.io/otel/sdk/metric"


@ -0,0 +1,396 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic"
import (
"errors"
"fmt"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)
type (
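	// Processor is a basic metric processor. It batches checkpointed
	// aggregations from one or more Accumulators and maintains the state
	// needed to export either delta or cumulative temporalities.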
Processor struct {
aggregation.TemporalitySelector
export.AggregatorSelector
state
}
stateKey struct {
// TODO: This code is organized to support multiple
// accumulators which could theoretically produce the
// data for the same instrument, and this code has
// logic to combine data properly from multiple
// accumulators. However, the use of
// *sdkapi.Descriptor in the stateKey makes such
// combination impossible, because each accumulator
// allocates its own instruments. This can be fixed
// by using the instrument name and kind instead of
// the descriptor pointer. See
// https://github.com/open-telemetry/opentelemetry-go/issues/862.
descriptor *sdkapi.Descriptor
distinct attribute.Distinct
}
stateValue struct {
// labels corresponds to the stateKey.distinct field.
labels *attribute.Set
// updated indicates the last sequence number when this value had
// Process() called by an accumulator.
updated int64
// stateful indicates that a cumulative aggregation is
// being maintained, taken from the process start time.
stateful bool
// currentOwned indicates that "current" was allocated
// by the processor in order to merge results from
// multiple Accumulators during a single collection
// round, which may happen either because:
// (1) multiple Accumulators output the same Accumulation.
// (2) one Accumulator is configured with dimensionality reduction.
currentOwned bool
// current refers to the output from a single Accumulator
// (if !currentOwned) or it refers to an Aggregator
// owned by the processor used to accumulate multiple
// values in a single collection round.
current export.Aggregator
// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator
// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
cumulative export.Aggregator
}
state struct {
config config
// RWMutex implements locking for the `Reader` interface.
sync.RWMutex
values map[stateKey]*stateValue
// Note: the timestamp logic currently assumes all
// exports are deltas.
processStart time.Time
intervalStart time.Time
intervalEnd time.Time
// startedCollection and finishedCollection are the
// number of StartCollection() and FinishCollection()
// calls, used to ensure that the sequence of starts
// and finishes are correctly balanced.
startedCollection int64
finishedCollection int64
}
)
var _ export.Processor = &Processor{}
var _ export.Checkpointer = &Processor{}
var _ export.Reader = &state{}
// ErrInconsistentState is returned when the sequence of collection starts and finishes is incorrectly balanced.
var ErrInconsistentState = fmt.Errorf("inconsistent processor state")
// ErrInvalidTemporality is returned for unknown metric.Temporality.
var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality")
// New returns a basic Processor that is also a Checkpointer using the provided
// AggregatorSelector to select Aggregators. The TemporalitySelector
// is consulted to determine the kind(s) of exporter that will consume
// data, so that this Processor can prepare to compute Delta or
// Cumulative Aggregations as needed.
func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor {
return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor)
}
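// Usage sketch (illustrative only; "simple" refers to
// go.opentelemetry.io/otel/sdk/metric/selector/simple, assumed imported,
// and CumulativeTemporalitySelector is assumed from the aggregation
// package imported above):
//
//	proc := New(
//		simple.NewWithHistogramDistribution(),
//		aggregation.CumulativeTemporalitySelector(),
//	)
//
// The resulting Processor can then be passed to metric.NewAccumulator.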
type factory struct {
aselector export.AggregatorSelector
tselector aggregation.TemporalitySelector
config config
}
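// NewFactory returns a CheckpointerFactory that constructs basic
// Processors from the provided AggregatorSelector, TemporalitySelector,
// and options.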
func NewFactory(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) export.CheckpointerFactory {
var config config
for _, opt := range opts {
opt.applyProcessor(&config)
}
return factory{
aselector: aselector,
tselector: tselector,
config: config,
}
}
var _ export.CheckpointerFactory = factory{}
func (f factory) NewCheckpointer() export.Checkpointer {
now := time.Now()
p := &Processor{
AggregatorSelector: f.aselector,
TemporalitySelector: f.tselector,
state: state{
values: map[stateKey]*stateValue{},
processStart: now,
intervalStart: now,
config: f.config,
},
}
return p
}
// Process implements export.Processor.
func (b *Processor) Process(accum export.Accumulation) error {
if b.startedCollection != b.finishedCollection+1 {
return ErrInconsistentState
}
desc := accum.Descriptor()
key := stateKey{
descriptor: desc,
distinct: accum.Labels().Equivalent(),
}
agg := accum.Aggregator()
// Check if there is an existing value.
value, ok := b.state.values[key]
if !ok {
stateful := b.TemporalityFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind())
newValue := &stateValue{
labels: accum.Labels(),
updated: b.state.finishedCollection,
stateful: stateful,
current: agg,
}
if stateful {
if desc.InstrumentKind().PrecomputedSum() {
// If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else {
// In this case we are certain not to need a delta, only allocate
// a cumulative aggregator.
b.AggregatorFor(desc, &newValue.cumulative)
}
}
b.state.values[key] = newValue
return nil
}
// Advance the update sequence number.
sameCollection := b.state.finishedCollection == value.updated
value.updated = b.state.finishedCollection
// At this point in the code, we have located an existing
// value for some stateKey. This can be because:
//
// (a) stateful aggregation is being used, the entry was
// entered during a prior collection, and this is the first
// time processing an accumulation for this stateKey in the
// current collection. Since this is the first time
// processing an accumulation for this stateKey during this
// collection, we don't know yet whether there are multiple
// accumulators at work. If there are multiple accumulators,
// they'll hit case (b) the second time through.
//
// (b) multiple accumulators are being used, whether stateful
// or not.
//
// Case (a) occurs when the instrument and the exporter
// require memory to work correctly, either because the
// instrument reports a PrecomputedSum to a DeltaExporter or
// the reverse, a non-PrecomputedSum instrument with a
// CumulativeExporter. This logic is encapsulated in
// Temporality.MemoryRequired(InstrumentKind).
//
// Case (b) occurs when the variable `sameCollection` is true,
// indicating that the stateKey for Accumulation has already
// been seen in the same collection. When this happens, it
// implies that multiple Accumulators are being used, or that
// a single Accumulator has been configured with a label key
// filter.
if !sameCollection {
if !value.currentOwned {
// This is the first Accumulation we've seen
// for this stateKey during this collection.
// Just keep a reference to the Accumulator's
// Aggregator. All the other cases copy
// Aggregator state.
value.current = agg
return nil
}
return agg.SynchronizedMove(value.current, desc)
}
// If the current is not owned, take ownership of a copy
// before merging below.
if !value.currentOwned {
tmp := value.current
b.AggregatorSelector.AggregatorFor(desc, &value.current)
value.currentOwned = true
if err := tmp.SynchronizedMove(value.current, desc); err != nil {
return err
}
}
// Combine this Accumulation with the prior Accumulation.
return value.current.Merge(agg, desc)
}
// Reader returns the associated Reader. Use the
// Reader Locker interface to synchronize access to this
// object. The Reader.ForEach() method cannot be called
// concurrently with Process().
func (b *Processor) Reader() export.Reader {
return &b.state
}
// StartCollection signals to the Processor that one or more Accumulators
// will begin calling Process() during collection.
func (b *Processor) StartCollection() {
if b.startedCollection != 0 {
b.intervalStart = b.intervalEnd
}
b.startedCollection++
}
// FinishCollection signals to the Processor that a complete
// collection has finished and that ForEach will be called to access
// the Reader.
func (b *Processor) FinishCollection() error {
b.intervalEnd = time.Now()
if b.startedCollection != b.finishedCollection+1 {
return ErrInconsistentState
}
defer func() { b.finishedCollection++ }()
for key, value := range b.values {
mkind := key.descriptor.InstrumentKind()
stale := value.updated != b.finishedCollection
stateless := !value.stateful
// The following branch updates stateful aggregators. Skip
// these updates if the aggregator is not stateful or if the
// aggregator is stale.
if stale || stateless {
// If this processor does not require memory,
// stale, stateless entries can be removed.
// This implies that they were not updated
// over the previous full collection interval.
if stale && stateless && !b.config.Memory {
delete(b.values, key)
}
continue
}
// Update Aggregator state to support exporting either a
// delta or a cumulative aggregation.
var err error
if mkind.PrecomputedSum() {
if currentSubtractor, ok := value.current.(export.Subtractor); ok {
// This line is equivalent to:
// value.delta = currentSubtractor - value.cumulative
err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)
if err == nil {
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
}
} else {
err = aggregation.ErrNoSubtraction
}
} else {
// This line is equivalent to:
// value.cumulative = value.cumulative + value.delta
err = value.cumulative.Merge(value.current, key.descriptor)
}
if err != nil {
return err
}
}
return nil
}
// ForEach iterates through the Reader, passing an
// export.Record with the appropriate Cumulative or Delta aggregation
// to an exporter.
func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.Record) error) error {
if b.startedCollection != b.finishedCollection {
return ErrInconsistentState
}
for key, value := range b.values {
mkind := key.descriptor.InstrumentKind()
var agg aggregation.Aggregation
var start time.Time
// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}
aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind())
switch aggTemp {
case aggregation.CumulativeTemporality:
// If stateful, the sum has been computed. If stateless, the
// input was already cumulative. Either way, use the checkpointed
// value:
if value.stateful {
agg = value.cumulative.Aggregation()
} else {
agg = value.current.Aggregation()
}
start = b.processStart
case aggregation.DeltaTemporality:
// Precomputed sums are a special case.
if mkind.PrecomputedSum() {
agg = value.delta.Aggregation()
} else {
agg = value.current.Aggregation()
}
start = b.intervalStart
default:
return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality)
}
if err := f(export.NewRecord(
key.descriptor,
value.labels,
agg,
start,
b.intervalEnd,
)); err != nil && !errors.Is(err, aggregation.ErrNoData) {
return err
}
}
return nil
}


@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic"
// config contains the options for configuring a basic metric processor.
type config struct {
// Memory controls whether the processor remembers metric
// instruments and label sets that were previously reported.
// When Memory is true, Reader.ForEach() will visit
// metrics that were not updated in the most recent interval.
Memory bool
}
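// Option applies a configuration option value to the basic processor's
// config.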
type Option interface {
applyProcessor(*config)
}
// WithMemory sets the memory behavior of a Processor. If this is
// true, the processor will report metric instruments and label sets
// that were previously reported but not updated in the most recent
// interval.
func WithMemory(memory bool) Option {
return memoryOption(memory)
}
type memoryOption bool
func (m memoryOption) applyProcessor(cfg *config) {
cfg.Memory = bool(m)
}
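// Usage sketch (illustrative; aselector and tselector are assumed
// selector values accepted by New):
//
//	proc := New(aselector, tselector, WithMemory(true))
//
// With Memory enabled, Reader.ForEach() keeps reporting instruments and
// label sets seen in earlier intervals even when they receive no updates.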


@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"sync/atomic"
)
// refcountMapped atomically counts the number of references (usages) of an entry
// while also keeping a state of mapped/unmapped into a different data structure
// (an external map or list for example).
//
// refcountMapped uses an atomic value where the least significant bit is used to
// keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and
// the rest of the bits are used for refcounting.
type refcountMapped struct {
// refcount has to be aligned for 64-bit atomic operations.
value int64
}
// ref returns true if the entry is still mapped and increments the
// reference count; if the entry is unmapped it returns false.
func (rm *refcountMapped) ref() bool {
// Check if this entry was marked as unmapped between the moment
// we got a reference to it and here (in which case it has been,
// or will very soon be, removed).
return atomic.AddInt64(&rm.value, 2)&1 == 0
}
func (rm *refcountMapped) unref() {
atomic.AddInt64(&rm.value, -2)
}
// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the
// following conditions are true upon entry to this function:
// * There are no active references;
// * The mapped bit is in "mapped" state.
// Otherwise no changes are made to the mapped bit and false is returned.
func (rm *refcountMapped) tryUnmap() bool {
if atomic.LoadInt64(&rm.value) != 0 {
return false
}
return atomic.CompareAndSwapInt64(
&rm.value,
0,
1,
)
}
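// Lifecycle sketch (illustrative; mirrors how sdk.go uses this type).
// The value encodes refcount*2 plus the unmapped bit, so 2 means one
// reference and mapped, 0 means no references and mapped, and 1 means
// unmapped:
//
//	rm := refcountMapped{value: 2} // created with one reference, mapped
//	if rm.ref() {                  // a second user takes a reference
//		rm.unref()             // ... and releases it
//	}
//	rm.unref()         // release the creator's reference; value is now 0
//	if rm.tryUnmap() { // no refs remain: CAS flips value from 0 to 1
//		// safe to delete the entry from the containing map
//	}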

551
vendor/go.opentelemetry.io/otel/sdk/metric/sdk.go generated vendored Normal file

@ -0,0 +1,551 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
internal "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
)
type (
// Accumulator implements the OpenTelemetry Meter API. The
// Accumulator is bound to a single export.Processor in
// `NewAccumulator()`.
//
// The Accumulator supports a Collect() API to gather and export
// current data. Collect() should be arranged according to
// the processor model. Push-based processors will set up a
// timer to call Collect() periodically. Pull-based processors
// will call Collect() when a pull request arrives.
Accumulator struct {
// current maps `mapkey` to *record.
current sync.Map
// asyncInstruments is a set of
// `*asyncInstrument` instances
asyncLock sync.Mutex
asyncInstruments *internal.AsyncInstrumentState
// currentEpoch is the current epoch number. It is
// incremented in `Collect()`.
currentEpoch int64
// processor is the configured processor+configuration.
processor export.Processor
// collectLock prevents simultaneous calls to Collect().
collectLock sync.Mutex
// asyncSortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation. It is cleared after use.
asyncSortSlice attribute.Sortable
}
syncInstrument struct {
instrument
}
// mapkey uniquely describes a metric instrument in terms of
// its InstrumentID and the encoded form of its labels.
mapkey struct {
descriptor *sdkapi.Descriptor
ordered attribute.Distinct
}
// record maintains the state of one metric instrument. Due to
// the use of lock-free algorithms, there may be more than one
// `record` in existence at a time, although at most one can
// be referenced from the `Accumulator.current` map.
record struct {
// refMapped keeps track of refcounts and the mapping state to the
// Accumulator.current map.
refMapped refcountMapped
// updateCount is incremented on every Update.
updateCount int64
// collectedCount is set to updateCount on collection,
// supports checking for no updates during a round.
collectedCount int64
// storage is the stored label set for this record,
// except in cases where a label set is shared due to
// batch recording.
storage attribute.Set
// labels is the processed label set for this record.
// This may refer to the `storage` field in another
// record if this label set is shared as a result of
// `RecordBatch`.
labels *attribute.Set
// sortSlice has a single purpose - as a temporary
// place for sorting during labels creation to avoid
// allocation.
sortSlice attribute.Sortable
// inst is a pointer to the corresponding instrument.
inst *syncInstrument
// current implements the actual RecordOne() API,
// depending on the type of aggregation. If nil, the
// metric was disabled by the exporter.
current export.Aggregator
checkpoint export.Aggregator
}
instrument struct {
meter *Accumulator
descriptor sdkapi.Descriptor
}
asyncInstrument struct {
instrument
// recorders maps ordered labels to the pair of
// labelset and recorder
recorders map[attribute.Distinct]*labeledRecorder
}
labeledRecorder struct {
observedEpoch int64
labels *attribute.Set
observed export.Aggregator
}
)
var (
_ sdkapi.MeterImpl = &Accumulator{}
_ sdkapi.AsyncImpl = &asyncInstrument{}
_ sdkapi.SyncImpl = &syncInstrument{}
_ sdkapi.BoundSyncImpl = &record{}
// ErrUninitializedInstrument is returned when an instrument is used when uninitialized.
ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument")
)
func (inst *instrument) Descriptor() sdkapi.Descriptor {
return inst.descriptor
}
func (a *asyncInstrument) Implementation() interface{} {
return a
}
func (s *syncInstrument) Implementation() interface{} {
return s
}
func (a *asyncInstrument) observe(num number.Number, labels *attribute.Set) {
if err := aggregator.RangeTest(num, &a.descriptor); err != nil {
otel.Handle(err)
return
}
recorder := a.getRecorder(labels)
if recorder == nil {
// The instrument is disabled according to the
// AggregatorSelector.
return
}
if err := recorder.Update(context.Background(), num, &a.descriptor); err != nil {
otel.Handle(err)
return
}
}
func (a *asyncInstrument) getRecorder(labels *attribute.Set) export.Aggregator {
lrec, ok := a.recorders[labels.Equivalent()]
if ok {
// Note: SynchronizedMove(nil) can't return an error
_ = lrec.observed.SynchronizedMove(nil, &a.descriptor)
lrec.observedEpoch = a.meter.currentEpoch
a.recorders[labels.Equivalent()] = lrec
return lrec.observed
}
var rec export.Aggregator
a.meter.processor.AggregatorFor(&a.descriptor, &rec)
if a.recorders == nil {
a.recorders = make(map[attribute.Distinct]*labeledRecorder)
}
// This may store a nil recorder in the map, thus disabling the
// asyncInstrument for the labelset for good. This is intentional,
// but will be revisited later.
a.recorders[labels.Equivalent()] = &labeledRecorder{
observed: rec,
labels: labels,
observedEpoch: a.meter.currentEpoch,
}
return rec
}
// acquireHandle gets or creates a `*record` corresponding to `kvs`,
// the input labels. The second argument `labels` is passed in to
// support re-use of the orderedLabels computed by a previous
// measurement in the same batch. This performs two allocations
// in the common case.
func (s *syncInstrument) acquireHandle(kvs []attribute.KeyValue, labelPtr *attribute.Set) *record {
var rec *record
var equiv attribute.Distinct
if labelPtr == nil {
// This memory allocation may not be used, but it's
// needed for the `sortSlice` field, to avoid an
// allocation while sorting.
rec = &record{}
rec.storage = attribute.NewSetWithSortable(kvs, &rec.sortSlice)
rec.labels = &rec.storage
equiv = rec.storage.Equivalent()
} else {
equiv = labelPtr.Equivalent()
}
// Create lookup key for sync.Map (one allocation, as this
// passes through an interface{})
mk := mapkey{
descriptor: &s.descriptor,
ordered: equiv,
}
if actual, ok := s.meter.current.Load(mk); ok {
// Existing record case.
existingRec := actual.(*record)
if existingRec.refMapped.ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return existingRec
}
// This entry is no longer mapped, try to add a new entry.
}
if rec == nil {
rec = &record{}
rec.labels = labelPtr
}
rec.refMapped = refcountMapped{value: 2}
rec.inst = s
s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint)
for {
// Load/Store: there's a memory allocation to place `mk` into
// an interface here.
if actual, loaded := s.meter.current.LoadOrStore(mk, rec); loaded {
// Existing record case. Cannot change rec here because,
// if this fails, we will try to add rec again to avoid new allocations.
oldRec := actual.(*record)
if oldRec.refMapped.ref() {
// At this moment it is guaranteed that the entry is in
// the map and will not be removed.
return oldRec
}
// This loaded entry is marked as unmapped (so Collect will remove
// it from the map immediately), try again - this is a busy waiting
// strategy to wait until Collect() removes this entry from the map.
//
// This can be improved by having a list of "Unmapped" entries for
// one time only usages, OR we can make this a blocking path and use
// a Mutex that protects the delete operation (delete only if the old
// record is associated with the key).
// Let collector get work done to remove the entry from the map.
runtime.Gosched()
continue
}
// The new entry was added to the map, good to go.
return rec
}
}
// The input array `kvs` may be sorted in place by this call.
func (s *syncInstrument) Bind(kvs []attribute.KeyValue) sdkapi.BoundSyncImpl {
return s.acquireHandle(kvs, nil)
}
// The input array `kvs` may be sorted in place by this call.
func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []attribute.KeyValue) {
h := s.acquireHandle(kvs, nil)
defer h.Unbind()
h.RecordOne(ctx, num)
}
// NewAccumulator constructs a new Accumulator for the given
// processor. This Accumulator supports only a single processor.
//
// The Accumulator does not start any background process to collect itself
// periodically; this responsibility lies with the processor and depends on
// the type of export. For example, a pull-based processor will call
// Collect() when it receives a request to scrape current metric values,
// while a push-based processor should configure its own periodic
// collection.
func NewAccumulator(processor export.Processor) *Accumulator {
return &Accumulator{
processor: processor,
asyncInstruments: internal.NewAsyncInstrumentState(),
}
}
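// Usage sketch (illustrative only; proc, desc, ctx, and labels are
// assumed to be a configured export.Processor, a sdkapi.Descriptor, a
// context.Context, and a []attribute.KeyValue, respectively):
//
//	acc := NewAccumulator(proc)
//	inst, _ := acc.NewSyncInstrument(desc)
//	inst.RecordOne(ctx, number.NewInt64Number(1), labels)
//	checkpointed := acc.Collect(ctx) // one collection pass
//	_ = checkpointed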
// NewSyncInstrument implements sdkapi.MeterImpl.
func (m *Accumulator) NewSyncInstrument(descriptor sdkapi.Descriptor) (sdkapi.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
descriptor: descriptor,
meter: m,
},
}, nil
}
// NewAsyncInstrument implements sdkapi.MeterImpl.
func (m *Accumulator) NewAsyncInstrument(descriptor sdkapi.Descriptor, runner sdkapi.AsyncRunner) (sdkapi.AsyncImpl, error) {
a := &asyncInstrument{
instrument: instrument{
descriptor: descriptor,
meter: m,
},
}
m.asyncLock.Lock()
defer m.asyncLock.Unlock()
m.asyncInstruments.Register(a, runner)
return a, nil
}
// Collect traverses the list of active records and observers and
// exports data for each active instrument. Collect() may not be
// called concurrently.
//
// During the collection pass, the export.Processor will receive
// one Process() call per current aggregation.
//
// Returns the number of records that were checkpointed.
func (m *Accumulator) Collect(ctx context.Context) int {
m.collectLock.Lock()
defer m.collectLock.Unlock()
checkpointed := m.observeAsyncInstruments(ctx)
checkpointed += m.collectSyncInstruments()
m.currentEpoch++
return checkpointed
}
func (m *Accumulator) collectSyncInstruments() int {
checkpointed := 0
m.current.Range(func(key interface{}, value interface{}) bool {
// Note: always continue to iterate over the entire
// map by returning `true` in this function.
inuse := value.(*record)
mods := atomic.LoadInt64(&inuse.updateCount)
coll := inuse.collectedCount
if mods != coll {
// Updates happened in this interval,
// checkpoint and continue.
checkpointed += m.checkpointRecord(inuse)
inuse.collectedCount = mods
return true
}
// Having no updates since last collection, try to unmap:
if unmapped := inuse.refMapped.tryUnmap(); !unmapped {
// The record is referenced by a binding, continue.
return true
}
// If any other goroutines are now trying to re-insert this
// entry in the map, they are busy calling Gosched() awaiting
// this deletion:
m.current.Delete(inuse.mapkey())
// There's a potential race between `LoadInt64` and
// `tryUnmap` in this function. Since this is the
// last we'll see of this record, checkpoint it:
mods = atomic.LoadInt64(&inuse.updateCount)
if mods != coll {
checkpointed += m.checkpointRecord(inuse)
}
return true
})
return checkpointed
}
// CollectAsync implements internal.AsyncCollector.
// The input array `kvs` may be sorted in place by this call.
func (m *Accumulator) CollectAsync(kv []attribute.KeyValue, obs ...sdkapi.Observation) {
labels := attribute.NewSetWithSortable(kv, &m.asyncSortSlice)
for _, ob := range obs {
if a := m.fromAsync(ob.AsyncImpl()); a != nil {
a.observe(ob.Number(), &labels)
}
}
}
func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
m.asyncLock.Lock()
defer m.asyncLock.Unlock()
asyncCollected := 0
m.asyncInstruments.Run(ctx, m)
for _, inst := range m.asyncInstruments.Instruments() {
if a := m.fromAsync(inst); a != nil {
asyncCollected += m.checkpointAsync(a)
}
}
return asyncCollected
}
func (m *Accumulator) checkpointRecord(r *record) int {
if r.current == nil {
return 0
}
err := r.current.SynchronizedMove(r.checkpoint, &r.inst.descriptor)
if err != nil {
otel.Handle(err)
return 0
}
a := export.NewAccumulation(&r.inst.descriptor, r.labels, r.checkpoint)
err = m.processor.Process(a)
if err != nil {
otel.Handle(err)
}
return 1
}
func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
if len(a.recorders) == 0 {
return 0
}
checkpointed := 0
for encodedLabels, lrec := range a.recorders {
lrec := lrec
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
if lrec.observed != nil {
a := export.NewAccumulation(&a.descriptor, lrec.labels, lrec.observed)
err := m.processor.Process(a)
if err != nil {
otel.Handle(err)
}
checkpointed++
}
} else if epochDiff > 1 {
// This is the second collection cycle with no
// observations for this labelset. Remove the
// recorder.
delete(a.recorders, encodedLabels)
}
}
if len(a.recorders) == 0 {
a.recorders = nil
}
return checkpointed
}
// RecordBatch enters a batch of metric events.
// The input array `kvs` may be sorted in place by this call.
func (m *Accumulator) RecordBatch(ctx context.Context, kvs []attribute.KeyValue, measurements ...sdkapi.Measurement) {
// Labels will be computed the first time acquireHandle is
// called. Subsequent calls to acquireHandle will re-use the
// previously computed value instead of recomputing the
// ordered labels.
var labelsPtr *attribute.Set
for i, meas := range measurements {
s := m.fromSync(meas.SyncImpl())
if s == nil {
continue
}
h := s.acquireHandle(kvs, labelsPtr)
// Re-use labels for the next measurement.
if i == 0 {
labelsPtr = h.labels
}
defer h.Unbind()
h.RecordOne(ctx, meas.Number())
}
}
// RecordOne implements sdkapi.BoundSyncImpl.
func (r *record) RecordOne(ctx context.Context, num number.Number) {
if r.current == nil {
// The instrument is disabled according to the AggregatorSelector.
return
}
if err := aggregator.RangeTest(num, &r.inst.descriptor); err != nil {
otel.Handle(err)
return
}
if err := r.current.Update(ctx, num, &r.inst.descriptor); err != nil {
otel.Handle(err)
return
}
// Record was modified, inform the Collect() that things need
// to be collected while the record is still mapped.
atomic.AddInt64(&r.updateCount, 1)
}
// Unbind implements sdkapi.BoundSyncImpl.
func (r *record) Unbind() {
r.refMapped.unref()
}
func (r *record) mapkey() mapkey {
return mapkey{
descriptor: &r.inst.descriptor,
ordered: r.labels.Equivalent(),
}
}
// fromSync gets a sync implementation object, checking for
// uninitialized instruments and instruments created by another SDK.
func (m *Accumulator) fromSync(sync sdkapi.SyncImpl) *syncInstrument {
if sync != nil {
if inst, ok := sync.Implementation().(*syncInstrument); ok {
return inst
}
}
otel.Handle(ErrUninitializedInstrument)
return nil
}
// fromAsync gets an async implementation object, checking for
// uninitialized instruments and instruments created by another SDK.
func (m *Accumulator) fromAsync(async sdkapi.AsyncImpl) *asyncInstrument {
if async != nil {
if inst, ok := async.Implementation().(*asyncInstrument); ok {
return inst
}
}
otel.Handle(ErrUninitializedInstrument)
return nil
}


@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple"
import (
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/exact"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
type (
selectorInexpensive struct{}
selectorExact struct{}
selectorHistogram struct {
options []histogram.Option
}
)
var (
_ export.AggregatorSelector = selectorInexpensive{}
_ export.AggregatorSelector = selectorExact{}
_ export.AggregatorSelector = selectorHistogram{}
)
// NewWithInexpensiveDistribution returns a simple aggregator selector
// that uses minmaxsumcount aggregators for `Histogram`
// instruments. This selector is faster and uses less memory than the
// others in this package because minmaxsumcount aggregators maintain
// the least information about the distribution among these choices.
func NewWithInexpensiveDistribution() export.AggregatorSelector {
return selectorInexpensive{}
}
// NewWithExactDistribution returns a simple aggregator selector that
// uses exact aggregators for `Histogram` instruments. This
// selector uses more memory than the others in this package because
// exact aggregators maintain the most information about the
// distribution among these choices.
func NewWithExactDistribution() export.AggregatorSelector {
return selectorExact{}
}
// NewWithHistogramDistribution returns a simple aggregator selector
// that uses histogram aggregators for `Histogram` instruments.
// This selector is a good default choice for most metric exporters.
func NewWithHistogramDistribution(options ...histogram.Option) export.AggregatorSelector {
return selectorHistogram{options: options}
}
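// Usage sketch (illustrative; desc is an assumed *sdkapi.Descriptor):
//
//	sel := NewWithHistogramDistribution()
//	var agg export.Aggregator
//	sel.AggregatorFor(desc, &agg)
//	// agg is now a histogram aggregator for Histogram instruments, a
//	// last-value aggregator for GaugeObserver, and a sum otherwise.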
func sumAggs(aggPtrs []*export.Aggregator) {
aggs := sum.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
}
func lastValueAggs(aggPtrs []*export.Aggregator) {
aggs := lastvalue.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
}
func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs)
case sdkapi.HistogramInstrumentKind:
aggs := minmaxsumcount.New(len(aggPtrs), descriptor)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
sumAggs(aggPtrs)
}
}
func (selectorExact) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs)
case sdkapi.HistogramInstrumentKind:
aggs := exact.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
sumAggs(aggPtrs)
}
}
func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs)
case sdkapi.HistogramInstrumentKind:
aggs := histogram.New(len(aggPtrs), descriptor, s.options...)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
sumAggs(aggPtrs)
}
}