Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,10 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
fbOpts.EnableFallback = true
fbOpts.ErrorRateThreshold = 1
fbOpts.MinFailedCalls = 1
fbOpts.MeterProvider = config.OpenTelemetryMeterProvider

if metricsTracerFactory != nil && metricsTracerFactory.meterProvider != nil {
fbOpts.MeterProvider = metricsTracerFactory.meterProvider
}

gcpFallback, err := grpcgcp.NewGCPFallback(ctx, primaryConn, fallbackConn, fbOpts)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions spanner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
const (
builtInMetricsMeterName = "gax-go"
grpcMetricMeterName = "grpc-go"
grpcGcpMetricMeterName = "grpc-gcp-go"

nativeMetricsPrefix = "spanner.googleapis.com/internal/client/"

Expand All @@ -72,6 +73,10 @@ const (
metricLabelKeyGRPCLBLocality = "grpc.lb.locality"
metricLabelKeyGRPCLBBackendService = "grpc.lb.backend_service"
metricLabelKeyGRPCDisconnectError = "grpc.disconnect_error"
metricLabelKeyFromChannelName = "from_channel_name"
metricLabelKeyToChannelName = "to_channel_name"
metricLabelKeyChannelName = "channel_name"
metricLabelKeyStatusCode = "status_code"

// Metric names
metricNameOperationLatencies = "operation_latencies"
Expand All @@ -82,6 +87,8 @@ const (
metricNameGFELatencies = "gfe_latencies"
metricNameGFEConnectivityErrorCount = "gfe_connectivity_error_count"
metricNameAFEConnectivityErrorCount = "afe_connectivity_error_count"
metricNameEEFFallbackCount = "eef.fallback_count"
metricNameEEFCallStatus = "eef.call_status"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -272,6 +279,8 @@ type builtinMetricsTracerFactory struct {
afeErrorCount metric.Int64Counter // Counter for the number of requests that failed to reach the Spanner API Frontend.
operationCount metric.Int64Counter // Counter for the number of operations.
attemptCount metric.Int64Counter // Counter for the number of attempts.

meterProvider metric.MeterProvider
}

func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath, compression string, isAFEBuiltInMetricEnabled, isEnableGRPCBuiltInMetrics bool, metricsProvider metric.MeterProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -310,6 +319,7 @@ func newBuiltinMetricsTracerFactory(ctx context.Context, dbpath, compression str
return tracerFactory, err
}
meterProvider = sdkmetric.NewMeterProvider(mpOptions...)
tracerFactory.meterProvider = meterProvider

if isEnableGRPCBuiltInMetrics {
mo := opentelemetry.MetricsOptions{
Expand Down Expand Up @@ -372,6 +382,36 @@ func builtInMeterProviderOptions(project, compression string, clientAttributes [
},
))
}
skippedEEFMetrics := []string{
"eef.probe_result",
"eef.error_ratio",
"eef.current_channel",
"eef.channel_downtime",
}
for _, m := range skippedEEFMetrics {
views = append(views, sdkmetric.NewView(
sdkmetric.Instrument{Name: m},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationDrop{}},
))
}
eefMetricsToEnable := []string{
metricNameEEFFallbackCount,
metricNameEEFCallStatus,
}
for _, m := range eefMetricsToEnable {
views = append(views, sdkmetric.NewView(
sdkmetric.Instrument{Name: m},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationSum{},
AttributeFilter: func(kv attribute.KeyValue) bool {
if _, ok := allowedEEFMetricLabels[string(kv.Key)]; ok {
return true
}
return false
},
},
))
}
return []sdkmetric.Option{sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
defaultExporter,
Expand Down
18 changes: 17 additions & 1 deletion spanner/metrics_monitoring_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ var (
metricLabelKeyStatus: true,
}

allowedEEFMetricLabels = map[string]bool{
metricLabelKeyClientUID: true,
metricLabelKeyClientName: true,
metricLabelKeyFromChannelName: true,
metricLabelKeyToChannelName: true,
metricLabelKeyChannelName: true,
metricLabelKeyStatusCode: true,
}

errShutdown = fmt.Errorf("exporter is shutdown")
)

Expand Down Expand Up @@ -215,6 +224,7 @@ func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otel
Labels: map[string]string{},
}
labels := make(map[string]string)
isEEFMetric := strings.HasPrefix(metrics.Name, "eef.")
addAttributes := func(attr *attribute.Set) {
iter := attr.Iter()
for iter.Next() {
Expand All @@ -228,12 +238,18 @@ func (me *monitoringExporter) recordToMetricAndMonitoredResourcePbs(metrics otel
if _, ok := allowedMetricLabels[string(kv.Key)]; ok {
labels[labelKey] = kv.Value.Emit()
}
if _, ok := allowedEEFMetricLabels[string(kv.Key)]; ok && isEEFMetric {
labels[labelKey] = kv.Value.Emit()
}
}
}
for _, label := range me.clientAttributes {
if _, isResLabel := monitoredResLabelsSet[string(label.Key)]; isResLabel {
mr.Labels[string(label.Key)] = label.Value.Emit()
} else {
if ok := allowedEEFMetricLabels[string(label.Key)]; isEEFMetric && !ok {
continue
}
labels[string(label.Key)] = label.Value.Emit()
}
}
Expand All @@ -255,7 +271,7 @@ func (me *monitoringExporter) recordsToTimeSeriesPbs(rm *otelmetricdata.Resource
errs []error
)
for _, scope := range rm.ScopeMetrics {
if !(scope.Scope.Name == builtInMetricsMeterName || scope.Scope.Name == grpcMetricMeterName) {
if !(scope.Scope.Name == builtInMetricsMeterName || scope.Scope.Name == grpcMetricMeterName || scope.Scope.Name == grpcGcpMetricMeterName) {
continue
}
for _, metrics := range scope.Metrics {
Expand Down
Loading