Skip to content

Commit 0231f02

Browse files
authored
fix(otlpmetrichttp): replay gzipped bodies on redirect (#8185)
Same as #8152 but for `otlpmetrichttp` When gzip compression is enabled, the OTLP metric HTTP client compressed the request body for the initial send but left GetBody wired to the original uncompressed protobuf payload. Any redirect or retry path that rebuilt the request from GetBody could then resend an uncompressed body while still advertising Content-Encoding: gzip. Update GetBody to return the gzipped buffer and add a redirect regression test that verifies the replayed body matches the original compressed request and can be decompressed successfully.
1 parent dde79e8 commit 0231f02

3 files changed

Lines changed: 77 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
3737

3838
### Fixed
3939

40+
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8135)
4041
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8152)
4142
- `go.opentelemetry.io/otel/exporters/prometheus` now uses `Value.String` formatting for label values following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8170)
4243

exporters/otlp/otlpmetric/otlpmetrichttp/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,10 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
265265
r.Header.Set("Content-Encoding", "gzip")
266266

267267
gz := gzPool.Get().(*gzip.Writer)
268-
defer gzPool.Put(gz)
268+
defer func() {
269+
gz.Reset(io.Discard)
270+
gzPool.Put(gz)
271+
}()
269272

270273
var b bytes.Buffer
271274
gz.Reset(&b)
@@ -279,7 +282,7 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
279282
}
280283

281284
req.bodyReader = bodyReader(b.Bytes())
282-
req.GetBody = bodyReaderErr(body)
285+
req.GetBody = bodyReaderErr(b.Bytes())
283286
}
284287

285288
return req, nil

exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package otlpmetrichttp
55

66
import (
7+
"bytes"
8+
"compress/gzip"
79
"context"
810
"crypto/tls"
911
"errors"
@@ -380,6 +382,75 @@ func TestGetBodyCalledOnRedirect(t *testing.T) {
380382
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
381383
}
382384

385+
func TestGetBodyCalledOnRedirectWithGzip(t *testing.T) {
386+
// Test that req.GetBody replays the gzipped request body on redirects.
387+
var mu sync.Mutex
388+
var requestBodies [][]byte
389+
390+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
391+
body, err := io.ReadAll(r.Body)
392+
if err != nil {
393+
w.WriteHeader(http.StatusInternalServerError)
394+
return
395+
}
396+
397+
assert.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
398+
399+
mu.Lock()
400+
requestBodies = append(requestBodies, body)
401+
isFirstRequest := len(requestBodies) == 1
402+
mu.Unlock()
403+
404+
if isFirstRequest {
405+
w.Header().Set("Location", "/v1/metrics/final")
406+
w.WriteHeader(http.StatusTemporaryRedirect)
407+
return
408+
}
409+
410+
w.Header().Set("Content-Type", "application/x-protobuf")
411+
w.WriteHeader(http.StatusOK)
412+
})
413+
414+
server := httptest.NewServer(handler)
415+
defer server.Close()
416+
417+
opts := []Option{
418+
WithEndpoint(server.Listener.Addr().String()),
419+
WithInsecure(),
420+
WithCompression(GzipCompression),
421+
}
422+
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
423+
client, err := newClient(cfg)
424+
require.NoError(t, err)
425+
426+
exporter, err := newExporter(client, cfg)
427+
require.NoError(t, err)
428+
ctx := t.Context()
429+
defer func() { _ = exporter.Shutdown(ctx) }()
430+
431+
err = exporter.Export(ctx, &metricdata.ResourceMetrics{})
432+
require.NoError(t, err)
433+
434+
mu.Lock()
435+
defer mu.Unlock()
436+
437+
require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)")
438+
assert.NotEmpty(t, requestBodies[0], "original request body should not be empty")
439+
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
440+
441+
for _, body := range requestBodies {
442+
reader, err := gzip.NewReader(bytes.NewReader(body))
443+
require.NoError(t, err)
444+
func() {
445+
defer func() { assert.NoError(t, reader.Close()) }()
446+
447+
decoded, err := io.ReadAll(reader)
448+
require.NoError(t, err)
449+
assert.NotEmpty(t, decoded)
450+
}()
451+
}
452+
}
453+
383454
func TestResponseBodySizeLimit(t *testing.T) {
384455
// Override the limit to 1 byte so any non-empty response body exceeds it.
385456
orig := maxResponseBodySize

0 commit comments

Comments
 (0)