Skip to content

Commit 076db15

Browse files
committed
doesnt pass persistentqueue test yet
1 parent f967587 commit 076db15

4 files changed

Lines changed: 99 additions & 116 deletions

File tree

exporter/exporterhelper/traces.go

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"go.opentelemetry.io/collector/consumer/consumererror"
1818
"go.opentelemetry.io/collector/exporter"
1919
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
20-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2120
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2221
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
2322
"go.opentelemetry.io/collector/pdata/ptrace"
@@ -48,24 +47,24 @@ func NewTracesQueueBatchSettings() QueueBatchSettings {
4847
}
4948

5049
type tracesRequest struct {
51-
td ptrace.Traces
52-
links []trace.Link
53-
cachedSize int
50+
td ptrace.Traces
51+
spancontexts []trace.SpanContext
52+
cachedSize int
5453
}
5554

56-
func newTracesRequest(td ptrace.Traces, links []trace.Link) Request {
55+
func newTracesRequest(td ptrace.Traces, spancontexts []trace.SpanContext) Request {
5756
return &tracesRequest{
58-
td: td,
59-
links: links,
60-
cachedSize: -1,
57+
td: td,
58+
spancontexts: spancontexts,
59+
cachedSize: -1,
6160
}
6261
}
6362

6463
type tracesEncoding struct{}
6564

6665
type tracesWithSpanContexts struct {
6766
Traces []byte `json:"traces"`
68-
SpanContext []trace.SpanContext `json:"span_context"`
67+
SpanContext []trace.SpanContext `json:"span_contexts"`
6968
}
7069

7170
// Helper for JSON unmarshaling of SpanContextConfig
@@ -112,26 +111,26 @@ func unmarshalSpanContextConfig(data []byte) (trace.SpanContextConfig, error) {
112111
}
113112

114113
func (tracesEncoding) Unmarshal(bytes []byte) (Request, error) {
115-
var twl struct {
116-
Traces []byte `json:"traces"`
117-
SpanContext []json.RawMessage `json:"span_context"`
114+
var tracesWithSpanContextsRaw struct {
115+
Traces []byte `json:"traces"`
116+
SpanContextsRaw []json.RawMessage `json:"span_contexts"`
118117
}
119-
if err := json.Unmarshal(bytes, &twl); err != nil {
118+
if err := json.Unmarshal(bytes, &tracesWithSpanContextsRaw); err != nil {
120119
return nil, err
121120
}
122-
traces, err := tracesUnmarshaler.UnmarshalTraces(twl.Traces)
121+
traces, err := tracesUnmarshaler.UnmarshalTraces(tracesWithSpanContextsRaw.Traces)
123122
if err != nil {
124123
return nil, err
125124
}
126-
links := make([]trace.Link, len(twl.SpanContext))
127-
for i, raw := range twl.SpanContext {
125+
spancontexts := make([]trace.SpanContext, len(tracesWithSpanContextsRaw.SpanContextsRaw))
126+
for i, raw := range tracesWithSpanContextsRaw.SpanContextsRaw {
128127
cfg, err := unmarshalSpanContextConfig(raw)
129128
if err != nil {
130129
return nil, err
131130
}
132-
links[i] = trace.Link{SpanContext: trace.NewSpanContext(cfg)}
131+
spancontexts[i] = trace.NewSpanContext(cfg)
133132
}
134-
return newTracesRequest(traces, links), nil
133+
return newTracesRequest(traces, spancontexts), nil
135134
}
136135

137136
func (tracesEncoding) Marshal(req Request) ([]byte, error) {
@@ -140,21 +139,17 @@ func (tracesEncoding) Marshal(req Request) ([]byte, error) {
140139
if err != nil {
141140
return nil, err
142141
}
143-
spanContexts := make([]trace.SpanContext, len(tr.links))
144-
for i, l := range tr.links {
145-
spanContexts[i] = l.SpanContext
146-
}
147142
twl := tracesWithSpanContexts{
148143
Traces: tracesBytes,
149-
SpanContext: spanContexts,
144+
SpanContext: tr.spancontexts,
150145
}
151146
return json.Marshal(twl)
152147
}
153148

154149
func (req *tracesRequest) OnError(err error) Request {
155150
var traceError consumererror.Traces
156151
if errors.As(err, &traceError) {
157-
return newTracesRequest(traceError.Data(), req.links)
152+
return newTracesRequest(traceError.Data(), req.spancontexts)
158153
}
159154
return req
160155
}
@@ -163,9 +158,9 @@ func (req *tracesRequest) ItemsCount() int {
163158
return req.td.SpanCount()
164159
}
165160

166-
// Links returns the trace links associated with this request.
167-
func (req *tracesRequest) Links() []trace.Link {
168-
return req.links
161+
// SpanContexts returns the SpanContexts associated with this request.
162+
func (req *tracesRequest) SpanContexts() []trace.SpanContext {
163+
return req.spancontexts
169164
}
170165

171166
func (req *tracesRequest) size(sizer sizer.TracesSizer) int {
@@ -212,8 +207,11 @@ func requestConsumeFromTraces(pusher consumer.ConsumeTracesFunc) RequestConsumeF
212207
// requestFromTraces returns a RequestConverterFunc that converts ptrace.Traces into a Request.
213208
func requestFromTraces() RequestConverterFunc[ptrace.Traces] {
214209
return func(ctx context.Context, traces ptrace.Traces) (Request, error) {
215-
links := queuebatch.LinksFromContext(ctx)
216-
return newTracesRequest(traces, links), nil
210+
spancontext := trace.SpanContextFromContext(ctx)
211+
// if !spancontext.IsValid() {
212+
// spancontext = trace.
213+
// }
214+
return newTracesRequest(traces, []trace.SpanContext{spancontext}), nil
217215
}
218216
}
219217

exporter/exporterhelper/traces_batch.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,16 @@ func (req *tracesRequest) mergeTo(dst *tracesRequest, sz sizer.TracesSizer) {
4545
req.setCachedSize(0)
4646
}
4747
req.td.ResourceSpans().MoveAndAppendTo(dst.td.ResourceSpans())
48-
// Merge the links from both requests
49-
dst.links = append(dst.links, req.links...)
48+
// Merge the spancontexts from both requests
49+
dst.spancontexts = append(dst.spancontexts, req.spancontexts...)
5050
}
5151

5252
func (req *tracesRequest) split(maxSize int, sz sizer.TracesSizer) []Request {
5353
var res []Request
5454
for req.size(sz) > maxSize {
5555
td, rmSize := extractTraces(req.td, maxSize, sz)
5656
req.setCachedSize(req.size(sz) - rmSize)
57-
res = append(res, newTracesRequest(td, req.Links()))
57+
res = append(res, newTracesRequest(td, req.SpanContexts()))
5858
}
5959
res = append(res, req)
6060
return res

exporter/exporterhelper/traces_batch_test.go

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
12-
"go.opentelemetry.io/otel/attribute"
1312
"go.opentelemetry.io/otel/trace"
1413

1514
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
@@ -338,28 +337,18 @@ func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) {
338337
}
339338
}
340339

341-
func TestMergeSplitTracesWithLinks(t *testing.T) {
342-
// Create test links
343-
link1 := trace.Link{
344-
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
345-
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
346-
SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
347-
TraceFlags: trace.FlagsSampled,
348-
}),
349-
Attributes: []attribute.KeyValue{
350-
attribute.String("key1", "value1"),
351-
},
352-
}
353-
link2 := trace.Link{
354-
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
355-
TraceID: [16]byte{16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1},
356-
SpanID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1},
357-
TraceFlags: trace.FlagsSampled,
358-
}),
359-
Attributes: []attribute.KeyValue{
360-
attribute.String("key2", "value2"),
361-
},
362-
}
340+
func TestMergeSplitTracesWithSpanContexts(t *testing.T) {
341+
// Create test spancontexts
342+
sc1 := trace.NewSpanContext(trace.SpanContextConfig{
343+
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
344+
SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
345+
TraceFlags: trace.FlagsSampled,
346+
})
347+
sc2 := trace.NewSpanContext(trace.SpanContextConfig{
348+
TraceID: [16]byte{16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1},
349+
SpanID: [8]byte{8, 7, 6, 5, 4, 3, 2, 1},
350+
TraceFlags: trace.FlagsSampled,
351+
})
363352

364353
tests := []struct {
365354
name string
@@ -370,38 +359,38 @@ func TestMergeSplitTracesWithLinks(t *testing.T) {
370359
expected []Request
371360
}{
372361
{
373-
name: "merge_with_links",
362+
name: "merge_with_span_contexts",
374363
szt: RequestSizerTypeItems,
375364
maxSize: 10,
376-
tr1: newTracesRequest(testdata.GenerateTraces(2), []trace.Link{link1}),
377-
tr2: newTracesRequest(testdata.GenerateTraces(3), []trace.Link{link2}),
365+
tr1: newTracesRequest(testdata.GenerateTraces(2), []trace.SpanContext{sc1}),
366+
tr2: newTracesRequest(testdata.GenerateTraces(3), []trace.SpanContext{sc2}),
378367
expected: []Request{newTracesRequest(func() ptrace.Traces {
379368
td := testdata.GenerateTraces(2)
380369
testdata.GenerateTraces(3).ResourceSpans().MoveAndAppendTo(td.ResourceSpans())
381370
return td
382-
}(), []trace.Link{link1, link2})},
371+
}(), []trace.SpanContext{sc1, sc2})},
383372
},
384373
{
385-
name: "split_with_links",
374+
name: "split_with_span_contexts",
386375
szt: RequestSizerTypeItems,
387376
maxSize: 2,
388-
tr1: newTracesRequest(testdata.GenerateTraces(5), []trace.Link{link1, link2}),
377+
tr1: newTracesRequest(testdata.GenerateTraces(5), []trace.SpanContext{sc1, sc2}),
389378
tr2: nil,
390379
expected: []Request{
391-
newTracesRequest(testdata.GenerateTraces(2), []trace.Link{link1, link2}),
392-
newTracesRequest(testdata.GenerateTraces(2), []trace.Link{link1, link2}),
393-
newTracesRequest(testdata.GenerateTraces(1), []trace.Link{link1, link2}),
380+
newTracesRequest(testdata.GenerateTraces(2), []trace.SpanContext{sc1, sc2}),
381+
newTracesRequest(testdata.GenerateTraces(2), []trace.SpanContext{sc1, sc2}),
382+
newTracesRequest(testdata.GenerateTraces(1), []trace.SpanContext{sc1, sc2}),
394383
},
395384
},
396385
{
397-
name: "merge_and_split_with_links",
386+
name: "merge_and_split_with_span_contexts",
398387
szt: RequestSizerTypeItems,
399388
maxSize: 3,
400-
tr1: newTracesRequest(testdata.GenerateTraces(2), []trace.Link{link1}),
401-
tr2: newTracesRequest(testdata.GenerateTraces(4), []trace.Link{link2}),
389+
tr1: newTracesRequest(testdata.GenerateTraces(2), []trace.SpanContext{sc1}),
390+
tr2: newTracesRequest(testdata.GenerateTraces(4), []trace.SpanContext{sc2}),
402391
expected: []Request{
403-
newTracesRequest(testdata.GenerateTraces(3), []trace.Link{link1, link2}),
404-
newTracesRequest(testdata.GenerateTraces(3), []trace.Link{link1, link2}),
392+
newTracesRequest(testdata.GenerateTraces(3), []trace.SpanContext{sc1, sc2}),
393+
newTracesRequest(testdata.GenerateTraces(3), []trace.SpanContext{sc1, sc2}),
405394
},
406395
},
407396
}
@@ -414,27 +403,25 @@ func TestMergeSplitTracesWithLinks(t *testing.T) {
414403
for i := range res {
415404
tracesReq := res[i].(*tracesRequest)
416405
expectedReq := tt.expected[i].(*tracesRequest)
417-
assert.Len(t, tracesReq.links, len(expectedReq.links))
418-
assert.Equal(t, expectedReq.links, tracesReq.links)
406+
assert.Len(t, tracesReq.spancontexts, len(expectedReq.spancontexts))
407+
assert.Equal(t, expectedReq.spancontexts, tracesReq.spancontexts)
419408
}
420409
})
421410
}
422411
}
423412

424-
func TestTracesRequestLinksMarshaling(t *testing.T) {
413+
func TestTracesRequestSpanContextsMarshaling(t *testing.T) {
425414
traceState, err := trace.TraceState{}.Insert("key1", "value1")
426415
require.NoError(t, err)
427416

428-
link := trace.Link{
429-
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
430-
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
431-
SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
432-
TraceFlags: trace.FlagsSampled,
433-
TraceState: traceState,
434-
}),
435-
}
417+
sc := trace.NewSpanContext(trace.SpanContextConfig{
418+
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
419+
SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
420+
TraceFlags: trace.FlagsSampled,
421+
TraceState: traceState,
422+
})
436423

437-
originalReq := newTracesRequest(testdata.GenerateTraces(2), []trace.Link{link})
424+
originalReq := newTracesRequest(testdata.GenerateTraces(2), []trace.SpanContext{sc})
438425

439426
// Marshal the request
440427
encoding := tracesEncoding{}
@@ -447,14 +434,14 @@ func TestTracesRequestLinksMarshaling(t *testing.T) {
447434

448435
// Verify the unmarshaled request
449436
tracesReq := newReq.(*tracesRequest)
450-
require.Len(t, tracesReq.links, 1)
437+
require.Len(t, tracesReq.spancontexts, 1)
451438

452-
// Verify link properties
453-
unmarshaledLink := tracesReq.links[0]
454-
assert.Equal(t, link.SpanContext.TraceID(), unmarshaledLink.SpanContext.TraceID())
455-
assert.Equal(t, link.SpanContext.SpanID(), unmarshaledLink.SpanContext.SpanID())
456-
assert.Equal(t, link.SpanContext.TraceFlags(), unmarshaledLink.SpanContext.TraceFlags())
457-
assert.Equal(t, link.SpanContext.TraceState(), unmarshaledLink.SpanContext.TraceState())
439+
// Verify spancontext properties
440+
unmarshaledSpanContext := tracesReq.spancontexts[0]
441+
assert.Equal(t, sc.TraceID(), unmarshaledSpanContext.TraceID())
442+
assert.Equal(t, sc.SpanID(), unmarshaledSpanContext.SpanID())
443+
assert.Equal(t, sc.TraceFlags(), unmarshaledSpanContext.TraceFlags())
444+
assert.Equal(t, sc.TraceState(), unmarshaledSpanContext.TraceState())
458445

459446
// Verify trace data
460447
assert.Equal(t, originalReq.(*tracesRequest).td, tracesReq.td)

0 commit comments

Comments
 (0)