150 lines
4.9 KiB
Go
150 lines
4.9 KiB
Go
|
// Copyright The OpenTelemetry Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
package metric // import "go.opentelemetry.io/otel/internal/metric"
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"go.opentelemetry.io/otel"
|
||
|
"go.opentelemetry.io/otel/attribute"
|
||
|
"go.opentelemetry.io/otel/metric/sdkapi"
|
||
|
)
|
||
|
|
||
|
//nolint:revive // ignoring missing comments for exported error in an internal package
|
||
|
var ErrInvalidAsyncRunner = errors.New("unknown async runner type")
|
||
|
|
||
|
// AsyncCollector is an interface used between the MeterImpl and the
|
||
|
// AsyncInstrumentState helper below. This interface is implemented by
|
||
|
// the SDK to provide support for running observer callbacks.
|
||
|
type AsyncCollector interface {
|
||
|
// CollectAsync passes a batch of observations to the MeterImpl.
|
||
|
CollectAsync(labels []attribute.KeyValue, observation ...sdkapi.Observation)
|
||
|
}
|
||
|
|
||
|
// AsyncInstrumentState manages an ordered set of asynchronous
|
||
|
// instruments and the distinct runners, taking into account batch
|
||
|
// observer callbacks.
|
||
|
type AsyncInstrumentState struct {
|
||
|
lock sync.Mutex
|
||
|
|
||
|
// errorOnce will use the otel.Handler to report an error
|
||
|
// once in case of an invalid runner attempting to run.
|
||
|
errorOnce sync.Once
|
||
|
|
||
|
// runnerMap keeps the set of runners that will run each
|
||
|
// collection interval. Singletons are entered with a real
|
||
|
// instrument each, batch observers are entered with a nil
|
||
|
// instrument, ensuring that when a singleton callback is used
|
||
|
// repeatedly, it is executed repeatedly in the interval, while
|
||
|
// when a batch callback is used repeatedly, it only executes
|
||
|
// once per interval.
|
||
|
runnerMap map[asyncRunnerPair]struct{}
|
||
|
|
||
|
// runners maintains the set of runners in the order they were
|
||
|
// registered.
|
||
|
runners []asyncRunnerPair
|
||
|
|
||
|
// instruments maintains the set of instruments in the order
|
||
|
// they were registered.
|
||
|
instruments []sdkapi.AsyncImpl
|
||
|
}
|
||
|
|
||
|
// asyncRunnerPair is a map entry for Observer callback runners.
|
||
|
type asyncRunnerPair struct {
|
||
|
// runner is used as a map key here. The API ensures
|
||
|
// that all callbacks are pointers for this reason.
|
||
|
runner sdkapi.AsyncRunner
|
||
|
|
||
|
// inst refers to a non-nil instrument when `runner` is a
|
||
|
// AsyncSingleRunner.
|
||
|
inst sdkapi.AsyncImpl
|
||
|
}
|
||
|
|
||
|
// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for
|
||
|
// use by MeterImpl to manage running the set of observer callbacks in
|
||
|
// the correct order.
|
||
|
func NewAsyncInstrumentState() *AsyncInstrumentState {
|
||
|
return &AsyncInstrumentState{
|
||
|
runnerMap: map[asyncRunnerPair]struct{}{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Instruments returns the asynchronous instruments managed by this
|
||
|
// object, the set that should be checkpointed after observers are
|
||
|
// run.
|
||
|
func (a *AsyncInstrumentState) Instruments() []sdkapi.AsyncImpl {
|
||
|
a.lock.Lock()
|
||
|
defer a.lock.Unlock()
|
||
|
return a.instruments
|
||
|
}
|
||
|
|
||
|
// Register adds a new asynchronous instrument to by managed by this
|
||
|
// object. This should be called during NewAsyncInstrument() and
|
||
|
// assumes that errors (e.g., duplicate registration) have already
|
||
|
// been checked.
|
||
|
func (a *AsyncInstrumentState) Register(inst sdkapi.AsyncImpl, runner sdkapi.AsyncRunner) {
|
||
|
a.lock.Lock()
|
||
|
defer a.lock.Unlock()
|
||
|
|
||
|
a.instruments = append(a.instruments, inst)
|
||
|
|
||
|
// asyncRunnerPair reflects this callback in the asyncRunners
|
||
|
// list. If this is a batch runner, the instrument is nil.
|
||
|
// If this is a single-Observer runner, the instrument is
|
||
|
// included. This ensures that batch callbacks are called
|
||
|
// once and single callbacks are called once per instrument.
|
||
|
rp := asyncRunnerPair{
|
||
|
runner: runner,
|
||
|
}
|
||
|
if _, ok := runner.(sdkapi.AsyncSingleRunner); ok {
|
||
|
rp.inst = inst
|
||
|
}
|
||
|
|
||
|
if _, ok := a.runnerMap[rp]; !ok {
|
||
|
a.runnerMap[rp] = struct{}{}
|
||
|
a.runners = append(a.runners, rp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Run executes the complete set of observer callbacks.
|
||
|
func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector) {
|
||
|
a.lock.Lock()
|
||
|
runners := a.runners
|
||
|
a.lock.Unlock()
|
||
|
|
||
|
for _, rp := range runners {
|
||
|
// The runner must be a single or batch runner, no
|
||
|
// other implementations are possible because the
|
||
|
// interface has un-exported methods.
|
||
|
|
||
|
if singleRunner, ok := rp.runner.(sdkapi.AsyncSingleRunner); ok {
|
||
|
singleRunner.Run(ctx, rp.inst, collector.CollectAsync)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if multiRunner, ok := rp.runner.(sdkapi.AsyncBatchRunner); ok {
|
||
|
multiRunner.Run(ctx, collector.CollectAsync)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
a.errorOnce.Do(func() {
|
||
|
otel.Handle(fmt.Errorf("%w: type %T (reported once)", ErrInvalidAsyncRunner, rp))
|
||
|
})
|
||
|
}
|
||
|
}
|