Skip to content

Commit 2c99011

Browse files
fix(spanner): use AggregationDefault for spanner grpc metrics
This prevents the OTel SDK from dropping the configured AttributeFilter when processing asynchronous gauge metrics (such as grpc.subchannel.open_connections). Forcing AggregationSum on gauge instruments caused the SDK to reject the View, so the metric points were exported as distinct points with full attribute cardinality. The exporter then blindly stripped the labels, which left the resulting TimeSeries identical to one another; they collided, and Cloud Monitoring dropped the metrics because of duplicate timestamps. With AggregationDefault, the OTel SDK correctly applies the AttributeFilter to all metrics and natively aggregates the gauge points together before export.
1 parent 370e0ed commit 2c99011

2 files changed

Lines changed: 100 additions & 1 deletion

File tree

spanner/metric_monitoring_exporter_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"net"
2222
"strings"
2323
"sync"
24+
"testing"
2425
"time"
2526

2627
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
@@ -29,6 +30,13 @@ import (
2930
"google.golang.org/grpc/keepalive"
3031
"google.golang.org/grpc/metadata"
3132
"google.golang.org/protobuf/types/known/emptypb"
33+
34+
"cloud.google.com/go/spanner/internal"
35+
"go.opentelemetry.io/otel/attribute"
36+
"go.opentelemetry.io/otel/metric"
37+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
38+
"google.golang.org/api/option"
39+
"google.golang.org/grpc/credentials/insecure"
3240
)
3341

3442
type MetricsTestServer struct {
@@ -124,3 +132,94 @@ func NewMetricTestServer() (*MetricsTestServer, error) {
124132

125133
return testServer, nil
126134
}
135+
136+
func TestBuiltinMetrics_GaugeAggregation(t *testing.T) {
137+
ctx := context.Background()
138+
139+
// Setup mock monitoring server
140+
monitoringServer, err := NewMetricTestServer()
141+
if err != nil {
142+
t.Fatalf("Error setting up metrics test server")
143+
}
144+
defer monitoringServer.Shutdown()
145+
go monitoringServer.Serve()
146+
147+
// Override exporter options to use the mock
148+
origCreateExporterOptions := createExporterOptions
149+
createExporterOptions = func(opts ...option.ClientOption) []option.ClientOption {
150+
return []option.ClientOption{
151+
option.WithEndpoint(monitoringServer.Endpoint), // Connect to mock
152+
option.WithoutAuthentication(),
153+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
154+
}
155+
}
156+
defer func() {
157+
createExporterOptions = origCreateExporterOptions
158+
}()
159+
160+
// Build the meter provider using the built-in configs
161+
clientAttributes := []attribute.KeyValue{
162+
attribute.String("client_uid", "test-uid"),
163+
}
164+
165+
mpOptions, exporter, err := builtInMeterProviderOptions("test-project", "", clientAttributes)
166+
if err != nil {
167+
t.Fatalf("builtInMeterProviderOptions failed: %v", err)
168+
}
169+
170+
mp := sdkmetric.NewMeterProvider(mpOptions...)
171+
defer mp.Shutdown(ctx)
172+
173+
// Obtain the specific meter bound to the grpc metric meter name configured in views
174+
meter := mp.Meter("grpc-go", metric.WithInstrumentationVersion(internal.Version))
175+
176+
// Create the gauge (UpDownCounter in grpc-go)
177+
openConns, err := meter.Int64UpDownCounter("grpc.subchannel.open_connections")
178+
if err != nil {
179+
t.Fatalf("Failed to create counter: %v", err)
180+
}
181+
182+
// Record values with DIFFERENT target attributes (which should be DROPPED by the AttributeFilter)
183+
openConns.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.target", "target-1")))
184+
openConns.Add(ctx, 2, metric.WithAttributes(attribute.String("grpc.target", "target-2")))
185+
186+
// Force flush to the mock server
187+
err = mp.ForceFlush(ctx)
188+
if err != nil {
189+
t.Fatalf("ForceFlush failed: %v", err)
190+
}
191+
// Give the mock server a moment to receive the RPC
192+
time.Sleep(100 * time.Millisecond)
193+
194+
reqs := monitoringServer.CreateServiceTimeSeriesRequests()
195+
if len(reqs) == 0 {
196+
t.Fatalf("No CreateTimeSeriesRequests received")
197+
}
198+
199+
// Check if all exported metrics only have one data point for grpc.subchannel.open_connections
200+
var gaugePoints int64
201+
var foundTimeSeries int
202+
203+
for _, req := range reqs {
204+
for _, ts := range req.TimeSeries {
205+
if ts.Metric.Type == "spanner.googleapis.com/internal/client/grpc/subchannel/open_connections" {
206+
foundTimeSeries++
207+
if len(ts.Points) > 0 {
208+
gaugePoints += ts.Points[0].Value.GetInt64Value()
209+
}
210+
}
211+
}
212+
}
213+
214+
if foundTimeSeries != 1 {
215+
t.Errorf("Expected exactly ONE TimeSeries for open_connections due to attribute filtering making them identical, got %d", foundTimeSeries)
216+
}
217+
218+
// Since both Adds are recorded and their distinct attributes dropped, they are aggregated into 3.
219+
if gaugePoints != 3 {
220+
t.Errorf("Expected sum of open connections to be 3, got %d", gaugePoints)
221+
}
222+
223+
// Ensure that after flush we cleanly close exporter
224+
exporter.stop()
225+
}

spanner/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func builtInMeterProviderOptions(project, compression string, clientAttributes [
362362
Name: m,
363363
},
364364
sdkmetric.Stream{
365-
Aggregation: sdkmetric.AggregationSum{},
365+
Aggregation: sdkmetric.AggregationDefault{},
366366
AttributeFilter: func(kv attribute.KeyValue) bool {
367367
if _, ok := allowedMetricLabels[string(kv.Key)]; ok {
368368
return true

0 commit comments

Comments
 (0)