Skip to content

Commit dc245af

Browse files
feat(storage): add client feature tracking support (#14320)
1 parent cfdd7ba commit dc245af

8 files changed

Lines changed: 248 additions & 2 deletions

File tree

storage/grpc_client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ type grpcStorageClient struct {
124124
settings *settings
125125
config *storageConfig
126126
dpDiag string
127+
128+
// configFeatureAttributes tracks client-level features that are enabled for this
129+
// client instance.
130+
configFeatureAttributes uint32
127131
}
128132

129133
func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) {
@@ -240,6 +244,17 @@ func (c *grpcStorageClient) prepareDirectPathMetadata(ctx context.Context, targe
240244
md.Set(requestParamsHeaderKey, reason)
241245
}
242246
}
247+
248+
// Client level feature tracking.
249+
features := featureAttributes(ctx)
250+
features |= c.configFeatureAttributes
251+
// Merge all existing headers for this key from metadata.
252+
features |= mergeFeatureAttributes(md[featureTrackerHeaderName])
253+
254+
if features > 0 {
255+
md.Set(featureTrackerHeaderName, encodeUint32(features))
256+
}
257+
243258
return metadata.NewOutgoingContext(ctx, md), nil
244259
}
245260

storage/grpc_client_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,3 +496,73 @@ func TestPrepareDirectPathMetadata(t *testing.T) {
496496
})
497497
}
498498
}
499+
500+
func TestPrepareDirectPathMetadata_FeatureTracking(t *testing.T) {
501+
tests := []struct {
502+
desc string
503+
configFeatures uint32
504+
contextFeatures []trackedFeature
505+
wantFeatures uint32
506+
}{
507+
{
508+
desc: "no features",
509+
wantFeatures: 0,
510+
},
511+
{
512+
desc: "config features only",
513+
configFeatures: uint32(1 << featurePCU),
514+
wantFeatures: uint32(1 << featurePCU),
515+
},
516+
{
517+
desc: "merged features",
518+
configFeatures: uint32(1 << featurePCU),
519+
contextFeatures: []trackedFeature{featureMultistreamInMRD},
520+
wantFeatures: uint32(1<<featurePCU) | uint32(1<<featureMultistreamInMRD),
521+
},
522+
}
523+
524+
for _, tc := range tests {
525+
t.Run(tc.desc, func(t *testing.T) {
526+
c := &grpcStorageClient{
527+
config: &storageConfig{},
528+
configFeatureAttributes: tc.configFeatures,
529+
}
530+
531+
ctx := context.Background()
532+
if len(tc.contextFeatures) > 0 {
533+
ctx = addFeatureAttributes(ctx, tc.contextFeatures...)
534+
}
535+
536+
newCtx, err := c.prepareDirectPathMetadata(ctx, directPathEndpointPrefix)
537+
if err != nil {
538+
t.Fatalf("unexpected error: %v", err)
539+
}
540+
541+
md, ok := metadata.FromOutgoingContext(newCtx)
542+
if !ok {
543+
t.Fatal("metadata not found in context")
544+
}
545+
546+
got := md.Get(featureTrackerHeaderName)
547+
if tc.wantFeatures == 0 {
548+
if len(got) > 0 {
549+
t.Errorf("got features %q, want none", got[0])
550+
}
551+
return
552+
}
553+
554+
if len(got) == 0 {
555+
t.Fatalf("features header missing, want %d", tc.wantFeatures)
556+
}
557+
558+
decoded, err := decodeUint32(got[0])
559+
if err != nil {
560+
t.Fatalf("failed to decode features: %v", err)
561+
}
562+
563+
if decoded != tc.wantFeatures {
564+
t.Errorf("got features %d, want %d", decoded, tc.wantFeatures)
565+
}
566+
})
567+
}
568+
}

storage/http_client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import (
4545

4646
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
4747
// storageClient interface.
48+
//
49+
// TODO(b/498422946): Add client feature tracker in HTTP client.
4850
type httpStorageClient struct {
4951
creds *auth.Credentials
5052
hc *http.Client

storage/pcu.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ func (w *Writer) initPCU(ctx context.Context) error {
179179

180180
s := newPCUSettings(cfg.MaxConcurrency)
181181

182+
// Track PCU operations using client feature tracking header.
183+
ctx = addFeatureAttributes(ctx, featurePCU)
184+
182185
pCtx, cancel := context.WithCancel(ctx)
183186

184187
state := &pcuState{

storage/reader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,9 @@ func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context, opts ...MRDO
265265
for _, opt := range opts {
266266
opt.apply(params)
267267
}
268-
268+
if params.minConnections > 1 || params.maxConnections > 1 {
269+
spanCtx = addFeatureAttributes(spanCtx, featureMultistreamInMRD)
270+
}
269271
// This call will return the *MultiRangeDownloader with the .impl field set.
270272
return o.c.tc.NewMultiRangeDownloader(spanCtx, params, storageOpts...)
271273
}

storage/tracked_features.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ package storage
1616

1717
// trackedFeature represents a specific client feature being tracked, represented
1818
// as a bit in a bitmask. Each feature corresponds to a specific bit position.
19-
type trackedFeature uint
19+
type trackedFeature uint32
2020

2121
const (
2222
featureMultistreamInMRD trackedFeature = 0

storage/tracker.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package storage
16+
17+
import (
18+
"context"
19+
20+
"github.com/googleapis/gax-go/v2/callctx"
21+
)
22+
23+
const featureTrackerHeaderName = "x-goog-storage-go-features"
24+
25+
// addFeatureAttributes adds the specified feature codes to the context.
26+
// Features are stored as a bitmask in the callctx headers and will be
27+
// injected into the outgoing request headers by the transport.
28+
func addFeatureAttributes(ctx context.Context, features ...trackedFeature) context.Context {
29+
if len(features) == 0 {
30+
return ctx
31+
}
32+
33+
current := featureAttributes(ctx)
34+
updated := current
35+
for _, f := range features {
36+
updated |= (1 << f)
37+
}
38+
39+
if updated == current {
40+
return ctx
41+
}
42+
43+
return callctx.SetHeaders(ctx, featureTrackerHeaderName, encodeUint32(uint32(updated)))
44+
}
45+
46+
// featureAttributes extracts and merges all feature attributes present in the context.
47+
// It returns a bitmask represented as a uint8.
48+
func featureAttributes(ctx context.Context) uint32 {
49+
ctxHeaders := callctx.HeadersFromContext(ctx)
50+
// If multiple values are present in the context (e.g. from nested calls),
51+
// merge them into a single bitmask.
52+
return mergeFeatureAttributes(ctxHeaders[featureTrackerHeaderName])
53+
}
54+
55+
func mergeFeatureAttributes(vals []string) uint32 {
56+
features := uint32(0)
57+
for _, val := range vals {
58+
if decoded, err := decodeUint32(val); err == nil {
59+
features |= decoded
60+
}
61+
}
62+
return features
63+
}

storage/tracker_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package storage
16+
17+
import (
18+
"context"
19+
"testing"
20+
)
21+
22+
func TestAddFeatureAttributes(t *testing.T) {
23+
ctx := context.Background()
24+
25+
// Initial features should be 0.
26+
if got := featureAttributes(ctx); got != 0 {
27+
t.Errorf("getFeatureAttributes(empty) = %d; want 0", got)
28+
}
29+
30+
// Add a single feature.
31+
ctx = addFeatureAttributes(ctx, featureMultistreamInMRD)
32+
if got := featureAttributes(ctx); got != uint32(1<<featureMultistreamInMRD) {
33+
t.Errorf("getFeatureAttributes(MultiStream) = %d; want %d", got, featureMultistreamInMRD)
34+
}
35+
36+
// Add another feature (merge).
37+
ctx = addFeatureAttributes(ctx, featurePCU)
38+
want := uint32(1<<featureMultistreamInMRD) | uint32(1<<featurePCU)
39+
if got := featureAttributes(ctx); got != want {
40+
t.Errorf("getFeatureAttributes(MultiStream | PCU) = %d; want %d", got, want)
41+
}
42+
43+
// Adding same feature should be idempotent.
44+
ctx = addFeatureAttributes(ctx, featurePCU)
45+
if got := featureAttributes(ctx); got != want {
46+
t.Errorf("getFeatureAttributes(MultiStream | PCU | PCU) = %d; want %d", got, want)
47+
}
48+
}
49+
50+
func TestMergeFeatureAttributes(t *testing.T) {
51+
tests := []struct {
52+
name string
53+
vals []string
54+
want uint32
55+
}{
56+
{
57+
name: "empty",
58+
vals: []string{},
59+
want: 0,
60+
},
61+
{
62+
name: "single value",
63+
vals: []string{encodeUint32(1)},
64+
want: 1,
65+
},
66+
{
67+
name: "multiple values",
68+
vals: []string{encodeUint32(1), encodeUint32(2), encodeUint32(4)},
69+
want: 7,
70+
},
71+
{
72+
name: "overlapping values",
73+
vals: []string{encodeUint32(3), encodeUint32(6)},
74+
want: 7, // 011 | 110 = 111 (7)
75+
},
76+
{
77+
name: "invalid values ignored",
78+
vals: []string{encodeUint32(1), "invalid", encodeUint32(8)},
79+
want: 9,
80+
},
81+
}
82+
83+
for _, tc := range tests {
84+
t.Run(tc.name, func(t *testing.T) {
85+
got := mergeFeatureAttributes(tc.vals)
86+
if got != tc.want {
87+
t.Errorf("mergeFeatureAttributes(%v) = %d; want %d", tc.vals, got, tc.want)
88+
}
89+
})
90+
}
91+
}

0 commit comments

Comments
 (0)