Skip to content

Commit c88a904

Browse files
committed
OTEL-2540 create persistent_queue_context and test files
1 parent 700b678 commit c88a904

4 files changed

Lines changed: 936 additions & 896 deletions

File tree

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,18 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
66
import (
77
"context"
88
"encoding/binary"
9-
"encoding/hex"
109
"encoding/json"
1110
"errors"
1211
"fmt"
1312
"strconv"
1413
"sync"
1514

16-
"go.opentelemetry.io/otel/trace"
1715
"go.uber.org/zap"
1816

1917
"go.opentelemetry.io/collector/component"
2018
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
2119
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2220
"go.opentelemetry.io/collector/extension/xextension/storage"
23-
"go.opentelemetry.io/collector/featuregate"
2421
"go.opentelemetry.io/collector/pipeline"
2522
)
2623

@@ -37,24 +34,14 @@ const (
3734
// queueMetadataKey is the new single key for all queue metadata.
3835
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
3936
//nolint:unused
40-
queueMetadataKey = "qmv0"
41-
errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
37+
queueMetadataKey = "qmv0"
4238
)
4339

4440
var (
4541
errValueNotSet = errors.New("value not set")
4642
errInvalidValue = errors.New("invalid value")
4743
errNoStorageClient = errors.New("no storage client extension found")
4844
errWrongExtensionType = errors.New("requested extension is not a storage extension")
49-
50-
// persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
51-
persistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
52-
"exporter.PersistRequestContext",
53-
featuregate.StageAlpha,
54-
featuregate.WithRegisterFromVersion("v0.127.0"),
55-
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
56-
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/pull/12934"),
57-
)
5845
)
5946

6047
var indexDonePool = sync.Pool{
@@ -258,80 +245,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
258245
return pq.putInternal(ctx, req)
259246
}
260247

261-
// necessary due to SpanContext and SpanContextConfig not supporting Unmarshal interface,
262-
// see https://github.com/open-telemetry/opentelemetry-go/issues/1819.
263-
type spanContext struct {
264-
TraceID string
265-
SpanID string
266-
TraceFlags string
267-
TraceState string
268-
Remote bool
269-
}
270-
271-
func localSpanContextFromTraceSpanContext(sc trace.SpanContext) spanContext {
272-
return spanContext{
273-
TraceID: sc.TraceID().String(),
274-
SpanID: sc.SpanID().String(),
275-
TraceFlags: sc.TraceFlags().String(),
276-
TraceState: sc.TraceState().String(),
277-
Remote: sc.IsRemote(),
278-
}
279-
}
280-
281-
func contextWithLocalSpanContext(ctx context.Context, sc spanContext) context.Context {
282-
traceID, err := trace.TraceIDFromHex(sc.TraceID)
283-
if err != nil {
284-
return ctx
285-
}
286-
spanID, err := trace.SpanIDFromHex(sc.SpanID)
287-
if err != nil {
288-
return ctx
289-
}
290-
traceFlags, err := traceFlagsFromHex(sc.TraceFlags)
291-
if err != nil {
292-
return ctx
293-
}
294-
traceState, err := trace.ParseTraceState(sc.TraceState)
295-
if err != nil {
296-
return ctx
297-
}
298-
299-
return trace.ContextWithSpanContext(ctx, trace.NewSpanContext(trace.SpanContextConfig{
300-
TraceID: traceID,
301-
SpanID: spanID,
302-
TraceFlags: *traceFlags,
303-
TraceState: traceState,
304-
Remote: sc.Remote,
305-
}))
306-
}
307-
308-
// requestContext wraps trace.SpanContext to allow for unmarshaling as well as
309-
// future metadata key/value pairs to be added.
310-
type requestContext struct {
311-
SpanContext spanContext
312-
}
313-
314-
// reverse of code in trace library https://github.com/open-telemetry/opentelemetry-go/blob/v1.35.0/trace/trace.go#L143-L168
315-
func traceFlagsFromHex(hexStr string) (*trace.TraceFlags, error) {
316-
decoded, err := hex.DecodeString(hexStr)
317-
if err != nil {
318-
return nil, err
319-
}
320-
if len(decoded) != 1 {
321-
return nil, errors.New(errInvalidTraceFlagsLength)
322-
}
323-
traceFlags := trace.TraceFlags(decoded[0])
324-
return &traceFlags, nil
325-
}
326-
327-
func getAndMarshalSpanContext(ctx context.Context) ([]byte, error) {
328-
if !persistRequestContextFeatureGate.IsEnabled() {
329-
return nil, nil
330-
}
331-
rc := localSpanContextFromTraceSpanContext(trace.SpanContextFromContext(ctx))
332-
return json.Marshal(requestContext{SpanContext: rc})
333-
}
334-
335248
// putInternal is the internal version that requires caller to hold the mutex lock.
336249
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
337250
reqSize := pq.set.sizer.Sizeof(req)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"context"
8+
"encoding/hex"
9+
"encoding/json"
10+
"errors"
11+
12+
"go.opentelemetry.io/otel/trace"
13+
14+
"go.opentelemetry.io/collector/featuregate"
15+
)
16+
17+
const (
18+
errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
19+
)
20+
21+
// persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
22+
var persistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
23+
"exporter.PersistRequestContext",
24+
featuregate.StageAlpha,
25+
featuregate.WithRegisterFromVersion("v0.127.0"),
26+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
27+
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/pull/12934"),
28+
)
29+
30+
// requestContext wraps trace.SpanContext to allow for unmarshaling as well as
31+
// future metadata key/value pairs to be added.
32+
type requestContext struct {
33+
SpanContext spanContext
34+
}
35+
36+
// necessary due to SpanContext and SpanContextConfig not supporting Unmarshal interface,
37+
// see https://github.com/open-telemetry/opentelemetry-go/issues/1819.
38+
type spanContext struct {
39+
TraceID string
40+
SpanID string
41+
TraceFlags string
42+
TraceState string
43+
Remote bool
44+
}
45+
46+
func localSpanContextFromTraceSpanContext(sc trace.SpanContext) spanContext {
47+
return spanContext{
48+
TraceID: sc.TraceID().String(),
49+
SpanID: sc.SpanID().String(),
50+
TraceFlags: sc.TraceFlags().String(),
51+
TraceState: sc.TraceState().String(),
52+
Remote: sc.IsRemote(),
53+
}
54+
}
55+
56+
func contextWithLocalSpanContext(ctx context.Context, sc spanContext) context.Context {
57+
traceID, err := trace.TraceIDFromHex(sc.TraceID)
58+
if err != nil {
59+
return ctx
60+
}
61+
spanID, err := trace.SpanIDFromHex(sc.SpanID)
62+
if err != nil {
63+
return ctx
64+
}
65+
traceFlags, err := traceFlagsFromHex(sc.TraceFlags)
66+
if err != nil {
67+
return ctx
68+
}
69+
traceState, err := trace.ParseTraceState(sc.TraceState)
70+
if err != nil {
71+
return ctx
72+
}
73+
74+
return trace.ContextWithSpanContext(ctx, trace.NewSpanContext(trace.SpanContextConfig{
75+
TraceID: traceID,
76+
SpanID: spanID,
77+
TraceFlags: *traceFlags,
78+
TraceState: traceState,
79+
Remote: sc.Remote,
80+
}))
81+
}
82+
83+
// reverse of code in trace library https://github.com/open-telemetry/opentelemetry-go/blob/v1.35.0/trace/trace.go#L143-L168
84+
func traceFlagsFromHex(hexStr string) (*trace.TraceFlags, error) {
85+
decoded, err := hex.DecodeString(hexStr)
86+
if err != nil {
87+
return nil, err
88+
}
89+
if len(decoded) != 1 {
90+
return nil, errors.New(errInvalidTraceFlagsLength)
91+
}
92+
traceFlags := trace.TraceFlags(decoded[0])
93+
return &traceFlags, nil
94+
}
95+
96+
func getAndMarshalSpanContext(ctx context.Context) ([]byte, error) {
97+
if !persistRequestContextFeatureGate.IsEnabled() {
98+
return nil, nil
99+
}
100+
rc := localSpanContextFromTraceSpanContext(trace.SpanContextFromContext(ctx))
101+
return json.Marshal(requestContext{SpanContext: rc})
102+
}

0 commit comments

Comments
 (0)