Skip to content

Commit 4755217

Browse files
authored
Actually flush in ExecutorServiceSpanProcessor (#3806)
* Actually flush in ExecutorServiceSpanProcessor * Fix mock reset * Remove drift comment
1 parent 8f081ba commit 4755217

2 files changed

Lines changed: 14 additions & 6 deletions

File tree

sdk-extensions/tracing-incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/trace/ExecutorServiceSpanProcessor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ List<SpanData> getBatch() {
145145

146146
private static class Worker implements Runnable {
147147

148+
private final AtomicBoolean running = new AtomicBoolean();
148149
private final AtomicLong nextExportTime = new AtomicLong();
149150
private final ArrayBlockingQueue<SpanData> batch;
150151
private final AtomicBoolean isShutdown;
@@ -211,7 +212,9 @@ private void updateNextExportTime() {
211212

212213
@Override
213214
public void run() {
214-
// nextExportTime is set for the first time in the constructor
215+
if (!running.compareAndSet(false, true)) {
216+
return;
217+
}
215218

216219
boolean continueWork = true;
217220
while (continueWork && !isShutdown.get()) {
@@ -238,6 +241,7 @@ public void run() {
238241
if (flushRequested.get() != null) {
239242
workerExporter.flush(batch, queue);
240243
}
244+
running.set(false);
241245
}
242246

243247
private void scheduleNextRun() {
@@ -275,7 +279,10 @@ public void addSpan(ReadableSpan span) {
275279
public CompletableResultCode forceFlush() {
276280
CompletableResultCode flushResult = new CompletableResultCode();
277281
// we set the atomic here to trigger the worker loop to do a flush on its next iteration.
278-
flushRequested.compareAndSet(null, flushResult);
282+
if (flushRequested.compareAndSet(null, flushResult)) {
283+
executorService.execute(this);
284+
return flushResult;
285+
}
279286
CompletableResultCode possibleResult = flushRequested.get();
280287
// there's a race here where the flush happening in the worker loop could complete before we
281288
// get what's in the atomic. In that case, just return success, since we know it succeeded in

sdk-extensions/tracing-incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/trace/ExecutorServiceSpanProcessorTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ public void continuesIfExporterTimesOut() throws InterruptedException {
396396
// Still processing new spans.
397397
CountDownLatch exportedAgain = new CountDownLatch(1);
398398
reset(mockSpanExporter);
399+
when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
399400
when(mockSpanExporter.export(
400401
argThat(
401402
spans -> {
@@ -472,27 +473,27 @@ void exportNotSampledSpans_recordOnly() {
472473
}
473474

474475
@Test
475-
@Timeout(10)
476476
void shutdownFlushes() {
477477
WaitingSpanExporter waitingSpanExporter =
478478
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess());
479479
// Set the export delay to large value, in order to confirm the #flush() below works
480480

481-
sdkTracerProvider =
481+
SdkTracerProvider sdkTracerProvider =
482482
SdkTracerProvider.builder()
483483
.addSpanProcessor(
484484
ExecutorServiceSpanProcessor.builder(waitingSpanExporter, executor, false)
485485
.setScheduleDelay(10, TimeUnit.SECONDS)
486486
.build())
487487
.build();
488488

489-
ReadableSpan span2 = createEndedSpan(SPAN_NAME_2);
489+
Span span2 = sdkTracerProvider.get("test").spanBuilder(SPAN_NAME_2).startSpan();
490+
span2.end();
490491

491492
// Force a shutdown, which forces processing of all remaining spans.
492493
sdkTracerProvider.shutdown().join(10, TimeUnit.SECONDS);
493494

494495
List<SpanData> exported = waitingSpanExporter.getExported();
495-
assertThat(exported).containsExactly(span2.toSpanData());
496+
assertThat(exported).containsExactly(((ReadableSpan) span2).toSpanData());
496497
assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue();
497498
}
498499

0 commit comments

Comments
 (0)