Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,10 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
fbOpts.EnableFallback = true
fbOpts.ErrorRateThreshold = 1
fbOpts.MinFailedCalls = 1
fbOpts.MeterProvider = config.OpenTelemetryMeterProvider

if metricsTracerFactory != nil && metricsTracerFactory.meterProvider != nil {
fbOpts.MeterProvider = metricsTracerFactory.meterProvider
}

gcpFallback, err := grpcgcp.NewGCPFallback(ctx, primaryConn, fallbackConn, fbOpts)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions spanner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
const (
builtInMetricsMeterName = "gax-go"
grpcMetricMeterName = "grpc-go"
grpcGcpMetricMeterName = "grpc-gcp-go"

nativeMetricsPrefix = "spanner.googleapis.com/internal/client/"

Expand All @@ -72,6 +73,10 @@ const (
metricLabelKeyGRPCLBLocality = "grpc.lb.locality"
metricLabelKeyGRPCLBBackendService = "grpc.lb.backend_service"
metricLabelKeyGRPCDisconnectError = "grpc.disconnect_error"
metricLabelKeyFromChannelName = "from_channel_name"
metricLabelKeyToChannelName = "to_channel_name"
metricLabelKeyChannelName = "channel_name"
metricLabelKeyStatusCode = "status_code"

// Metric names
metricNameOperationLatencies = "operation_latencies"
Expand All @@ -82,6 +87,8 @@ const (
metricNameGFELatencies = "gfe_latencies"
metricNameGFEConnectivityErrorCount = "gfe_connectivity_error_count"
metricNameAFEConnectivityErrorCount = "afe_connectivity_error_count"
metricNameEEFFallbackCount = "eef.fallback_count"
metricNameEEFCallStatus = "eef.call_status"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -272,6 +279,8 @@ type builtinMetricsTracerFactory struct {
afeErrorCount metric.Int64Counter // Counter for the number of requests that failed to reach the Spanner API Frontend.
operationCount metric.Int64Counter // Counter for the number of operations.
attemptCount metric.Int64Counter // Counter for the number of attempts.

meterProvider metric.MeterProvider
}

func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath, compression string, isAFEBuiltInMetricEnabled, isEnableGRPCBuiltInMetrics bool, metricsProvider metric.MeterProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -310,6 +319,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath, compression str
return tracerFactory, err
}
meterProvider = sdkmetric.NewMeterProvider(mpOptions...)
tracerFactory.meterProvider = meterProvider

if isEnableGRPCBuiltInMetrics {
mo := opentelemetry.MetricsOptions{
Expand Down Expand Up @@ -372,6 +382,36 @@ func builtInMeterProviderOptions(project, compression string, clientAttributes [
},
))
}
skippedEEFMetrics := []string{
"eef.probe_result",
"eef.error_ratio",
"eef.current_channel",
"eef.channel_downtime",
}
for _, m := range skippedEEFMetrics {
views = append(views, sdkmetric.NewView(
sdkmetric.Instrument{Name: m},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}},
))
}
eefMetricsToEnable := []string{
metricNameEEFFallbackCount,
metricNameEEFCallStatus,
}
for _, m := range eefMetricsToEnable {
views = append(views, sdkmetric.NewView(
sdkmetric.Instrument{Name: m},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationSum{},
AttributeFilter: func(kv attribute.KeyValue) bool {
if _, ok := allowedEEFMetricLabels[string(kv.Key)]; ok {
return true
}
return false
},
},
))
}
return []sdkmetric.Option{sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
defaultExporter,
Expand Down
18 changes: 17 additions & 1 deletion spanner/metrics_monitoring_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ var (
metricLabelKeyStatus: true,
}

allowedEEFMetricLabels = map[string]bool{
metricLabelKeyClientUID: true,
metricLabelKeyClientName: true,
metricLabelKeyFromChannelName: true,
metricLabelKeyToChannelName: true,
metricLabelKeyChannelName: true,
metricLabelKeyStatusCode: true,
}

errShutdown = fmt.Errorf("exporter is shutdown")
)

Expand Down Expand Up @@ -215,6 +224,7 @@ func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otel
Labels: map[string]string{},
}
labels := make(map[string]string)
isEEFMetric := strings.HasPrefix(metrics.Name, "eef.")
addAttributes := func(attr *attribute.Set) {
iter := attr.Iter()
for iter.Next() {
Expand All @@ -228,12 +238,18 @@ func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otel
if _, ok := allowedMetricLabels[string(kv.Key)]; ok {
labels[labelKey] = kv.Value.Emit()
}
if _, ok := allowedEEFMetricLabels[string(kv.Key)]; ok && isEEFMetric {
labels[labelKey] = kv.Value.Emit()
}
}
}
for _, label := range me.clientAttributes {
if _, isResLabel := monitoredResLabelsSet[string(label.Key)]; isResLabel {
mr.Labels[string(label.Key)] = label.Value.Emit()
} else {
if ok := allowedEEFMetricLabels[string(label.Key)]; isEEFMetric && !ok {
continue
}
labels[string(label.Key)] = label.Value.Emit()
}
}
Expand All @@ -255,7 +271,7 @@ func (me *monitoringExporter) recordsToTimeSeriesPbs(rm *otelmetricdata.Resource
errs []error
)
for _, scope := range rm.ScopeMetrics {
if !(scope.Scope.Name == builtInMetricsMeterName || scope.Scope.Name == grpcMetricMeterName) {
if !(scope.Scope.Name == builtInMetricsMeterName || scope.Scope.Name == grpcMetricMeterName || scope.Scope.Name == grpcGcpMetricMeterName) {
continue
}
for _, metrics := range scope.Metrics {
Expand Down
Loading