diff --git a/.chloggen/persist-request-context-api.yaml b/.chloggen/persist-request-context-api.yaml new file mode 100644 index 00000000000..ba2025e38f2 --- /dev/null +++ b/.chloggen/persist-request-context-api.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: QueueBatchEncoding interface is changed to support marshaling and unmarshaling of request context. + +# One or more tracking issues or pull requests related to the change +issues: [13188] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/persist-request-context.yaml b/.chloggen/persist-request-context.yaml new file mode 100644 index 00000000000..352806d9215 --- /dev/null +++ b/.chloggen/persist-request-context.yaml @@ -0,0 +1,28 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add an option to preserve request span context in the persistent queue + +# One or more tracking issues or pull requests related to the change +issues: [11740] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding + `--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by + a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer + collector with the feature enabled. However, the reverse is not supported: a buffer stored by a newer collector with + the feature enabled cannot be read by an older collector (or by a collector with the feature gate disabled). + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.codecov.yml b/.codecov.yml index b7698210cd3..469b449beb7 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -19,4 +19,5 @@ coverage: ignore: - "pdata/internal/data/protogen/**/*" + - "**/*.pb.go" - "cmd/mdatagen/third_party/**/*" diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 0d12a144cae..a9105ca5076 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -125,6 +125,10 @@ jobs: run: | make gogenerate git diff --exit-code || (echo 'Generated code is out of date, please run "make gogenerate" and commit the changes in this PR.' && exit 1) + - name: Generate proto files + run: | + make genproto + git diff --exit-code || (echo 'Generated code is out of date, please run "make genproto" and commit the changes in this PR.' && exit 1) - name: Gen Pdata run: | make genpdata diff --git a/.github/workflows/utils/cspell.json b/.github/workflows/utils/cspell.json index 8c72c50086f..7cf1fafbcee 100644 --- a/.github/workflows/utils/cspell.json +++ b/.github/workflows/utils/cspell.json @@ -445,6 +445,7 @@ "xexporter", "xexporterhelper", "xextension", + "xpdata", "xpipeline", "xprocessor", "xprocessorhelper", diff --git a/Makefile b/Makefile index e37c7429074..5877333326b 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,6 @@ gotidy: gogenerate: cd cmd/mdatagen && $(GOCMD) install . @$(MAKE) for-all-target TARGET="generate" - $(MAKE) genproto_internal $(MAKE) fmt .PHONY: addlicense @@ -206,6 +205,7 @@ genproto: genproto-cleanup curl -sSL https://api.github.com/repos/open-telemetry/opentelemetry-proto/tarball/${OPENTELEMETRY_PROTO_VERSION} | tar xz --strip 1 -C ${OPENTELEMETRY_PROTO_SRC_DIR} # Call a sub-make to ensure OPENTELEMETRY_PROTO_FILES is populated $(MAKE) genproto_sub + $(MAKE) genproto_internal $(MAKE) fmt $(MAKE) genproto-cleanup @@ -250,10 +250,9 @@ genpdata: pushd pdata/ && $(GOCMD) run ./internal/cmd/pdatagen/main.go && popd $(MAKE) fmt -INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue -# INTERNAL_PROTO_SRC_DIRS += path/to/other/proto/dirs +INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue pdata/xpdata/request/internal INTERNAL_PROTO_FILES := $(foreach dir,$(INTERNAL_PROTO_SRC_DIRS),$(wildcard $(dir)/*.proto)) -INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} --go_out=${PWD} +INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} -I/usr/include/github.com/gogo/protobuf -I${PWD}/$(PROTO_INTERMEDIATE_DIR) --go_out=${PWD} .PHONY: genproto_internal genproto_internal: diff --git a/cmd/builder/internal/builder/main_test.go b/cmd/builder/internal/builder/main_test.go index caa4cae5dc3..b75d67bd9dd 100644 --- a/cmd/builder/internal/builder/main_test.go +++ b/cmd/builder/internal/builder/main_test.go @@ -95,6 +95,7 @@ var replaceModules = []string{ "/pdata", "/pdata/testdata", "/pdata/pprofile", + "/pdata/xpdata", "/pipeline", "/pipeline/xpipeline", "/processor", diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 3ee02d24d7d..827ee97b80d 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -96,6 +96,7 @@ replaces: - go.opentelemetry.io/collector/pdata => ../../pdata - go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata - go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile + - go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata - go.opentelemetry.io/collector/pipeline => ../../pipeline - go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline - go.opentelemetry.io/collector/processor => ../../processor diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index b7659b17ee6..7835cb122f6 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -122,6 +122,7 @@ require ( go.opentelemetry.io/collector/pdata v1.34.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.128.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/pipeline v0.128.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect go.opentelemetry.io/collector/processor/processorhelper v0.128.0 // indirect @@ -291,6 +292,8 @@ replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata + replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index 2c5c06b329e..1c1e03c5973 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -50,6 +50,7 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect go.opentelemetry.io/collector/featuregate v1.34.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/pipeline v0.128.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect go.opentelemetry.io/collector/receiver v1.34.0 // indirect @@ -127,3 +128,5 @@ replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xe replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/client => ../../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index e72740eaaee..c79a0b56804 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -162,10 +162,10 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] { type fakeEncoding struct{} -func (f fakeEncoding) Marshal(request.Request) ([]byte, error) { +func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) { return []byte("mockRequest"), nil } -func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) { - return &requesttest.FakeRequest{}, nil +func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) { + return context.Background(), &requesttest.FakeRequest{}, nil } diff --git a/exporter/exporterhelper/internal/oteltest/tracetest.go b/exporter/exporterhelper/internal/oteltest/tracetest.go index a05bf302978..171abdeaf8c 100644 --- a/exporter/exporterhelper/internal/oteltest/tracetest.go +++ b/exporter/exporterhelper/internal/oteltest/tracetest.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" ) func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { @@ -19,3 +20,17 @@ func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { require.Equal(t, codes.Unset, sd.Status().Code) } } + +func FakeSpanContext(t *testing.T) trace.SpanContext { + traceID, err := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") + require.NoError(t, err) + spanID, err := trace.SpanIDFromHex("0102030405060708") + require.NoError(t, err) + return trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: 0x01, + TraceState: trace.TraceState{}, + Remote: true, + }) +} diff --git a/exporter/exporterhelper/internal/queue/fg.go b/exporter/exporterhelper/internal/queue/fg.go new file mode 100644 index 00000000000..0e6a8210e58 --- /dev/null +++ b/exporter/exporterhelper/internal/queue/fg.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" + +import "go.opentelemetry.io/collector/featuregate" + +// PersistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue. +var PersistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister( + "exporter.PersistRequestContext", + featuregate.StageAlpha, + featuregate.WithRegisterFromVersion("v0.128.0"), + featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"), +) + +// assign the feature gate to separate variables to make it possible to override the behavior in tests +// on write and read paths separately. +var ( + PersistRequestContextOnRead = PersistRequestContextFeatureGate.IsEnabled() + PersistRequestContextOnWrite = PersistRequestContextFeatureGate.IsEnabled() +) diff --git a/exporter/exporterhelper/internal/queue/persistent_queue.go b/exporter/exporterhelper/internal/queue/persistent_queue.go index f7cf0be16ba..d251b349362 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue.go @@ -253,7 +253,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { } } - reqBuf, err := pq.set.encoding.Marshal(req) + reqBuf, err := pq.set.encoding.Marshal(ctx, req) if err != nil { return err } @@ -294,7 +294,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // Read until either a successful retrieved element or no more elements in the storage. for pq.metadata.ReadIndex != pq.metadata.WriteIndex { - index, req, consumed := pq.getNextItem(ctx) + index, req, reqCtx, consumed := pq.getNextItem(ctx) // Ensure the used size and the channel size are in sync. if pq.metadata.ReadIndex == pq.metadata.WriteIndex { pq.metadata.QueueSize = 0 @@ -303,7 +303,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don if consumed { id := indexDonePool.Get().(*indexDone) id.reset(index, pq.set.sizer.Sizeof(req), pq) - return context.Background(), req, id, true + return reqCtx, req, id, true } } @@ -316,7 +316,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don // getNextItem pulls the next available item from the persistent storage along with its index. Once processing is // finished, the index should be called with onDone to clean up the storage. If no new item is available, // returns false. -func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) { +func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, context.Context, bool) { index := pq.metadata.ReadIndex // Increase here, so even if errors happen below, it always iterates pq.metadata.ReadIndex++ @@ -328,8 +328,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) getOp) var request T + restoredCtx := context.Background() if err == nil { - request, err = pq.set.encoding.Unmarshal(getOp.Value) + restoredCtx, request, err = pq.set.encoding.Unmarshal(getOp.Value) } if err != nil { @@ -339,14 +340,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) pq.logger.Error("Error deleting item from queue", zap.Error(err)) } - return 0, request, false + return 0, request, restoredCtx, false } // Increase the reference count, so the client is not closed while the request is being processed. // The client cannot be closed because we hold the lock since last we checked `stopped`. pq.refClient++ - return index, request, true + return index, request, restoredCtx, true } // onDone should be called to remove the item of the given index from the queue once processing is finished. @@ -438,13 +439,13 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, err := pq.set.encoding.Unmarshal(op.Value) + reqCtx, req, err := pq.set.encoding.Unmarshal(op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) continue } - if pq.putInternal(ctx, req) != nil { + if pq.putInternal(reqCtx, req) != nil { errCount++ } } diff --git a/exporter/exporterhelper/internal/queue/persistent_queue_test.go b/exporter/exporterhelper/internal/queue/persistent_queue_test.go index 2aa144b95de..d0968d778fe 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue_test.go @@ -43,15 +43,15 @@ func (is *itemsSizer) Sizeof(val uint64) int64 { type uint64Encoding struct{} -func (uint64Encoding) Marshal(val uint64) ([]byte, error) { +func (uint64Encoding) Marshal(_ context.Context, val uint64) ([]byte, error) { return binary.LittleEndian.AppendUint64([]byte{}, val), nil } -func (uint64Encoding) Unmarshal(bytes []byte) (uint64, error) { +func (uint64Encoding) Unmarshal(bytes []byte) (context.Context, uint64, error) { if len(bytes) < 8 { - return 0, errInvalidValue + return context.Background(), 0, errInvalidValue } - return binary.LittleEndian.Uint64(bytes), nil + return context.Background(), binary.LittleEndian.Uint64(bytes), nil } func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient { @@ -913,7 +913,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) { } func TestPersistentQueue_StorageFull(t *testing.T) { - marshaled, err := uint64Encoding{}.Marshal(uint64(50)) + marshaled, err := uint64Encoding{}.Marshal(context.Background(), uint64(50)) require.NoError(t, err) maxSizeInBytes := len(marshaled) * 5 // arbitrary small number diff --git a/exporter/exporterhelper/internal/queue/queue.go b/exporter/exporterhelper/internal/queue/queue.go index 3a9110fe3ef..42f82e0abd7 100644 --- a/exporter/exporterhelper/internal/queue/queue.go +++ b/exporter/exporterhelper/internal/queue/queue.go @@ -14,10 +14,10 @@ import ( type Encoding[T any] interface { // Marshal is a function that can marshal a request into bytes. - Marshal(T) ([]byte, error) + Marshal(context.Context, T) ([]byte, error) // Unmarshal is a function that can unmarshal bytes into a request. - Unmarshal([]byte) (T, error) + Unmarshal([]byte) (context.Context, T, error) } // ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 8b584de9928..cee43688ff3 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -45,12 +45,12 @@ type fakeEncoding struct { mr request.Request } -func (f fakeEncoding) Marshal(request.Request) ([]byte, error) { +func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) { return []byte("mockRequest"), nil } -func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) { - return f.mr, nil +func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) { + return context.Background(), f.mr, nil } func newFakeEncoding(mr request.Request) queue.Encoding[request.Request] { diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 8d40eb4fd06..6ca7d527913 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -14,9 +14,11 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/plog" + pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" "go.opentelemetry.io/collector/pipeline" ) @@ -57,16 +59,32 @@ func newLogsRequest(ld plog.Logs) Request { type logsEncoding struct{} -func (logsEncoding) Unmarshal(bytes []byte) (Request, error) { +var _ QueueBatchEncoding[Request] = logsEncoding{} + +func (logsEncoding) Unmarshal(bytes []byte) (context.Context, Request, error) { + if queue.PersistRequestContextOnRead { + ctx, logs, err := pdatareq.UnmarshalLogs(bytes) + if errors.Is(err, pdatareq.ErrInvalidFormat) { + // fall back to unmarshaling without context + logs, err = logsUnmarshaler.UnmarshalLogs(bytes) + } + return ctx, newLogsRequest(logs), err + } + logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { - return nil, err + var req Request + return context.Background(), req, err } - return newLogsRequest(logs), nil + return context.Background(), newLogsRequest(logs), nil } -func (logsEncoding) Marshal(req Request) ([]byte, error) { - return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +func (logsEncoding) Marshal(ctx context.Context, req Request) ([]byte, error) { + logs := req.(*logsRequest).ld + if queue.PersistRequestContextOnWrite { + return pdatareq.MarshalLogs(ctx, logs) + } + return logsMarshaler.MarshalLogs(logs) } func (req *logsRequest) OnError(err error) Request { diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 5a4547419d8..0a04ed2ec47 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/oteltest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" @@ -169,27 +169,76 @@ func TestLogsRequest_Default_ExportError(t *testing.T) { } func TestLogs_WithPersistentQueue(t *testing.T) { + fgOrigState := queue.PersistRequestContextFeatureGate.IsEnabled() qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID - rCfg := configretry.NewDefaultBackOffConfig() - ts := consumertest.LogsSink{} set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") - te, err := NewLogs(context.Background(), set, &fakeLogsConfig, ts.ConsumeLogs, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - host := hosttest.NewHost(map[component.ID]component.Component{ storageID: storagetest.NewMockStorageExtension(nil), }) - require.NoError(t, te.Start(context.Background(), host)) - t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) - - traces := testdata.GenerateLogs(2) - require.NoError(t, te.ConsumeLogs(context.Background(), traces)) - require.Eventually(t, func() bool { - return len(ts.AllLogs()) == 1 && ts.LogRecordCount() == 2 - }, 500*time.Millisecond, 10*time.Millisecond) + spanCtx := oteltest.FakeSpanContext(t) + + tests := []struct { + name string + fgEnabledOnWrite bool + fgEnabledOnRead bool + wantData bool + wantSpanCtx bool + }{ + { + name: "feature_gate_disabled_on_write_and_read", + wantData: true, + }, + { + name: "feature_gate_enabled_on_write_and_read", + fgEnabledOnWrite: true, + fgEnabledOnRead: true, + wantData: true, + wantSpanCtx: true, + }, + { + name: "feature_gate_disabled_on_write_enabled_on_read", + wantData: true, + fgEnabledOnRead: true, + }, + { + name: "feature_gate_enabled_on_write_disabled_on_read", + fgEnabledOnWrite: true, + wantData: false, // going back from enabled to disabled feature gate isn't supported + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue.PersistRequestContextOnRead = tt.fgEnabledOnRead + queue.PersistRequestContextOnWrite = tt.fgEnabledOnWrite + t.Cleanup(func() { + queue.PersistRequestContextOnRead = fgOrigState + queue.PersistRequestContextOnWrite = fgOrigState + }) + + ls := consumertest.LogsSink{} + te, err := NewLogs(context.Background(), set, &fakeLogsConfig, ls.ConsumeLogs, WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + logs := testdata.GenerateLogs(2) + require.NoError(t, te.ConsumeLogs(trace.ContextWithSpanContext(context.Background(), spanCtx), logs)) + if tt.wantData { + require.Eventually(t, func() bool { + return len(ls.AllLogs()) == 1 && ls.LogRecordCount() == 2 + }, 500*time.Millisecond, 10*time.Millisecond) + } + + // check that the span context is persisted if the feature gate is enabled + if tt.wantSpanCtx { + assert.Len(t, ls.Contexts(), 1) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(ls.Contexts()[0])) + } + }) + } } func TestLogs_WithRecordMetrics(t *testing.T) { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index f05748ad3a1..2516682316b 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -14,9 +14,11 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/pmetric" + pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" "go.opentelemetry.io/collector/pipeline" ) @@ -57,16 +59,31 @@ func newMetricsRequest(md pmetric.Metrics) Request { type metricsEncoding struct{} -func (metricsEncoding) Unmarshal(bytes []byte) (Request, error) { +var _ QueueBatchEncoding[Request] = metricsEncoding{} + +func (metricsEncoding) Unmarshal(bytes []byte) (context.Context, Request, error) { + if queue.PersistRequestContextOnRead { + ctx, metrics, err := pdatareq.UnmarshalMetrics(bytes) + if errors.Is(err, pdatareq.ErrInvalidFormat) { + // fall back to unmarshaling without context + metrics, err = metricsUnmarshaler.UnmarshalMetrics(bytes) + } + return ctx, newMetricsRequest(metrics), err + } metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { - return nil, err + var req Request + return context.Background(), req, err } - return newMetricsRequest(metrics), nil + return context.Background(), newMetricsRequest(metrics), nil } -func (metricsEncoding) Marshal(req Request) ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +func (metricsEncoding) Marshal(ctx context.Context, req Request) ([]byte, error) { + metrics := req.(*metricsRequest).md + if queue.PersistRequestContextOnWrite { + return pdatareq.MarshalMetrics(ctx, metrics) + } + return metricsMarshaler.MarshalMetrics(metrics) } func (req *metricsRequest) OnError(err error) Request { diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 3ddc1c20732..602ce44faea 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/oteltest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" @@ -169,27 +169,76 @@ func TestMetricsRequest_Default_ExportError(t *testing.T) { } func TestMetrics_WithPersistentQueue(t *testing.T) { + fgOrigState := queue.PersistRequestContextFeatureGate.IsEnabled() qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID - rCfg := configretry.NewDefaultBackOffConfig() - ms := consumertest.MetricsSink{} set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue") - te, err := NewMetrics(context.Background(), set, &fakeMetricsConfig, ms.ConsumeMetrics, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - host := hosttest.NewHost(map[component.ID]component.Component{ storageID: storagetest.NewMockStorageExtension(nil), }) - require.NoError(t, te.Start(context.Background(), host)) - t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) - - metrics := testdata.GenerateMetrics(2) - require.NoError(t, te.ConsumeMetrics(context.Background(), metrics)) - require.Eventually(t, func() bool { - return len(ms.AllMetrics()) == 1 && ms.DataPointCount() == 4 - }, 500*time.Millisecond, 10*time.Millisecond) + spanCtx := oteltest.FakeSpanContext(t) + + tests := []struct { + name string + fgEnabledOnWrite bool + fgEnabledOnRead bool + wantData bool + wantSpanCtx bool + }{ + { + name: "feature_gate_disabled_on_write_and_read", + wantData: true, + }, + { + name: "feature_gate_enabled_on_write_and_read", + fgEnabledOnWrite: true, + fgEnabledOnRead: true, + wantData: true, + wantSpanCtx: true, + }, + { + name: "feature_gate_disabled_on_write_enabled_on_read", + wantData: true, + fgEnabledOnRead: true, + }, + { + name: "feature_gate_enabled_on_write_disabled_on_read", + fgEnabledOnWrite: true, + wantData: false, // going back from enabled to disabled feature gate isn't supported + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue.PersistRequestContextOnRead = tt.fgEnabledOnRead + queue.PersistRequestContextOnWrite = tt.fgEnabledOnWrite + t.Cleanup(func() { + queue.PersistRequestContextOnRead = fgOrigState + queue.PersistRequestContextOnWrite = fgOrigState + }) + + ms := consumertest.MetricsSink{} + te, err := NewMetrics(context.Background(), set, &fakeMetricsConfig, ms.ConsumeMetrics, WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + traces := testdata.GenerateMetrics(2) + require.NoError(t, te.ConsumeMetrics(trace.ContextWithSpanContext(context.Background(), spanCtx), traces)) + if tt.wantData { + require.Eventually(t, func() bool { + return len(ms.AllMetrics()) == 1 && ms.DataPointCount() == 4 + }, 500*time.Millisecond, 10*time.Millisecond) + } + + // check that the span context is persisted if the feature gate is enabled + if tt.wantSpanCtx { + assert.Len(t, ms.Contexts(), 1) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(ms.Contexts()[0])) + } + }) + } } func TestMetrics_WithRecordMetrics(t *testing.T) { diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index 7e0527c559b..f4e3a4390a9 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -4,6 +4,8 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( + "context" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" @@ -38,11 +40,11 @@ type BatchConfig = queuebatch.BatchConfig // QueueBatchEncoding defines the encoding to be used if persistent queue is configured. // Duplicate definition with queuebatch.Encoding since aliasing generics is not supported by default. type QueueBatchEncoding[T any] interface { - // Marshal is a function that can marshal a request into bytes. - Marshal(T) ([]byte, error) + // Marshal is a function that can marshal a request and its context into bytes. + Marshal(context.Context, T) ([]byte, error) - // Unmarshal is a function that can unmarshal bytes into a request. - Unmarshal([]byte) (T, error) + // Unmarshal is a function that can unmarshal bytes into a request and its context. + Unmarshal([]byte) (context.Context, T, error) } var ErrQueueIsFull = queue.ErrQueueIsFull diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index af49b7ce007..21a10da67ac 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -14,9 +14,11 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/pdata/ptrace" + pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" "go.opentelemetry.io/collector/pipeline" ) @@ -57,16 +59,31 @@ func newTracesRequest(td ptrace.Traces) Request { type tracesEncoding struct{} -func (tracesEncoding) Unmarshal(bytes []byte) (Request, error) { +var _ QueueBatchEncoding[Request] = tracesEncoding{} + +func (tracesEncoding) Unmarshal(bytes []byte) (context.Context, Request, error) { + if queue.PersistRequestContextOnRead { + ctx, traces, err := pdatareq.UnmarshalTraces(bytes) + if errors.Is(err, pdatareq.ErrInvalidFormat) { + // fall back to unmarshaling without context + traces, err = tracesUnmarshaler.UnmarshalTraces(bytes) + } + return ctx, newTracesRequest(traces), err + } traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) if err != nil { - return nil, err + var req Request + return context.Background(), req, err } - return newTracesRequest(traces), nil + return context.Background(), newTracesRequest(traces), nil } -func (tracesEncoding) Marshal(req Request) ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) +func (tracesEncoding) Marshal(ctx context.Context, req Request) ([]byte, error) { + traces := req.(*tracesRequest).td + if queue.PersistRequestContextOnWrite { + return pdatareq.MarshalTraces(ctx, traces) + } + return tracesMarshaler.MarshalTraces(traces) } func (req *tracesRequest) OnError(err error) Request { diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index b209a85df2c..5d82a1b18b6 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/oteltest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" @@ -167,27 +167,76 @@ func TestTracesRequest_Default_ExportError(t *testing.T) { } func TestTraces_WithPersistentQueue(t *testing.T) { + fgOrigState := queue.PersistRequestContextFeatureGate.IsEnabled() qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID - rCfg := configretry.NewDefaultBackOffConfig() - ts := consumertest.TracesSink{} set := exportertest.NewNopSettings(exportertest.NopType) - set.ID = component.MustNewIDWithName("test_traces", "with_persistent_queue") - te, err := NewTraces(context.Background(), set, &fakeTracesConfig, ts.ConsumeTraces, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - + set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ storageID: storagetest.NewMockStorageExtension(nil), }) - require.NoError(t, te.Start(context.Background(), host)) - t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) - - traces := testdata.GenerateTraces(2) - require.NoError(t, te.ConsumeTraces(context.Background(), traces)) - require.Eventually(t, func() bool { - return len(ts.AllTraces()) == 1 && ts.SpanCount() == 2 - }, 500*time.Millisecond, 10*time.Millisecond) + spanCtx := oteltest.FakeSpanContext(t) + + tests := []struct { + name string + fgEnabledOnWrite bool + fgEnabledOnRead bool + wantData bool + wantSpanCtx bool + }{ + { + name: "feature_gate_disabled_on_write_and_read", + wantData: true, + }, + { + name: "feature_gate_enabled_on_write_and_read", + fgEnabledOnWrite: true, + fgEnabledOnRead: true, + wantData: true, + wantSpanCtx: true, + }, + { + name: "feature_gate_disabled_on_write_enabled_on_read", + wantData: true, + fgEnabledOnRead: true, + }, + { + name: "feature_gate_enabled_on_write_disabled_on_read", + fgEnabledOnWrite: true, + wantData: false, // going back from enabled to disabled feature gate isn't supported + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue.PersistRequestContextOnRead = tt.fgEnabledOnRead + queue.PersistRequestContextOnWrite = tt.fgEnabledOnWrite + t.Cleanup(func() { + queue.PersistRequestContextOnRead = fgOrigState + queue.PersistRequestContextOnWrite = fgOrigState + }) + + ts := consumertest.TracesSink{} + te, err := NewTraces(context.Background(), set, &fakeTracesConfig, ts.ConsumeTraces, WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + traces := testdata.GenerateTraces(2) + require.NoError(t, te.ConsumeTraces(trace.ContextWithSpanContext(context.Background(), spanCtx), traces)) + if tt.wantData { + require.Eventually(t, func() bool { + return len(ts.AllTraces()) == 1 && ts.SpanCount() == 2 + }, 500*time.Millisecond, 10*time.Millisecond) + } + + // check that the span context is persisted if the feature gate is enabled + if tt.wantSpanCtx { + assert.Len(t, ts.Contexts(), 1) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(ts.Contexts()[0])) + } + }) + } } func TestTraces_WithRecordMetrics(t *testing.T) { diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index 679ddc14e4c..0dd36863ca9 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -6,7 +6,6 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.34.0 go.opentelemetry.io/collector/component/componenttest v0.128.0 - go.opentelemetry.io/collector/config/configretry v1.34.0 go.opentelemetry.io/collector/consumer v1.34.0 go.opentelemetry.io/collector/consumer/consumererror v0.128.0 go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.128.0 @@ -17,6 +16,7 @@ require ( go.opentelemetry.io/collector/exporter/xexporter v0.128.0 go.opentelemetry.io/collector/pdata/pprofile v0.128.0 go.opentelemetry.io/collector/pdata/testdata v0.128.0 + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 @@ -44,6 +44,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/config/configretry v1.34.0 // indirect go.opentelemetry.io/collector/confmap v1.34.0 // indirect go.opentelemetry.io/collector/extension v1.34.0 // indirect go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect @@ -120,3 +121,5 @@ replace go.opentelemetry.io/collector/confmap => ../../../confmap replace go.opentelemetry.io/collector/internal/telemetry => ../../../internal/telemetry replace go.opentelemetry.io/collector/client => ../../../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../../pdata/xpdata diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 41e5e40a8fb..5dbe4793e9e 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -16,10 +16,12 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer" "go.opentelemetry.io/collector/exporter/xexporter" "go.opentelemetry.io/collector/pdata/pprofile" + pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" "go.opentelemetry.io/collector/pipeline/xpipeline" ) @@ -60,16 +62,31 @@ func newProfilesRequest(pd pprofile.Profiles) exporterhelper.Request { type profilesEncoding struct{} -func (profilesEncoding) Unmarshal(bytes []byte) (exporterhelper.Request, error) { +var _ exporterhelper.QueueBatchEncoding[request.Request] = profilesEncoding{} + +func (profilesEncoding) Unmarshal(bytes []byte) (context.Context, request.Request, error) { + if queue.PersistRequestContextOnRead { + ctx, profiles, err := pdatareq.UnmarshalProfiles(bytes) + if errors.Is(err, pdatareq.ErrInvalidFormat) { + // fall back to unmarshaling without context + profiles, err = profilesUnmarshaler.UnmarshalProfiles(bytes) + } + return ctx, newProfilesRequest(profiles), err + } profiles, err := profilesUnmarshaler.UnmarshalProfiles(bytes) if err != nil { - return nil, err + var req request.Request + return context.Background(), req, err } - return newProfilesRequest(profiles), nil + return context.Background(), newProfilesRequest(profiles), nil } -func (profilesEncoding) Marshal(req exporterhelper.Request) ([]byte, error) { - return profilesMarshaler.MarshalProfiles(req.(*profilesRequest).pd) +func (profilesEncoding) Marshal(ctx context.Context, req request.Request) ([]byte, error) { + profiles := req.(*profilesRequest).pd + if queue.PersistRequestContextOnWrite { + return pdatareq.MarshalProfiles(ctx, profiles) + } + return profilesMarshaler.MarshalProfiles(profiles) } func (req *profilesRequest) OnError(err error) exporterhelper.Request { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index 2a3f99f5243..c454723e934 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumererror/xconsumererror" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/oteltest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" @@ -81,7 +81,7 @@ func TestProfilesExporter_NilPushProfilesData(t *testing.T) { require.Equal(t, errNilPushProfileData, err) } -func TestProfilesExporter_NilTracesConverter(t *testing.T) { +func TestProfilesExporter_NilProfilesConverter(t *testing.T) { te, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[exporterhelper.Request]()) require.Nil(t, te) require.Equal(t, errNilProfilesConverter, err) @@ -166,28 +166,77 @@ func TestProfilesRequestExporter_Default_ExportError(t *testing.T) { require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld)) } -func TestProfilesExporter_WithPersistentQueue(t *testing.T) { +func TestProfiles_WithPersistentQueue(t *testing.T) { + fgOrigState := queue.PersistRequestContextFeatureGate.IsEnabled() qCfg := exporterhelper.NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") qCfg.StorageID = &storageID - rCfg := configretry.NewDefaultBackOffConfig() - ts := consumertest.ProfilesSink{} set := exportertest.NewNopSettings(exportertest.NopType) - set.ID = component.MustNewIDWithName("test_profiles", "with_persistent_queue") - te, err := NewProfilesExporter(context.Background(), set, &fakeProfilesExporterConfig, ts.ConsumeProfiles, exporterhelper.WithRetry(rCfg), exporterhelper.WithQueue(qCfg)) - require.NoError(t, err) - + set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ storageID: storagetest.NewMockStorageExtension(nil), }) - require.NoError(t, te.Start(context.Background(), host)) - t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) - - traces := testdata.GenerateProfiles(2) - require.NoError(t, te.ConsumeProfiles(context.Background(), traces)) - require.Eventually(t, func() bool { - return len(ts.AllProfiles()) == 1 && ts.SampleCount() == 2 - }, 500*time.Millisecond, 10*time.Millisecond) + spanCtx := oteltest.FakeSpanContext(t) + + tests := []struct { + name string + fgEnabledOnWrite bool + fgEnabledOnRead bool + wantData bool + wantSpanCtx bool + }{ + { + name: "feature_gate_disabled_on_write_and_read", + wantData: true, + }, + { + name: "feature_gate_enabled_on_write_and_read", + fgEnabledOnWrite: true, + fgEnabledOnRead: true, + wantData: true, + wantSpanCtx: true, + }, + { + name: "feature_gate_disabled_on_write_enabled_on_read", + wantData: true, + fgEnabledOnRead: true, + }, + { + name: "feature_gate_enabled_on_write_disabled_on_read", + fgEnabledOnWrite: true, + wantData: false, // going back from enabled to disabled feature gate isn't supported + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + queue.PersistRequestContextOnRead = tt.fgEnabledOnRead + queue.PersistRequestContextOnWrite = tt.fgEnabledOnWrite + t.Cleanup(func() { + queue.PersistRequestContextOnRead = fgOrigState + queue.PersistRequestContextOnWrite = fgOrigState + }) + + ts := consumertest.ProfilesSink{} + te, err := NewProfilesExporter(context.Background(), set, &fakeProfilesExporterConfig, ts.ConsumeProfiles, exporterhelper.WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, te.Start(context.Background(), host)) + t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) }) + + profiles := testdata.GenerateProfiles(2) + require.NoError(t, te.ConsumeProfiles(trace.ContextWithSpanContext(context.Background(), spanCtx), profiles)) + if tt.wantData { + require.Eventually(t, func() bool { + return len(ts.AllProfiles()) == 1 && ts.SampleCount() == 2 + }, 500*time.Millisecond, 10*time.Millisecond) + } + + // check that the span context is persisted if the feature gate is enabled + if tt.wantSpanCtx { + assert.Len(t, ts.Contexts(), 1) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(ts.Contexts()[0])) + } + }) + } } func TestProfilesExporter_WithSpan(t *testing.T) { diff --git a/exporter/exportertest/go.mod b/exporter/exportertest/go.mod index 2ececec81f6..c02ff60005a 100644 --- a/exporter/exportertest/go.mod +++ b/exporter/exportertest/go.mod @@ -46,6 +46,7 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect go.opentelemetry.io/collector/featuregate v1.34.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.128.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.11.0 // indirect go.opentelemetry.io/otel v1.36.0 // indirect @@ -110,3 +111,5 @@ replace go.opentelemetry.io/collector/confmap => ../../confmap replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/client => ../../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/exporter/go.mod b/exporter/go.mod index 3805cf5e7b5..0f21a263f3f 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -16,9 +16,11 @@ require ( go.opentelemetry.io/collector/exporter/exportertest v0.128.0 go.opentelemetry.io/collector/extension/extensiontest v0.128.0 go.opentelemetry.io/collector/extension/xextension v0.128.0 + go.opentelemetry.io/collector/featuregate v1.34.0 go.opentelemetry.io/collector/pdata v1.34.0 go.opentelemetry.io/collector/pdata/pprofile v0.128.0 go.opentelemetry.io/collector/pdata/testdata v0.128.0 + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 go.opentelemetry.io/collector/pipeline v0.128.0 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/metric v1.36.0 @@ -53,7 +55,6 @@ require ( go.opentelemetry.io/collector/consumer/xconsumer v0.128.0 // indirect go.opentelemetry.io/collector/exporter/xexporter v0.128.0 // indirect go.opentelemetry.io/collector/extension v1.34.0 // indirect - go.opentelemetry.io/collector/featuregate v1.34.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect go.opentelemetry.io/collector/receiver v1.34.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.128.0 // indirect @@ -116,3 +117,5 @@ replace go.opentelemetry.io/collector/confmap => ../confmap replace go.opentelemetry.io/collector/internal/telemetry => ../internal/telemetry replace go.opentelemetry.io/collector/client => ../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../pdata/xpdata diff --git a/exporter/nopexporter/go.mod b/exporter/nopexporter/go.mod index ec3561d9a21..d9951f5f09c 100644 --- a/exporter/nopexporter/go.mod +++ b/exporter/nopexporter/go.mod @@ -110,3 +110,5 @@ replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xe replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/client => ../../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 89af38e5509..daa70c3974f 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -69,6 +69,7 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect go.opentelemetry.io/collector/featuregate v1.34.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/pipeline v0.128.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect go.opentelemetry.io/collector/receiver v1.34.0 // indirect @@ -175,3 +176,5 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../ext replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 0f1e6a90712..9d4d3d3bd45 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -69,6 +69,7 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect go.opentelemetry.io/collector/featuregate v1.34.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/pipeline v0.128.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect go.opentelemetry.io/collector/receiver v1.34.0 // indirect @@ -173,3 +174,5 @@ replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/co replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/exporter/xexporter/go.mod b/exporter/xexporter/go.mod index 63b46b242d3..0193811a259 100644 --- a/exporter/xexporter/go.mod +++ b/exporter/xexporter/go.mod @@ -90,3 +90,5 @@ replace go.opentelemetry.io/collector/confmap => ../../confmap replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/client => ../../client + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index 84ef9cf91d3..cfbff711a1e 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -102,6 +102,7 @@ require ( go.opentelemetry.io/collector/internal/fanoutconsumer v0.128.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.128.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect go.opentelemetry.io/collector/processor v1.34.0 // indirect go.opentelemetry.io/collector/processor/processortest v0.128.0 // indirect @@ -272,3 +273,5 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmid replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/otelcol/go.mod b/otelcol/go.mod index 1d0add27878..96f885a26bf 100644 --- a/otelcol/go.mod +++ b/otelcol/go.mod @@ -223,3 +223,5 @@ replace go.opentelemetry.io/collector/config/configmiddleware => ../config/confi replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../extension/extensionmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../extension/extensionmiddleware/extensionmiddlewaretest + +replace go.opentelemetry.io/collector/pdata/xpdata => ../pdata/xpdata diff --git a/otelcol/otelcoltest/go.mod b/otelcol/otelcoltest/go.mod index 5774570ccae..06239b7d9f7 100644 --- a/otelcol/otelcoltest/go.mod +++ b/otelcol/otelcoltest/go.mod @@ -232,3 +232,5 @@ replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/co replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/pdata/xpdata/Makefile b/pdata/xpdata/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/pdata/xpdata/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/pdata/xpdata/go.mod b/pdata/xpdata/go.mod new file mode 100644 index 00000000000..0cb8b07cf98 --- /dev/null +++ b/pdata/xpdata/go.mod @@ -0,0 +1,35 @@ +module go.opentelemetry.io/collector/pdata/xpdata + +go 1.23.0 + +require ( + github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector/pdata v1.34.0 + go.opentelemetry.io/collector/pdata/pprofile v0.128.0 + go.opentelemetry.io/collector/pdata/testdata v0.128.0 + go.opentelemetry.io/otel v1.36.0 + go.opentelemetry.io/otel/trace v1.36.0 + google.golang.org/protobuf v1.36.6 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect + google.golang.org/grpc v1.73.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/pdata => .. + +replace go.opentelemetry.io/collector/pdata/pprofile => ../pprofile + +replace go.opentelemetry.io/collector/pdata/testdata => ../testdata diff --git a/pdata/xpdata/go.sum b/pdata/xpdata/go.sum new file mode 100644 index 00000000000..c6a45282f10 --- /dev/null +++ b/pdata/xpdata/go.sum @@ -0,0 +1,97 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= +google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pdata/xpdata/request/context.go b/pdata/xpdata/request/context.go new file mode 100644 index 00000000000..db743c96e03 --- /dev/null +++ b/pdata/xpdata/request/context.go @@ -0,0 +1,33 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + + "go.opentelemetry.io/otel/propagation" + + "go.opentelemetry.io/collector/pdata/xpdata/request/internal" +) + +// Default trace context propagator +var tracePropagator = propagation.TraceContext{} + +// Trace propagator sets only two keys to encode the context: "traceparent" and "tracestate" +const reqContextKeysNum = 2 + +// encodeContext encodes the context into a map of strings. +func encodeContext(ctx context.Context) internal.RequestContext { + mc := propagation.MapCarrier(make(map[string]string, reqContextKeysNum)) + tracePropagator.Inject(ctx, mc) + return internal.RequestContext{SpanContextMap: mc} +} + +// decodeContext decodes the context from the bytes map. +func decodeContext(rc *internal.RequestContext) context.Context { + if rc == nil || rc.SpanContextMap == nil { + return context.Background() + } + return tracePropagator.Extract(context.Background(), propagation.MapCarrier(rc.SpanContextMap)) +} diff --git a/pdata/xpdata/request/internal/request.pb.go b/pdata/xpdata/request/internal/request.pb.go new file mode 100644 index 00000000000..be244e045cd --- /dev/null +++ b/pdata/xpdata/request/internal/request.pb.go @@ -0,0 +1,558 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.21.6 +// source: pdata/xpdata/request/internal/request.proto + +package internal + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + + v12 "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1" + v11 "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1" + v1development "go.opentelemetry.io/collector/pdata/internal/data/protogen/profiles/v1development" + v1 "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// RequestContext represents metadata associated with a telemetry export request. +type RequestContext struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Span context encoded using W3C trace context format. + // This map typically includes "traceparent" and optionally "tracestate" keys. + SpanContextMap map[string]string `protobuf:"bytes,1,rep,name=span_context_map,json=spanContextMap,proto3" json:"span_context_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *RequestContext) Reset() { + *x = RequestContext{} + if protoimpl.UnsafeEnabled { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RequestContext) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RequestContext) ProtoMessage() {} + +func (x *RequestContext) ProtoReflect() protoreflect.Message { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RequestContext.ProtoReflect.Descriptor instead. +func (*RequestContext) Descriptor() ([]byte, []int) { + return file_pdata_xpdata_request_internal_request_proto_rawDescGZIP(), []int{0} +} + +func (x *RequestContext) GetSpanContextMap() map[string]string { + if x != nil { + return x.SpanContextMap + } + return nil +} + +type TracesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FormatVersion uint32 `protobuf:"fixed32,1,opt,name=format_version,json=formatVersion,proto3" json:"format_version,omitempty"` + RequestContext *RequestContext `protobuf:"bytes,2,opt,name=request_context,json=requestContext,proto3" json:"request_context,omitempty"` + TracesData *v1.TracesData `protobuf:"bytes,3,opt,name=traces_data,json=tracesData,proto3" json:"traces_data,omitempty"` +} + +func (x *TracesRequest) Reset() { + *x = TracesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TracesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TracesRequest) ProtoMessage() {} + +func (x *TracesRequest) ProtoReflect() protoreflect.Message { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TracesRequest.ProtoReflect.Descriptor instead. +func (*TracesRequest) Descriptor() ([]byte, []int) { + return file_pdata_xpdata_request_internal_request_proto_rawDescGZIP(), []int{1} +} + +func (x *TracesRequest) GetFormatVersion() uint32 { + if x != nil { + return x.FormatVersion + } + return 0 +} + +func (x *TracesRequest) GetRequestContext() *RequestContext { + if x != nil { + return x.RequestContext + } + return nil +} + +func (x *TracesRequest) GetTracesData() *v1.TracesData { + if x != nil { + return x.TracesData + } + return nil +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FormatVersion uint32 `protobuf:"fixed32,1,opt,name=format_version,json=formatVersion,proto3" json:"format_version,omitempty"` + RequestContext *RequestContext `protobuf:"bytes,2,opt,name=request_context,json=requestContext,proto3" json:"request_context,omitempty"` + MetricsData *v11.MetricsData `protobuf:"bytes,3,opt,name=metrics_data,json=metricsData,proto3" json:"metrics_data,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. +func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_pdata_xpdata_request_internal_request_proto_rawDescGZIP(), []int{2} +} + +func (x *MetricsRequest) GetFormatVersion() uint32 { + if x != nil { + return x.FormatVersion + } + return 0 +} + +func (x *MetricsRequest) GetRequestContext() *RequestContext { + if x != nil { + return x.RequestContext + } + return nil +} + +func (x *MetricsRequest) GetMetricsData() *v11.MetricsData { + if x != nil { + return x.MetricsData + } + return nil +} + +type LogsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FormatVersion uint32 `protobuf:"fixed32,1,opt,name=format_version,json=formatVersion,proto3" json:"format_version,omitempty"` + RequestContext *RequestContext `protobuf:"bytes,2,opt,name=request_context,json=requestContext,proto3" json:"request_context,omitempty"` + LogsData *v12.LogsData `protobuf:"bytes,3,opt,name=logs_data,json=logsData,proto3" json:"logs_data,omitempty"` +} + +func (x *LogsRequest) Reset() { + *x = LogsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogsRequest) ProtoMessage() {} + +func (x *LogsRequest) ProtoReflect() protoreflect.Message { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogsRequest.ProtoReflect.Descriptor instead. +func (*LogsRequest) Descriptor() ([]byte, []int) { + return file_pdata_xpdata_request_internal_request_proto_rawDescGZIP(), []int{3} +} + +func (x *LogsRequest) GetFormatVersion() uint32 { + if x != nil { + return x.FormatVersion + } + return 0 +} + +func (x *LogsRequest) GetRequestContext() *RequestContext { + if x != nil { + return x.RequestContext + } + return nil +} + +func (x *LogsRequest) GetLogsData() *v12.LogsData { + if x != nil { + return x.LogsData + } + return nil +} + +type ProfilesRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FormatVersion uint32 `protobuf:"fixed32,1,opt,name=format_version,json=formatVersion,proto3" json:"format_version,omitempty"` + RequestContext *RequestContext `protobuf:"bytes,2,opt,name=request_context,json=requestContext,proto3" json:"request_context,omitempty"` + ProfilesData *v1development.ProfilesData `protobuf:"bytes,3,opt,name=profiles_data,json=profilesData,proto3" json:"profiles_data,omitempty"` +} + +func (x *ProfilesRequest) Reset() { + *x = ProfilesRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProfilesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProfilesRequest) ProtoMessage() {} + +func (x *ProfilesRequest) ProtoReflect() protoreflect.Message { + mi := &file_pdata_xpdata_request_internal_request_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProfilesRequest.ProtoReflect.Descriptor instead. +func (*ProfilesRequest) Descriptor() ([]byte, []int) { + return file_pdata_xpdata_request_internal_request_proto_rawDescGZIP(), []int{4} +} + +func (x *ProfilesRequest) GetFormatVersion() uint32 { + if x != nil { + return x.FormatVersion + } + return 0 +} + +func (x *ProfilesRequest) GetRequestContext() *RequestContext { + if x != nil { + return x.RequestContext + } + return nil +} + +func (x *ProfilesRequest) GetProfilesData() *v1development.ProfilesData { + if x != nil { + return x.ProfilesData + } + return nil +} + +var File_pdata_xpdata_request_internal_request_proto protoreflect.FileDescriptor + +var file_pdata_xpdata_request_internal_request_proto_rawDesc = []byte{ + 0x0a, 0x2b, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x2d, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6c, + 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x78, 0x70, 0x64, + 0x61, 0x74, 0x61, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x1a, 0x28, 0x6f, 0x70, + 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x2c, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, + 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x26, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x2f, 0x76, + 0x31, 0x2f, 0x6c, 0x6f, 0x67, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x39, 0x6f, 0x70, + 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x2f, 0x76, 0x31, 0x64, 0x65, 0x76, + 0x65, 0x6c, 0x6f, 0x70, 0x6d, 0x65, 0x6e, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, + 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd0, 0x01, 0x0a, 0x0e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x7b, 0x0a, 0x10, 0x73, 0x70, + 0x61, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x5f, 0x6d, 0x61, 0x70, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x51, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, + 0x65, 0x74, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x70, + 0x64, 0x61, 0x74, 0x61, 0x2e, 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, + 0x65, 0x78, 0x74, 0x2e, 0x53, 0x70, 0x61, 0x6e, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x4d, + 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0e, 0x73, 0x70, 0x61, 0x6e, 0x43, 0x6f, 0x6e, + 0x74, 0x65, 0x78, 0x74, 0x4d, 0x61, 0x70, 0x1a, 0x41, 0x0a, 0x13, 0x53, 0x70, 0x61, 0x6e, 0x43, + 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xe9, 0x01, 0x0a, 0x0d, 0x54, + 0x72, 0x61, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, + 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x07, 0x52, 0x0d, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x56, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x12, 0x66, 0x0a, 0x0f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x63, + 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, + 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6c, + 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x78, 0x70, 0x64, + 0x61, 0x74, 0x61, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x49, 0x0a, 0x0b, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x73, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x28, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0a, 0x74, 0x72, 0x61, 0x63, + 0x65, 0x73, 0x44, 0x61, 0x74, 0x61, 0x22, 0xef, 0x01, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x6f, 0x72, + 0x6d, 0x61, 0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x07, 0x52, 0x0d, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x12, 0x66, 0x0a, 0x0f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, + 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, + 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x4e, 0x0a, 0x0c, 0x6d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, + 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x76, 0x31, 0x2e, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x61, 0x74, 0x61, 0x52, 0x0b, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x44, 0x61, 0x74, 0x61, 0x22, 0xe0, 0x01, 0x0a, 0x0b, 0x4c, 0x6f, 0x67, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x6f, 0x72, 0x6d, + 0x61, 0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, + 0x52, 0x0d, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x66, 0x0a, 0x0f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, + 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x2e, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, + 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x42, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x73, 0x5f, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6f, 0x70, 0x65, + 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x6c, 0x6f, 0x67, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x67, 0x73, 0x44, 0x61, 0x74, + 0x61, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x73, 0x44, 0x61, 0x74, 0x61, 0x22, 0xff, 0x01, 0x0a, 0x0f, + 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x25, 0x0a, 0x0e, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x07, 0x52, 0x0d, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x66, 0x0a, 0x0f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x3d, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, + 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, + 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x5d, + 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x38, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, + 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x66, + 0x69, 0x6c, 0x65, 0x73, 0x2e, 0x76, 0x31, 0x64, 0x65, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x6d, 0x65, + 0x6e, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x44, 0x61, 0x74, 0x61, 0x52, + 0x0c, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x44, 0x61, 0x74, 0x61, 0x42, 0x35, 0x5a, + 0x33, 0x67, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, + 0x79, 0x2e, 0x69, 0x6f, 0x2f, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x70, + 0x64, 0x61, 0x74, 0x61, 0x2f, 0x78, 0x70, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pdata_xpdata_request_internal_request_proto_rawDescOnce sync.Once + file_pdata_xpdata_request_internal_request_proto_rawDescData = file_pdata_xpdata_request_internal_request_proto_rawDesc +) + +func file_pdata_xpdata_request_internal_request_proto_rawDescGZIP() []byte { + file_pdata_xpdata_request_internal_request_proto_rawDescOnce.Do(func() { + file_pdata_xpdata_request_internal_request_proto_rawDescData = protoimpl.X.CompressGZIP(file_pdata_xpdata_request_internal_request_proto_rawDescData) + }) + return file_pdata_xpdata_request_internal_request_proto_rawDescData +} + +var file_pdata_xpdata_request_internal_request_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_pdata_xpdata_request_internal_request_proto_goTypes = []interface{}{ + (*RequestContext)(nil), // 0: opentelemetry.collector.pdata.xpdata.internal.RequestContext + (*TracesRequest)(nil), // 1: opentelemetry.collector.pdata.xpdata.internal.TracesRequest + (*MetricsRequest)(nil), // 2: opentelemetry.collector.pdata.xpdata.internal.MetricsRequest + (*LogsRequest)(nil), // 3: opentelemetry.collector.pdata.xpdata.internal.LogsRequest + (*ProfilesRequest)(nil), // 4: opentelemetry.collector.pdata.xpdata.internal.ProfilesRequest + nil, // 5: opentelemetry.collector.pdata.xpdata.internal.RequestContext.SpanContextMapEntry + (*v1.TracesData)(nil), // 6: opentelemetry.proto.trace.v1.TracesData + (*v11.MetricsData)(nil), // 7: opentelemetry.proto.metrics.v1.MetricsData + (*v12.LogsData)(nil), // 8: opentelemetry.proto.logs.v1.LogsData + (*v1development.ProfilesData)(nil), // 9: opentelemetry.proto.profiles.v1development.ProfilesData +} +var file_pdata_xpdata_request_internal_request_proto_depIdxs = []int32{ + 5, // 0: opentelemetry.collector.pdata.xpdata.internal.RequestContext.span_context_map:type_name -> opentelemetry.collector.pdata.xpdata.internal.RequestContext.SpanContextMapEntry + 0, // 1: opentelemetry.collector.pdata.xpdata.internal.TracesRequest.request_context:type_name -> opentelemetry.collector.pdata.xpdata.internal.RequestContext + 6, // 2: opentelemetry.collector.pdata.xpdata.internal.TracesRequest.traces_data:type_name -> opentelemetry.proto.trace.v1.TracesData + 0, // 3: opentelemetry.collector.pdata.xpdata.internal.MetricsRequest.request_context:type_name -> opentelemetry.collector.pdata.xpdata.internal.RequestContext + 7, // 4: opentelemetry.collector.pdata.xpdata.internal.MetricsRequest.metrics_data:type_name -> opentelemetry.proto.metrics.v1.MetricsData + 0, // 5: opentelemetry.collector.pdata.xpdata.internal.LogsRequest.request_context:type_name -> opentelemetry.collector.pdata.xpdata.internal.RequestContext + 8, // 6: opentelemetry.collector.pdata.xpdata.internal.LogsRequest.logs_data:type_name -> opentelemetry.proto.logs.v1.LogsData + 0, // 7: opentelemetry.collector.pdata.xpdata.internal.ProfilesRequest.request_context:type_name -> opentelemetry.collector.pdata.xpdata.internal.RequestContext + 9, // 8: opentelemetry.collector.pdata.xpdata.internal.ProfilesRequest.profiles_data:type_name -> opentelemetry.proto.profiles.v1development.ProfilesData + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name +} + +func init() { file_pdata_xpdata_request_internal_request_proto_init() } +func file_pdata_xpdata_request_internal_request_proto_init() { + if File_pdata_xpdata_request_internal_request_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pdata_xpdata_request_internal_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RequestContext); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pdata_xpdata_request_internal_request_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TracesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pdata_xpdata_request_internal_request_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pdata_xpdata_request_internal_request_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pdata_xpdata_request_internal_request_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProfilesRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pdata_xpdata_request_internal_request_proto_rawDesc, + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_pdata_xpdata_request_internal_request_proto_goTypes, + DependencyIndexes: file_pdata_xpdata_request_internal_request_proto_depIdxs, + MessageInfos: file_pdata_xpdata_request_internal_request_proto_msgTypes, + }.Build() + File_pdata_xpdata_request_internal_request_proto = out.File + file_pdata_xpdata_request_internal_request_proto_rawDesc = nil + file_pdata_xpdata_request_internal_request_proto_goTypes = nil + file_pdata_xpdata_request_internal_request_proto_depIdxs = nil +} diff --git a/pdata/xpdata/request/internal/request.proto b/pdata/xpdata/request/internal/request.proto new file mode 100644 index 00000000000..777d4903291 --- /dev/null +++ b/pdata/xpdata/request/internal/request.proto @@ -0,0 +1,50 @@ +syntax = "proto3"; + +package opentelemetry.collector.pdata.xpdata.internal; + +option go_package = "go.opentelemetry.io/collector/pdata/xpdata/internal"; + +import "opentelemetry/proto/trace/v1/trace.proto"; +import "opentelemetry/proto/metrics/v1/metrics.proto"; +import "opentelemetry/proto/logs/v1/logs.proto"; +import "opentelemetry/proto/profiles/v1development/profiles.proto"; + + +// RequestContext represents metadata associated with a telemetry export request. +message RequestContext { + // Span context encoded using W3C trace context format. + // This map typically includes "traceparent" and optionally "tracestate" keys. + map span_context_map = 1; +} + +// The following messages are wrappers around standard OpenTelemetry data types. +// They embed request-level context and a version discriminator to ensure they are not wire-compatible with +// the canonical OpenTelemetry proto messages. +// +// Each wrapper reserves field tag 1 for a `fixed32` (protobuf wire type 5) format_version field, which makes it +// structurally incompatible with the standard OTLP messages which use tag 1 for the data message field (protobuf wire type 2). +// This ensures old and new formats cannot be confused during decoding. + +message TracesRequest { + fixed32 format_version = 1; + RequestContext request_context = 2; + opentelemetry.proto.trace.v1.TracesData traces_data = 3; +} + +message MetricsRequest { + fixed32 format_version = 1; + RequestContext request_context = 2; + opentelemetry.proto.metrics.v1.MetricsData metrics_data = 3; +} + +message LogsRequest { + fixed32 format_version = 1; + RequestContext request_context = 2; + opentelemetry.proto.logs.v1.LogsData logs_data = 3; +} + +message ProfilesRequest { + fixed32 format_version = 1; + RequestContext request_context = 2; + opentelemetry.proto.profiles.v1development.ProfilesData profiles_data = 3; +} diff --git a/pdata/xpdata/request/logs_request.go b/pdata/xpdata/request/logs_request.go new file mode 100644 index 00000000000..cf38f0518cc --- /dev/null +++ b/pdata/xpdata/request/logs_request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/pdata/internal" + "go.opentelemetry.io/collector/pdata/plog" + reqint "go.opentelemetry.io/collector/pdata/xpdata/request/internal" +) + +// MarshalLogs marshals plog.Logs along with the context into a byte slice. +func MarshalLogs(ctx context.Context, ld plog.Logs) ([]byte, error) { + otlpLogs := internal.LogsToProto(internal.Logs(ld)) + rc := encodeContext(ctx) + lr := reqint.LogsRequest{ + FormatVersion: requestFormatVersion, + LogsData: &otlpLogs, + RequestContext: &rc, + } + return proto.Marshal(&lr) +} + +// UnmarshalLogs unmarshals a byte slice into plog.Logs and the context. +func UnmarshalLogs(buf []byte) (context.Context, plog.Logs, error) { + if !isRequestPayloadV1(buf) { + return context.Background(), plog.Logs{}, ErrInvalidFormat + } + lr := reqint.LogsRequest{} + if err := proto.Unmarshal(buf, &lr); err != nil { + return context.Background(), plog.Logs{}, fmt.Errorf("failed to unmarshal logs request: %w", err) + } + return decodeContext(lr.RequestContext), plog.Logs(internal.LogsFromProto(*lr.LogsData)), nil +} diff --git a/pdata/xpdata/request/logs_request_test.go b/pdata/xpdata/request/logs_request_test.go new file mode 100644 index 00000000000..50f51997dd4 --- /dev/null +++ b/pdata/xpdata/request/logs_request_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMarshalUnmarshalLogsRequest(t *testing.T) { + logs := testdata.GenerateLogs(3) + + // unmarshal logs request with a context + spanCtx := fakeSpanContext(t) + buf, err := MarshalLogs(trace.ContextWithSpanContext(context.Background(), spanCtx), logs) + require.NoError(t, err) + gotCtx, gotLogs, err := UnmarshalLogs(buf) + require.NoError(t, err) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx)) + assert.Equal(t, logs, gotLogs) + + // unmarshal logs request with empty context + buf, err = MarshalLogs(context.Background(), logs) + require.NoError(t, err) + gotCtx, gotLogs, err = UnmarshalLogs(buf) + require.NoError(t, err) + assert.Equal(t, context.Background(), gotCtx) + assert.Equal(t, logs, gotLogs) + + // unmarshal corrupted data + _, _, err = UnmarshalLogs(buf[:len(buf)-1]) + require.ErrorContains(t, err, "failed to unmarshal logs request") + + // unmarshal invalid format (bare logs) + buf, err = (&plog.ProtoMarshaler{}).MarshalLogs(logs) + require.NoError(t, err) + _, _, err = UnmarshalLogs(buf) + require.ErrorIs(t, err, ErrInvalidFormat) +} diff --git a/pdata/xpdata/request/metrics_request.go b/pdata/xpdata/request/metrics_request.go new file mode 100644 index 00000000000..428a8eab3a8 --- /dev/null +++ b/pdata/xpdata/request/metrics_request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/pdata/internal" + "go.opentelemetry.io/collector/pdata/pmetric" + reqint "go.opentelemetry.io/collector/pdata/xpdata/request/internal" +) + +// MarshalMetrics marshals pmetric.Metrics along with the context into a byte slice. +func MarshalMetrics(ctx context.Context, ld pmetric.Metrics) ([]byte, error) { + otlpMetrics := internal.MetricsToProto(internal.Metrics(ld)) + rc := encodeContext(ctx) + mr := reqint.MetricsRequest{ + FormatVersion: requestFormatVersion, + MetricsData: &otlpMetrics, + RequestContext: &rc, + } + return proto.Marshal(&mr) +} + +// UnmarshalMetrics unmarshals a byte slice into pmetric.Metrics and the context. +func UnmarshalMetrics(buf []byte) (context.Context, pmetric.Metrics, error) { + if !isRequestPayloadV1(buf) { + return context.Background(), pmetric.Metrics{}, ErrInvalidFormat + } + mr := reqint.MetricsRequest{} + if err := proto.Unmarshal(buf, &mr); err != nil { + return context.Background(), pmetric.Metrics{}, fmt.Errorf("failed to unmarshal metrics request: %w", err) + } + return decodeContext(mr.RequestContext), pmetric.Metrics(internal.MetricsFromProto(*mr.MetricsData)), nil +} diff --git a/pdata/xpdata/request/metrics_request_test.go b/pdata/xpdata/request/metrics_request_test.go new file mode 100644 index 00000000000..a7cac305b0b --- /dev/null +++ b/pdata/xpdata/request/metrics_request_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMarshalUnmarshalMetricsRequest(t *testing.T) { + metrics := testdata.GenerateMetrics(3) + + // unmarshal metrics request with a context + spanCtx := fakeSpanContext(t) + buf, err := MarshalMetrics(trace.ContextWithSpanContext(context.Background(), spanCtx), metrics) + require.NoError(t, err) + gotCtx, gotMetrics, err := UnmarshalMetrics(buf) + require.NoError(t, err) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx)) + assert.Equal(t, metrics, gotMetrics) + + // unmarshal metrics request with empty context + buf, err = MarshalMetrics(context.Background(), metrics) + require.NoError(t, err) + gotCtx, gotMetrics, err = UnmarshalMetrics(buf) + require.NoError(t, err) + assert.Equal(t, context.Background(), gotCtx) + assert.Equal(t, metrics, gotMetrics) + + // unmarshal corrupted data + _, _, err = UnmarshalMetrics(buf[:len(buf)-1]) + require.ErrorContains(t, err, "failed to unmarshal metrics request") + + // unmarshal invalid format (bare metrics) + buf, err = (&pmetric.ProtoMarshaler{}).MarshalMetrics(metrics) + require.NoError(t, err) + _, _, err = UnmarshalMetrics(buf) + require.ErrorIs(t, err, ErrInvalidFormat) +} diff --git a/pdata/xpdata/request/profiles_request.go b/pdata/xpdata/request/profiles_request.go new file mode 100644 index 00000000000..cdb0d9751d0 --- /dev/null +++ b/pdata/xpdata/request/profiles_request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/pdata/internal" + "go.opentelemetry.io/collector/pdata/pprofile" + reqint "go.opentelemetry.io/collector/pdata/xpdata/request/internal" +) + +// MarshalProfiles marshals pprofile.Profiles along with the context into a byte slice. +func MarshalProfiles(ctx context.Context, ld pprofile.Profiles) ([]byte, error) { + otlpProfiles := internal.ProfilesToProto(internal.Profiles(ld)) + rc := encodeContext(ctx) + pr := reqint.ProfilesRequest{ + FormatVersion: requestFormatVersion, + ProfilesData: &otlpProfiles, + RequestContext: &rc, + } + return proto.Marshal(&pr) +} + +// UnmarshalProfiles unmarshals a byte slice into pprofile.Profiles and the context. +func UnmarshalProfiles(buf []byte) (context.Context, pprofile.Profiles, error) { + if !isRequestPayloadV1(buf) { + return context.Background(), pprofile.Profiles{}, ErrInvalidFormat + } + pr := reqint.ProfilesRequest{} + if err := proto.Unmarshal(buf, &pr); err != nil { + return context.Background(), pprofile.Profiles{}, fmt.Errorf("failed to unmarshal profiles request: %w", err) + } + return decodeContext(pr.RequestContext), pprofile.Profiles(internal.ProfilesFromProto(*pr.ProfilesData)), nil +} diff --git a/pdata/xpdata/request/profiles_request_test.go b/pdata/xpdata/request/profiles_request_test.go new file mode 100644 index 00000000000..22b83fe7502 --- /dev/null +++ b/pdata/xpdata/request/profiles_request_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMarshalUnmarshalProfilesRequest(t *testing.T) { + profiles := testdata.GenerateProfiles(3) + + // unmarshal profiles request with a context + spanCtx := fakeSpanContext(t) + buf, err := MarshalProfiles(trace.ContextWithSpanContext(context.Background(), spanCtx), profiles) + require.NoError(t, err) + gotCtx, gotProfiles, err := UnmarshalProfiles(buf) + require.NoError(t, err) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx)) + assert.Equal(t, profiles, gotProfiles) + + // unmarshal profiles request with empty context + buf, err = MarshalProfiles(context.Background(), profiles) + require.NoError(t, err) + gotCtx, gotProfiles, err = UnmarshalProfiles(buf) + require.NoError(t, err) + assert.Equal(t, context.Background(), gotCtx) + assert.Equal(t, profiles, gotProfiles) + + // unmarshal corrupted data + _, _, err = UnmarshalProfiles(buf[:len(buf)-1]) + require.ErrorContains(t, err, "failed to unmarshal profiles request") + + // unmarshal invalid format (bare profiles) + buf, err = (&pprofile.ProtoMarshaler{}).MarshalProfiles(profiles) + require.NoError(t, err) + _, _, err = UnmarshalProfiles(buf) + require.ErrorIs(t, err, ErrInvalidFormat) +} diff --git a/pdata/xpdata/request/requesttest.go b/pdata/xpdata/request/requesttest.go new file mode 100644 index 00000000000..099c2bdb005 --- /dev/null +++ b/pdata/xpdata/request/requesttest.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" +) + +func fakeSpanContext(t *testing.T) trace.SpanContext { + traceID, err := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10") + require.NoError(t, err) + spanID, err := trace.SpanIDFromHex("0102030405060708") + require.NoError(t, err) + return trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: 0x01, + Remote: true, + }) +} diff --git a/pdata/xpdata/request/traces_request.go b/pdata/xpdata/request/traces_request.go new file mode 100644 index 00000000000..ba06cffa37e --- /dev/null +++ b/pdata/xpdata/request/traces_request.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/pdata/internal" + "go.opentelemetry.io/collector/pdata/ptrace" + reqint "go.opentelemetry.io/collector/pdata/xpdata/request/internal" +) + +// MarshalTraces marshals ptrace.Traces along with the context into a byte slice. +func MarshalTraces(ctx context.Context, ld ptrace.Traces) ([]byte, error) { + otlpTraces := internal.TracesToProto(internal.Traces(ld)) + rc := encodeContext(ctx) + tr := reqint.TracesRequest{ + FormatVersion: requestFormatVersion, + TracesData: &otlpTraces, + RequestContext: &rc, + } + return proto.Marshal(&tr) +} + +// UnmarshalTraces unmarshals a byte slice into ptrace.Traces and the context. +func UnmarshalTraces(buf []byte) (context.Context, ptrace.Traces, error) { + if !isRequestPayloadV1(buf) { + return context.Background(), ptrace.Traces{}, ErrInvalidFormat + } + tr := reqint.TracesRequest{} + if err := proto.Unmarshal(buf, &tr); err != nil { + return context.Background(), ptrace.Traces{}, fmt.Errorf("failed to unmarshal traces request: %w", err) + } + return decodeContext(tr.RequestContext), ptrace.Traces(internal.TracesFromProto(*tr.TracesData)), nil +} diff --git a/pdata/xpdata/request/traces_request_test.go b/pdata/xpdata/request/traces_request_test.go new file mode 100644 index 00000000000..8bfc561ccc2 --- /dev/null +++ b/pdata/xpdata/request/traces_request_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/testdata" +) + +func TestMarshalUnmarshalTracesRequest(t *testing.T) { + traces := testdata.GenerateTraces(3) + + // unmarshal traces request with a context + spanCtx := fakeSpanContext(t) + buf, err := MarshalTraces(trace.ContextWithSpanContext(context.Background(), spanCtx), traces) + require.NoError(t, err) + gotCtx, gotTraces, err := UnmarshalTraces(buf) + require.NoError(t, err) + assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx)) + assert.Equal(t, traces, gotTraces) + + // unmarshal traces request with empty context + buf, err = MarshalTraces(context.Background(), traces) + require.NoError(t, err) + gotCtx, gotTraces, err = UnmarshalTraces(buf) + require.NoError(t, err) + assert.Equal(t, context.Background(), gotCtx) + assert.Equal(t, traces, gotTraces) + + // unmarshal corrupted data + _, _, err = UnmarshalTraces(buf[:len(buf)-1]) + require.ErrorContains(t, err, "failed to unmarshal traces request") + + // unmarshal invalid format (bare traces) + buf, err = (&ptrace.ProtoMarshaler{}).MarshalTraces(traces) + require.NoError(t, err) + _, _, err = UnmarshalTraces(buf) + require.ErrorIs(t, err, ErrInvalidFormat) +} diff --git a/pdata/xpdata/request/version_check.go b/pdata/xpdata/request/version_check.go new file mode 100644 index 00000000000..71e81ee00f8 --- /dev/null +++ b/pdata/xpdata/request/version_check.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/pdata/xpdata/request" + +import ( + "encoding/binary" + "errors" +) + +const ( + // field 1 << 3, wire type 5 (fixed32) + protoTag1TypeByte = 0x0D + + // version of the request payload format set in the `format_version` field + requestFormatVersion = uint32(1) +) + +var ErrInvalidFormat = errors.New("invalid request payload format") + +// isRequestPayloadV1 returns true if the given payload represents one of the wrappers around standard OpenTelemetry +// data types defined in internal/request.go and has the version set to 1. +func isRequestPayloadV1(data []byte) bool { + if len(data) < 5 { + return false + } + if data[0] != protoTag1TypeByte { + return false + } + return binary.LittleEndian.Uint32(data[1:5]) == requestFormatVersion +} diff --git a/pdata/xpdata/request/version_check_test.go b/pdata/xpdata/request/version_check_test.go new file mode 100644 index 00000000000..1d40378446d --- /dev/null +++ b/pdata/xpdata/request/version_check_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestIsRequestPayloadV1(t *testing.T) { + // too short + assert.False(t, isRequestPayloadV1([]byte{protoTag1TypeByte, 0x00})) + + buf := make([]byte, 5) + + // wrong type: field 1, wire type 2 (length-delimited) + buf[0] = 0x0A + assert.False(t, isRequestPayloadV1([]byte{protoTag1TypeByte, 0x00})) + + // wrong version + buf[0] = protoTag1TypeByte + binary.LittleEndian.PutUint32(buf[1:], 2) + assert.False(t, isRequestPayloadV1(buf)) + + binary.LittleEndian.PutUint32(buf[1:], requestFormatVersion) + assert.True(t, isRequestPayloadV1(buf)) + + buf, err := MarshalMetrics(context.Background(), pmetric.NewMetrics()) + require.NoError(t, err) + assert.True(t, isRequestPayloadV1(buf)) + + buf, err = MarshalTraces(context.Background(), ptrace.NewTraces()) + require.NoError(t, err) + assert.True(t, isRequestPayloadV1(buf)) + + buf, err = MarshalLogs(context.Background(), plog.NewLogs()) + require.NoError(t, err) + assert.True(t, isRequestPayloadV1(buf)) + + buf, err = MarshalProfiles(context.Background(), pprofile.NewProfiles()) + require.NoError(t, err) + assert.True(t, isRequestPayloadV1(buf)) +} diff --git a/service/go.mod b/service/go.mod index 868c0ac483e..3dd165a4a04 100644 --- a/service/go.mod +++ b/service/go.mod @@ -243,3 +243,5 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../extens replace go.opentelemetry.io/collector/config/configmiddleware => ../config/configmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../extension/extensionmiddleware/extensionmiddlewaretest + +replace go.opentelemetry.io/collector/pdata/xpdata => ../pdata/xpdata diff --git a/service/hostcapabilities/go.mod b/service/hostcapabilities/go.mod index d2ecf97a466..2fb8fa831dd 100644 --- a/service/hostcapabilities/go.mod +++ b/service/hostcapabilities/go.mod @@ -95,3 +95,5 @@ replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmid replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/configmiddleware replace go.opentelemetry.io/collector/extension/extensionmiddleware => ../../extension/extensionmiddleware + +replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata diff --git a/versions.yaml b/versions.yaml index 8f509d6d179..f435b6d5900 100644 --- a/versions.yaml +++ b/versions.yaml @@ -73,6 +73,7 @@ module-sets: - go.opentelemetry.io/collector/otelcol/otelcoltest - go.opentelemetry.io/collector/pdata/pprofile - go.opentelemetry.io/collector/pdata/testdata + - go.opentelemetry.io/collector/pdata/xpdata - go.opentelemetry.io/collector/pipeline - go.opentelemetry.io/collector/pipeline/xpipeline - go.opentelemetry.io/collector/processor/processortest