Skip to content

Commit c7a92ab

Browse files
committed
OTEL-2540 add marshalRequestWithSpanContext and unmarshalRequestWithSpanContext
1 parent a260a05 commit c7a92ab

2 files changed

Lines changed: 53 additions & 35 deletions

File tree

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
245245

246246
type marshaledRequestWithSpanContext struct {
247247
RequestBytes []byte `json:"request"`
248-
SpanContextJSON json.RawMessage `json:"span_context"`
248+
SpanContextJSON json.RawMessage `json:"span_context,omitempty"`
249249
}
250250

251251
type spanContextConfigWrapper struct {
@@ -293,6 +293,53 @@ func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContex
293293
return &sc, nil
294294
}
295295

296+
// unmarshalRequestWithSpanContext unmarshals a marshaledRequestWithSpanContext from bytes, returning the request
297+
// and a context with the restored SpanContext (if present).
298+
func unmarshalRequestWithSpanContext[T any](encoding Encoding[T], value []byte) (T, context.Context, error) {
299+
var req T
300+
restoredContext := context.Background()
301+
var envelope marshaledRequestWithSpanContext
302+
if err := json.Unmarshal(value, &envelope); err != nil {
303+
return req, restoredContext, err
304+
}
305+
request, err := encoding.Unmarshal(envelope.RequestBytes)
306+
if err != nil {
307+
return req, restoredContext, err
308+
}
309+
if len(envelope.SpanContextJSON) > 0 {
310+
var wrapper spanContextConfigWrapper
311+
if err := json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil {
312+
if sc, err := SpanContextFromWrapper(wrapper); err == nil && sc != nil {
313+
restoredContext = trace.ContextWithSpanContext(restoredContext, *sc)
314+
}
315+
}
316+
}
317+
return request, restoredContext, nil
318+
}
319+
320+
// marshalRequestWithSpanContext marshals the request and the SpanContext from ctx into a marshaledRequestWithSpanContext envelope as bytes.
321+
func marshalRequestWithSpanContext[T any](ctx context.Context, encoding Encoding[T], req T) ([]byte, error) {
322+
reqBuf, err := encoding.Marshal(req)
323+
if err != nil {
324+
return nil, err
325+
}
326+
sc := trace.SpanContextFromContext(ctx)
327+
var scJSON []byte
328+
if sc.IsValid() {
329+
scJSON, err = json.Marshal(sc)
330+
if err != nil {
331+
return nil, err
332+
}
333+
} else {
334+
scJSON = nil // Will be omitted due to omitempty
335+
}
336+
envelope := marshaledRequestWithSpanContext{
337+
RequestBytes: reqBuf,
338+
SpanContextJSON: scJSON,
339+
}
340+
return json.Marshal(envelope)
341+
}
342+
296343
// putInternal is the internal version that requires caller to hold the mutex lock.
297344
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
298345
reqSize := pq.set.sizer.Sizeof(req)
@@ -305,28 +352,14 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
305352
}
306353
}
307354

308-
reqBuf, err := pq.set.encoding.Marshal(req)
309-
if err != nil {
310-
return err
311-
}
312-
// Retrieve SpanContext object from provided context, and store alongside the request
313-
sc := trace.SpanContextFromContext(ctx)
314-
scJSON, err := json.Marshal(sc)
315-
if err != nil {
316-
return err
317-
}
318-
envelope := marshaledRequestWithSpanContext{
319-
RequestBytes: reqBuf,
320-
SpanContextJSON: scJSON,
321-
}
322-
envelopeBytes, err := json.Marshal(envelope)
355+
reqBuf, err := marshalRequestWithSpanContext(ctx, pq.set.encoding, req)
323356
if err != nil {
324357
return err
325358
}
326359
// Carry out a transaction where we both add the item and update the write index
327360
ops := []*storage.Operation{
328361
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)),
329-
storage.SetOperation(getItemKey(pq.writeIndex), envelopeBytes),
362+
storage.SetOperation(getItemKey(pq.writeIndex), reqBuf),
330363
}
331364
if err = pq.client.Batch(ctx, ops...); err != nil {
332365
return err
@@ -395,22 +428,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool,
395428
var request T
396429
restoredContext := context.Background()
397430
if err == nil {
398-
var envelope marshaledRequestWithSpanContext
399-
if err = json.Unmarshal(getOp.Value, &envelope); err == nil {
400-
// Unmarshal the request using the specified encoding
401-
if request, err = pq.set.encoding.Unmarshal(envelope.RequestBytes); err == nil {
402-
// Unmarshal the SpanContext from JSON
403-
var wrapper spanContextConfigWrapper
404-
if len(envelope.SpanContextJSON) > 0 {
405-
if err = json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil {
406-
var sc *trace.SpanContext
407-
if sc, err = SpanContextFromWrapper(wrapper); err == nil && sc != nil {
408-
restoredContext = trace.ContextWithSpanContext(restoredContext, *sc)
409-
}
410-
}
411-
}
412-
}
413-
}
431+
request, restoredContext, err = unmarshalRequestWithSpanContext(pq.set.encoding, getOp.Value)
414432
}
415433

416434
if err != nil {
@@ -519,7 +537,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
519537
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
520538
continue
521539
}
522-
req, err := pq.set.encoding.Unmarshal(op.Value)
540+
req, _, err := unmarshalRequestWithSpanContext(pq.set.encoding, op.Value)
523541
// If error happened or item is nil, it will be efficiently ignored
524542
if err != nil {
525543
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))

exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
914914
}
915915

916916
func TestPersistentQueue_StorageFull(t *testing.T) {
917-
marshaled, err := uint64Encoding{}.Marshal(uint64(50))
917+
marshaled, err := marshalRequestWithSpanContext(context.Background(), uint64Encoding{}, uint64(50))
918918
require.NoError(t, err)
919919
maxSizeInBytes := len(marshaled) * 5 // arbitrary small number
920920

0 commit comments

Comments
 (0)