@@ -6,21 +6,18 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
66import (
77 "context"
88 "encoding/binary"
9- "encoding/hex"
109 "encoding/json"
1110 "errors"
1211 "fmt"
1312 "strconv"
1413 "sync"
1514
16- "go.opentelemetry.io/otel/trace"
1715 "go.uber.org/zap"
1816
1917 "go.opentelemetry.io/collector/component"
2018 "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
2119 "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2220 "go.opentelemetry.io/collector/extension/xextension/storage"
23- "go.opentelemetry.io/collector/featuregate"
2421 "go.opentelemetry.io/collector/pipeline"
2522)
2623
@@ -38,23 +35,13 @@ const (
3835 // TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
3936 //nolint:unused
4037 queueMetadataKey = "qmv0"
41- errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
4238)
4339
4440var (
4541 errValueNotSet = errors .New ("value not set" )
4642 errInvalidValue = errors .New ("invalid value" )
4743 errNoStorageClient = errors .New ("no storage client extension found" )
4844 errWrongExtensionType = errors .New ("requested extension is not a storage extension" )
49-
50- // persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
51- persistRequestContextFeatureGate = featuregate .GlobalRegistry ().MustRegister (
52- "exporter.PersistRequestContext" ,
53- featuregate .StageAlpha ,
54- featuregate .WithRegisterFromVersion ("v0.127.0" ),
55- featuregate .WithRegisterDescription ("controls whether context should be stored alongside requests in the persistent queue" ),
56- featuregate .WithRegisterReferenceURL ("https://github.com/open-telemetry/opentelemetry-collector/pull/12934" ),
57- )
5845)
5946
6047var indexDonePool = sync.Pool {
@@ -258,80 +245,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
258245 return pq .putInternal (ctx , req )
259246}
260247
261- // necessary due to SpanContext and SpanContextConfig not supporting Unmarshal interface,
262- // see https://github.com/open-telemetry/opentelemetry-go/issues/1819.
263- type spanContext struct {
264- TraceID string
265- SpanID string
266- TraceFlags string
267- TraceState string
268- Remote bool
269- }
270-
271- func localSpanContextFromTraceSpanContext (sc trace.SpanContext ) spanContext {
272- return spanContext {
273- TraceID : sc .TraceID ().String (),
274- SpanID : sc .SpanID ().String (),
275- TraceFlags : sc .TraceFlags ().String (),
276- TraceState : sc .TraceState ().String (),
277- Remote : sc .IsRemote (),
278- }
279- }
280-
281- func contextWithLocalSpanContext (ctx context.Context , sc spanContext ) context.Context {
282- traceID , err := trace .TraceIDFromHex (sc .TraceID )
283- if err != nil {
284- return ctx
285- }
286- spanID , err := trace .SpanIDFromHex (sc .SpanID )
287- if err != nil {
288- return ctx
289- }
290- traceFlags , err := traceFlagsFromHex (sc .TraceFlags )
291- if err != nil {
292- return ctx
293- }
294- traceState , err := trace .ParseTraceState (sc .TraceState )
295- if err != nil {
296- return ctx
297- }
298-
299- return trace .ContextWithSpanContext (ctx , trace .NewSpanContext (trace.SpanContextConfig {
300- TraceID : traceID ,
301- SpanID : spanID ,
302- TraceFlags : * traceFlags ,
303- TraceState : traceState ,
304- Remote : sc .Remote ,
305- }))
306- }
307-
308- // requestContext wraps trace.SpanContext to allow for unmarshaling as well as
309- // future metadata key/value pairs to be added.
310- type requestContext struct {
311- SpanContext spanContext
312- }
313-
314- // reverse of code in trace library https://github.com/open-telemetry/opentelemetry-go/blob/v1.35.0/trace/trace.go#L143-L168
315- func traceFlagsFromHex (hexStr string ) (* trace.TraceFlags , error ) {
316- decoded , err := hex .DecodeString (hexStr )
317- if err != nil {
318- return nil , err
319- }
320- if len (decoded ) != 1 {
321- return nil , errors .New (errInvalidTraceFlagsLength )
322- }
323- traceFlags := trace .TraceFlags (decoded [0 ])
324- return & traceFlags , nil
325- }
326-
327- func getAndMarshalSpanContext (ctx context.Context ) ([]byte , error ) {
328- if ! persistRequestContextFeatureGate .IsEnabled () {
329- return nil , nil
330- }
331- rc := localSpanContextFromTraceSpanContext (trace .SpanContextFromContext (ctx ))
332- return json .Marshal (requestContext {SpanContext : rc })
333- }
334-
335248// putInternal is the internal version that requires caller to hold the mutex lock.
336249func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
337250 reqSize := pq .set .sizer .Sizeof (req )
@@ -668,10 +581,6 @@ func getItemKey(index uint64) string {
668581 return strconv .FormatUint (index , 10 )
669582}
670583
671- func getContextKey (index uint64 ) string {
672- return strconv .FormatUint (index , 10 ) + "_context"
673- }
674-
675584func itemIndexToBytes (value uint64 ) []byte {
676585 return binary .LittleEndian .AppendUint64 ([]byte {}, value )
677586}
0 commit comments