|
| 1 | +// Copyright The OpenTelemetry Authors |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +package queuebatch |
| 5 | + |
| 6 | +import ( |
| 7 | + "context" |
| 8 | + "encoding/json" |
| 9 | + "testing" |
| 10 | + |
| 11 | + "go.opentelemetry.io/otel/trace" |
| 12 | + |
| 13 | + "go.opentelemetry.io/collector/component" |
| 14 | + "go.opentelemetry.io/collector/component/componenttest" |
| 15 | + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" |
| 16 | + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest" |
| 17 | + "go.opentelemetry.io/collector/exporter/exportertest" |
| 18 | + "go.opentelemetry.io/collector/pipeline" |
| 19 | +) |
| 20 | + |
// requestTypeKeyType is an unexported context-key type so values stored by
// this package cannot collide with context keys from other packages.
type requestTypeKeyType struct{}

// requestTypeKey is the single context key used to tag benchmark contexts
// with one of the scenario labels below.
var requestTypeKey = requestTypeKeyType{}

// Scenario labels stored under requestTypeKey to distinguish how the request
// and its context are propagated through the persistent queue.
const (
	originalRequestValue           = "original_request"
	contextRequestValue            = "context_request"
	contextAndKeyValueRequestValue = "context_and_key_value_request" // not currently exercised by the benchmark's contextValues slice
)
| 30 | + |
// largeRequest is a deliberately nested fixture type used to benchmark
// persistent-queue (de)serialization of large (~20 KB payload) requests.
// Field names and order are significant: they determine the JSON encoding
// produced by largeRequestEncoding.
type largeRequest struct {
	ID   int        // sequence number; used to verify FIFO order on read
	Meta MetaInfo   // small metadata alongside the payload
	Body BodyLevel1 // nesting starts here, three levels deep
}

// MetaInfo carries small string metadata for a largeRequest.
type MetaInfo struct {
	Source string
	Tags   []string
}

// BodyLevel1 is the first nesting level of the request body.
type BodyLevel1 struct {
	Level2 BodyLevel2
	Extra  string
}

// BodyLevel2 is the second nesting level of the request body.
type BodyLevel2 struct {
	Level3 BodyLevel3
	Values []int
}

// BodyLevel3 is the innermost nesting level and holds the bulk payload.
type BodyLevel3 struct {
	Payload []byte // the large (dataSize-byte) buffer
	Note    string
}
| 56 | + |
| 57 | +type largeRequestEncoding struct{} |
| 58 | + |
| 59 | +func (largeRequestEncoding) Marshal(val largeRequest) ([]byte, error) { |
| 60 | + return json.Marshal(val) |
| 61 | +} |
| 62 | + |
| 63 | +func (largeRequestEncoding) Unmarshal(buf []byte) (largeRequest, error) { |
| 64 | + var req largeRequest |
| 65 | + if err := json.Unmarshal(buf, &req); err != nil { |
| 66 | + return largeRequest{}, err |
| 67 | + } |
| 68 | + return req, nil |
| 69 | +} |
| 70 | + |
| 71 | +func BenchmarkPersistentQueue_LargeRequests(b *testing.B) { |
| 72 | + const ( |
| 73 | + numRequests = 3000 |
| 74 | + dataSize = 20 * 1024 // 20 KB |
| 75 | + ) |
| 76 | + // Prepare large requests |
| 77 | + requests := make([]largeRequest, numRequests) |
| 78 | + for i := range requests { |
| 79 | + requests[i] = largeRequest{ |
| 80 | + ID: i, |
| 81 | + Meta: MetaInfo{ |
| 82 | + Source: "benchmark", |
| 83 | + Tags: []string{"tag1", "tag2", "tag3"}, |
| 84 | + }, |
| 85 | + Body: BodyLevel1{ |
| 86 | + Level2: BodyLevel2{ |
| 87 | + Level3: BodyLevel3{ |
| 88 | + Payload: make([]byte, dataSize), |
| 89 | + Note: "deep payload", |
| 90 | + }, |
| 91 | + Values: []int{i, i + 1, i + 2}, |
| 92 | + }, |
| 93 | + Extra: "extra info", |
| 94 | + }, |
| 95 | + } |
| 96 | + for j := range requests[i].Body.Level2.Level3.Payload { |
| 97 | + requests[i].Body.Level2.Level3.Payload[j] = byte((i + j) % 256) |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + // Use a persistent queue with large capacity |
| 102 | + pq := newPersistentQueue[largeRequest](persistentQueueSettings[largeRequest]{ |
| 103 | + sizer: request.RequestsSizer[largeRequest]{}, |
| 104 | + capacity: numRequests, |
| 105 | + signal: pipeline.SignalTraces, |
| 106 | + storageID: component.ID{}, |
| 107 | + encoding: largeRequestEncoding{}, |
| 108 | + id: component.NewID(exportertest.NopType), |
| 109 | + telemetry: componenttest.NewNopTelemetrySettings(), |
| 110 | + }).(*persistentQueue[largeRequest]) |
| 111 | + |
| 112 | + ext := storagetest.NewMockStorageExtension(nil) |
| 113 | + client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String()) |
| 114 | + if err != nil { |
| 115 | + b.Fatalf("failed to get storage client: %v", err) |
| 116 | + } |
| 117 | + |
| 118 | + contextValues := []string{originalRequestValue, contextRequestValue} // , contextAndKeyValueRequestValue} |
| 119 | + for _, value := range contextValues { |
| 120 | + contextType := context.WithValue(context.Background(), requestTypeKey, value) |
| 121 | + sc := trace.NewSpanContext(trace.SpanContextConfig{ |
| 122 | + TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10}, |
| 123 | + SpanID: trace.SpanID{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, |
| 124 | + TraceFlags: trace.TraceFlags(0x1), |
| 125 | + TraceState: trace.TraceState{}, |
| 126 | + Remote: false, |
| 127 | + }) |
| 128 | + sharedContext := trace.ContextWithSpanContext(contextType, sc) |
| 129 | + pq.initClient(contextType, client) |
| 130 | + |
| 131 | + b.ResetTimer() |
| 132 | + b.ReportAllocs() |
| 133 | + |
| 134 | + // Offer all requests |
| 135 | + for i := 0; i < numRequests; i++ { |
| 136 | + err := pq.Offer(sharedContext, requests[i]) |
| 137 | + if err != nil { |
| 138 | + b.Fatalf("Offer failed at %d: %v", i, err) |
| 139 | + } |
| 140 | + } |
| 141 | + |
| 142 | + // Read and OnDone all requests |
| 143 | + for i := 0; i < numRequests; i++ { |
| 144 | + _, req, done, ok := pq.Read(sharedContext) |
| 145 | + if !ok { |
| 146 | + b.Fatalf("Read failed at %d", i) |
| 147 | + } |
| 148 | + if req.ID != i { |
| 149 | + b.Fatalf("Request ID mismatch at %d: got %d", i, req.ID) |
| 150 | + } |
| 151 | + done.OnDone(nil) |
| 152 | + } |
| 153 | + |
| 154 | + if pq.Size() != 0 { |
| 155 | + b.Fatalf("Queue not empty after all operations: size=%d", pq.Size()) |
| 156 | + } |
| 157 | + } |
| 158 | +} |
0 commit comments