Skip to content

Commit bf090bf

Browse files
committed
[chore] Extract queue logic into a separate internal package
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent d800ad3 commit bf090bf

17 files changed

Lines changed: 91 additions & 51 deletions

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
1011
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1213
)
1314

1415
// Batcher is in charge of reading items from the queue and send them out asynchronously.
1516
type Batcher[T any] interface {
1617
component.Component
17-
Consume(context.Context, T, Done)
18+
Consume(context.Context, T, queue.Done)
1819
}
1920

2021
type batcherSettings[T any] struct {

exporter/exporterhelper/internal/queuebatch/disabled_batcher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
1011
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1112
)
1213

@@ -18,7 +19,7 @@ type disabledBatcher[T any] struct {
1819
consumeFunc sender.SendFunc[T]
1920
}
2021

21-
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done Done) {
22+
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done queue.Done) {
2223
done.OnDone(db.consumeFunc(ctx, req))
2324
}
2425

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/stretchr/testify/require"
1414

1515
"go.opentelemetry.io/collector/component/componenttest"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
1617
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
1819
)
@@ -36,12 +37,12 @@ func TestDisabledBatcher(t *testing.T) {
3637
sink := requesttest.NewSink()
3738
ba := newDisabledBatcher(sink.Export)
3839

39-
mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
40-
sizer: request.RequestsSizer[request.Request]{},
41-
capacity: 1000,
42-
blockOnOverflow: true,
43-
})
44-
q := newAsyncQueue(mq, tt.maxWorkers, ba.Consume)
40+
q := queue.NewQueue[request.Request](queue.Settings[request.Request]{
41+
Sizer: request.RequestsSizer[request.Request]{},
42+
Capacity: 1000,
43+
BlockOnOverflow: true,
44+
NumConsumers: tt.maxWorkers,
45+
}, ba.Consume)
4546

4647
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
4748
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/async_queue.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/async_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/async_queue_test.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/async_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch
4+
package queue
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/cond.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/cond.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/memory_queue.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/memory_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/memory_queue_test.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/memory_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch
4+
package queue
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/persistent_queue.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/persistent_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/queue"
55

66
import (
77
"context"

exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go renamed to exporter/exporterhelper/internal/queuebatch/internal/queue/persistent_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch
4+
package queue
55

66
import (
77
"context"

0 commit comments

Comments
 (0)