Skip to content

Commit 01997b2

Browse files
committed
[exporterhelper] persist span links through tracerequests
1 parent 42a3ae0 commit 01997b2

7 files changed

Lines changed: 380 additions & 80 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Persist span context in traces request, including through persistent queue.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [11740, 12212]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Span links are now able to be serialized/deserialized when using the persistent queue.
20+
This allows internal spans to be linked even when using persistent queue.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

exporter/exporterhelper/internal/queuebatch/batch_context.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import (
1111

1212
type traceContextKeyType int
1313

14-
const batchSpanLinksKey traceContextKeyType = iota
14+
const BatchSpanLinksKey traceContextKeyType = iota
1515

1616
// LinksFromContext returns a list of trace links registered in the context.
1717
func LinksFromContext(ctx context.Context) []trace.Link {
1818
if ctx == nil {
1919
return []trace.Link{}
2020
}
21-
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
21+
if links, ok := ctx.Value(BatchSpanLinksKey).([]trace.Link); ok {
2222
return links
2323
}
2424
return []trace.Link{}
@@ -34,7 +34,7 @@ func parentsFromContext(ctx context.Context) []trace.Link {
3434
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
3535
return context.WithValue(
3636
context.Background(),
37-
batchSpanLinksKey,
37+
BatchSpanLinksKey,
3838
append(parentsFromContext(ctx1), parentsFromContext(ctx2)...),
3939
)
4040
}

exporter/exporterhelper/logs_batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
234234

235235
func TestMergeSplitLogsInputNotModifiedIfErrorReturned(t *testing.T) {
236236
r1 := newLogsRequest(testdata.GenerateLogs(18))
237-
r2 := newTracesRequest(testdata.GenerateTraces(3))
237+
r2 := newTracesRequest(testdata.GenerateTraces(3), nil)
238238
_, err := r1.MergeSplit(context.Background(), 10, RequestSizerTypeItems, r2)
239239
require.Error(t, err)
240240
assert.Equal(t, 18, r1.ItemsCount())

exporter/exporterhelper/traces.go

Lines changed: 126 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,19 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
55

66
import (
77
"context"
8+
"encoding/json"
89
"errors"
910

1011
"go.uber.org/zap"
1112

13+
"go.opentelemetry.io/otel/trace"
14+
1215
"go.opentelemetry.io/collector/component"
1316
"go.opentelemetry.io/collector/consumer"
1417
"go.opentelemetry.io/collector/consumer/consumererror"
1518
"go.opentelemetry.io/collector/exporter"
1619
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
20+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1721
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1822
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1923
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -43,36 +47,145 @@ func NewTracesQueueBatchSettings() QueueBatchSettings {
4347
}
4448
}
4549

50+
// TraceStateSerializable represents a serializable version of TraceState
51+
type TraceStateSerializable struct {
52+
Members []struct {
53+
Key string `json:"key"`
54+
Value string `json:"value"`
55+
} `json:"members"`
56+
}
57+
58+
// traceStateToSerializable converts a TraceState to a serializable format
59+
func traceStateToSerializable(ts trace.TraceState) TraceStateSerializable {
60+
result := TraceStateSerializable{
61+
Members: make([]struct {
62+
Key string `json:"key"`
63+
Value string `json:"value"`
64+
}, 0, ts.Len()),
65+
}
66+
67+
ts.Walk(func(key, value string) bool {
68+
result.Members = append(result.Members, struct {
69+
Key string `json:"key"`
70+
Value string `json:"value"`
71+
}{
72+
Key: key,
73+
Value: value,
74+
})
75+
return true
76+
})
77+
78+
return result
79+
}
80+
81+
// serializableToTraceState converts a serializable format back to TraceState
82+
func serializableToTraceState(ts TraceStateSerializable) (trace.TraceState, error) {
83+
var result trace.TraceState
84+
var err error
85+
86+
for _, m := range ts.Members {
87+
result, err = result.Insert(m.Key, m.Value)
88+
if err != nil {
89+
return trace.TraceState{}, err
90+
}
91+
}
92+
93+
return result, nil
94+
}
95+
96+
// Update SerializableLink to use the new TraceState serialization
97+
type SerializableLink struct {
98+
TraceID [16]byte `json:"trace_id"`
99+
SpanID [8]byte `json:"span_id"`
100+
TraceFlags byte `json:"trace_flags"`
101+
TraceState TraceStateSerializable `json:"trace_state"`
102+
}
103+
104+
func linkToSerializable(l trace.Link) SerializableLink {
105+
return SerializableLink{
106+
TraceID: l.SpanContext.TraceID(),
107+
SpanID: l.SpanContext.SpanID(),
108+
TraceFlags: byte(l.SpanContext.TraceFlags()),
109+
TraceState: traceStateToSerializable(l.SpanContext.TraceState()),
110+
}
111+
}
112+
113+
func serializableToLink(sl SerializableLink) trace.Link {
114+
ts, err := serializableToTraceState(sl.TraceState)
115+
if err != nil {
116+
// If there's an error parsing the trace state, use an empty one
117+
ts = trace.TraceState{}
118+
}
119+
120+
sc := trace.NewSpanContext(trace.SpanContextConfig{
121+
TraceID: sl.TraceID,
122+
SpanID: sl.SpanID,
123+
TraceFlags: trace.TraceFlags(sl.TraceFlags),
124+
TraceState: ts,
125+
})
126+
return trace.Link{
127+
SpanContext: sc,
128+
}
129+
}
130+
46131
type tracesRequest struct {
47132
td ptrace.Traces
133+
links []trace.Link
48134
cachedSize int
49135
}
50136

51-
func newTracesRequest(td ptrace.Traces) Request {
137+
func newTracesRequest(td ptrace.Traces, links []trace.Link) Request {
52138
return &tracesRequest{
53139
td: td,
140+
links: links,
54141
cachedSize: -1,
55142
}
56143
}
57144

58145
type tracesEncoding struct{}
59146

147+
type tracesWithLinks struct {
148+
Traces []byte `json:"traces"`
149+
Links []SerializableLink `json:"links"`
150+
}
151+
60152
func (tracesEncoding) Unmarshal(bytes []byte) (Request, error) {
61-
traces, err := tracesUnmarshaler.UnmarshalTraces(bytes)
153+
var twl tracesWithLinks
154+
if err := json.Unmarshal(bytes, &twl); err != nil {
155+
return nil, err
156+
}
157+
traces, err := tracesUnmarshaler.UnmarshalTraces(twl.Traces)
62158
if err != nil {
63159
return nil, err
64160
}
65-
return newTracesRequest(traces), nil
161+
links := make([]trace.Link, len(twl.Links))
162+
for i, sl := range twl.Links {
163+
links[i] = serializableToLink(sl)
164+
}
165+
return newTracesRequest(traces, links), nil
66166
}
67167

68168
func (tracesEncoding) Marshal(req Request) ([]byte, error) {
69-
return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td)
169+
tr := req.(*tracesRequest)
170+
tracesBytes, err := tracesMarshaler.MarshalTraces(tr.td)
171+
if err != nil {
172+
return nil, err
173+
}
174+
serializableLinks := make([]SerializableLink, len(tr.links))
175+
for i, l := range tr.links {
176+
serializableLinks[i] = linkToSerializable(l)
177+
}
178+
twl := tracesWithLinks{
179+
Traces: tracesBytes,
180+
Links: serializableLinks,
181+
}
182+
return json.Marshal(twl)
70183
}
71184

72185
func (req *tracesRequest) OnError(err error) Request {
73186
var traceError consumererror.Traces
74187
if errors.As(err, &traceError) {
75-
return newTracesRequest(traceError.Data())
188+
return newTracesRequest(traceError.Data(), req.links)
76189
}
77190
return req
78191
}
@@ -81,6 +194,11 @@ func (req *tracesRequest) ItemsCount() int {
81194
return req.td.SpanCount()
82195
}
83196

197+
// Links returns the trace links associated with this request.
198+
func (req *tracesRequest) Links() []trace.Link {
199+
return req.links
200+
}
201+
84202
func (req *tracesRequest) size(sizer sizer.TracesSizer) int {
85203
if req.cachedSize == -1 {
86204
req.cachedSize = sizer.TracesSize(req.td)
@@ -124,8 +242,9 @@ func requestConsumeFromTraces(pusher consumer.ConsumeTracesFunc) RequestConsumeF
124242

125243
// requestFromTraces returns a RequestConverterFunc that converts ptrace.Traces into a Request.
126244
func requestFromTraces() RequestConverterFunc[ptrace.Traces] {
127-
return func(_ context.Context, traces ptrace.Traces) (Request, error) {
128-
return newTracesRequest(traces), nil
245+
return func(ctx context.Context, traces ptrace.Traces) (Request, error) {
246+
links := queuebatch.LinksFromContext(ctx)
247+
return newTracesRequest(traces, links), nil
129248
}
130249
}
131250

exporter/exporterhelper/traces_batch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,16 @@ func (req *tracesRequest) mergeTo(dst *tracesRequest, sz sizer.TracesSizer) {
4545
req.setCachedSize(0)
4646
}
4747
req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans())
48+
// Merge the links from both requests
49+
dst.links = append(dst.links, req.links...)
4850
}
4951

5052
func (req *tracesRequest) split(maxSize int, sz sizer.TracesSizer) []Request {
5153
var res []Request
5254
for req.size(sz) > maxSize {
5355
td, rmSize := extractTraces(req.td, maxSize, sz)
5456
req.setCachedSize(req.size(sz) - rmSize)
55-
res = append(res, newTracesRequest(td))
57+
res = append(res, newTracesRequest(td, req.Links()))
5658
}
5759
res = append(res, req)
5860
return res

0 commit comments

Comments
 (0)