@@ -6,18 +6,21 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
66import (
77 "context"
88 "encoding/binary"
9+ "encoding/hex"
910 "encoding/json"
1011 "errors"
1112 "fmt"
1213 "strconv"
1314 "sync"
1415
16+ "go.opentelemetry.io/otel/trace"
1517 "go.uber.org/zap"
1618
1719 "go.opentelemetry.io/collector/component"
1820 "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
1921 "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2022 "go.opentelemetry.io/collector/extension/xextension/storage"
23+ "go.opentelemetry.io/collector/featuregate"
2124 "go.opentelemetry.io/collector/pipeline"
2225)
2326
@@ -35,13 +38,23 @@ const (
3538 // TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
3639 //nolint:unused
3740 queueMetadataKey = "qmv0"
41+ errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
3842)
3943
4044var (
4145 errValueNotSet = errors .New ("value not set" )
4246 errInvalidValue = errors .New ("invalid value" )
4347 errNoStorageClient = errors .New ("no storage client extension found" )
4448 errWrongExtensionType = errors .New ("requested extension is not a storage extension" )
49+
50+ // persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
51+ persistRequestContextFeatureGate = featuregate .GlobalRegistry ().MustRegister (
52+ "exporter.PersistRequestContext" ,
53+ featuregate .StageAlpha ,
54+ featuregate .WithRegisterFromVersion ("v0.127.0" ),
55+ featuregate .WithRegisterDescription ("controls whether context should be stored alongside requests in the persistent queue" ),
56+ featuregate .WithRegisterReferenceURL ("https://github.com/open-telemetry/opentelemetry-collector/pull/12934" ),
57+ )
4558)
4659
4760var indexDonePool = sync.Pool {
@@ -245,6 +258,80 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
245258 return pq .putInternal (ctx , req )
246259}
247260
261+ // necessary due to SpanContext and SpanContextConfig not supporting Unmarshal interface,
262+ // see https://github.com/open-telemetry/opentelemetry-go/issues/1819.
263+ type spanContext struct {
264+ TraceID string
265+ SpanID string
266+ TraceFlags string
267+ TraceState string
268+ Remote bool
269+ }
270+
271+ func localSpanContextFromTraceSpanContext (sc trace.SpanContext ) spanContext {
272+ return spanContext {
273+ TraceID : sc .TraceID ().String (),
274+ SpanID : sc .SpanID ().String (),
275+ TraceFlags : sc .TraceFlags ().String (),
276+ TraceState : sc .TraceState ().String (),
277+ Remote : sc .IsRemote (),
278+ }
279+ }
280+
281+ func contextWithLocalSpanContext (ctx context.Context , sc spanContext ) context.Context {
282+ traceID , err := trace .TraceIDFromHex (sc .TraceID )
283+ if err != nil {
284+ return ctx
285+ }
286+ spanID , err := trace .SpanIDFromHex (sc .SpanID )
287+ if err != nil {
288+ return ctx
289+ }
290+ traceFlags , err := traceFlagsFromHex (sc .TraceFlags )
291+ if err != nil {
292+ return ctx
293+ }
294+ traceState , err := trace .ParseTraceState (sc .TraceState )
295+ if err != nil {
296+ return ctx
297+ }
298+
299+ return trace .ContextWithSpanContext (ctx , trace .NewSpanContext (trace.SpanContextConfig {
300+ TraceID : traceID ,
301+ SpanID : spanID ,
302+ TraceFlags : * traceFlags ,
303+ TraceState : traceState ,
304+ Remote : sc .Remote ,
305+ }))
306+ }
307+
308+ // requestContext wraps trace.SpanContext to allow for unmarshaling as well as
309+ // future metadata key/value pairs to be added.
310+ type requestContext struct {
311+ SpanContext spanContext
312+ }
313+
314+ // reverse of code in trace library https://github.com/open-telemetry/opentelemetry-go/blob/v1.35.0/trace/trace.go#L143-L168
315+ func traceFlagsFromHex (hexStr string ) (* trace.TraceFlags , error ) {
316+ decoded , err := hex .DecodeString (hexStr )
317+ if err != nil {
318+ return nil , err
319+ }
320+ if len (decoded ) != 1 {
321+ return nil , errors .New (errInvalidTraceFlagsLength )
322+ }
323+ traceFlags := trace .TraceFlags (decoded [0 ])
324+ return & traceFlags , nil
325+ }
326+
327+ func getAndMarshalSpanContext (ctx context.Context ) ([]byte , error ) {
328+ if ! persistRequestContextFeatureGate .IsEnabled () {
329+ return nil , nil
330+ }
331+ rc := localSpanContextFromTraceSpanContext (trace .SpanContextFromContext (ctx ))
332+ return json .Marshal (requestContext {SpanContext : rc })
333+ }
334+
248335// putInternal is the internal version that requires caller to hold the mutex lock.
249336func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
250337 reqSize := pq .set .sizer .Sizeof (req )
0 commit comments