Skip to content

Commit 3114583

Browse files
authored
DD-2237 Made the PollingTaskExecutor more generic. Now, a task can be represe… (#33)
Changes to support DD-2237. Introduced a TaskScheduler interface that lets clients configure how tasks picked up by PollingTaskExecutor and created by TaskFactory are then scheduled (e.g., immediately executed or submitted to an executorservice). Made sure the nextInput is called in its own db-transaction, so that a status update to the underlying record is not lost if the task execution fails.
1 parent 3979755 commit 3114583

8 files changed

Lines changed: 120 additions & 26 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
<relativePath />
2626
</parent>
2727
<artifactId>dans-java-utils</artifactId>
28-
<version>2.11.2-SNAPSHOT</version>
28+
<version>3.0.0-SNAPSHOT</version>
2929
<name>DANS Java Utility Classes</name>
3030
<inceptionYear>2021</inceptionYear>
3131
<scm>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2021 DANS - Data Archiving and Networked Services (info@dans.knaw.nl)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package nl.knaw.dans.lib.util.pollingtaskexec;
17+
18+
import lombok.RequiredArgsConstructor;
19+
20+
import java.util.concurrent.ExecutorService;
21+
22+
/**
23+
* A task scheduler implementation that schedules tasks for execution using an {@link ExecutorService}. This class allows for tasks to be executed asynchronously using the provided executor service.
24+
*
25+
*/
26+
@RequiredArgsConstructor
27+
public class ExecutorServiceTaskScheduler implements TaskScheduler {
28+
private final ExecutorService executorService;
29+
30+
@Override
31+
public void schedule(Runnable task) {
32+
executorService.execute(task);
33+
}
34+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (C) 2021 DANS - Data Archiving and Networked Services (info@dans.knaw.nl)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package nl.knaw.dans.lib.util.pollingtaskexec;
17+
18+
public class ImmediateTaskScheduler implements TaskScheduler {
19+
@Override
20+
public void schedule(Runnable task) {
21+
task.run();
22+
}
23+
}

src/main/java/nl/knaw/dans/lib/util/pollingtaskexec/PollingTaskExecutor.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,21 @@
3636
@RequiredArgsConstructor
3737
public class PollingTaskExecutor<R> implements Managed {
3838
private final String name;
39-
private final ScheduledExecutorService scheduler;
39+
private final ScheduledExecutorService scheduledExecutorService;
4040
private final Duration pollingInterval;
4141
private final TaskSource<R> taskSource;
4242
private final TaskFactory<R> taskFactory;
43+
private final TaskScheduler taskScheduler;
4344

4445
private ScheduledFuture<?> future;
4546

47+
public PollingTaskExecutor(String name, ScheduledExecutorService scheduledExecutorService, Duration pollingInterval, TaskSource<R> taskSource, TaskFactory<R> taskFactory) {
48+
this(name, scheduledExecutorService, pollingInterval, taskSource, taskFactory, new ImmediateTaskScheduler());
49+
}
50+
4651
/**
4752
* Copy constructor. The source executor must not be running. The purpose of this constructor is only to be able to wrap a PollingTaskExecutor in a UnitOfWorkAwareProxy. In general, no copies
48-
* should be created of a PollingTaskExecutor, and in particular should the schedular not be shared among PollingTaskExecutors.
53+
* should be created of a PollingTaskExecutor, and in particular should the scheduler not be shared among PollingTaskExecutors.
4954
*
5055
* @param other the source executor
5156
*/
@@ -55,16 +60,17 @@ public PollingTaskExecutor(PollingTaskExecutor<R> other) {
5560
throw new IllegalArgumentException("Cannot copy a running executor");
5661
}
5762
this.name = other.name;
58-
this.scheduler = other.scheduler;
63+
this.scheduledExecutorService = other.scheduledExecutorService;
5964
this.pollingInterval = other.pollingInterval;
6065
this.taskSource = other.taskSource;
6166
this.taskFactory = other.taskFactory;
67+
this.taskScheduler = other.taskScheduler;
6268
}
6369

6470
@Override
6571
public void start() {
6672
long delayMs = Math.max(1L, pollingInterval.toMillis());
67-
future = scheduler.scheduleWithFixedDelay(this::tick, 0, delayMs, TimeUnit.MILLISECONDS);
73+
future = scheduledExecutorService.scheduleWithFixedDelay(this::tick, 0, delayMs, TimeUnit.MILLISECONDS);
6874
log.info("{} started; polling every {}", name, pollingInterval);
6975
}
7076

@@ -73,24 +79,28 @@ public void stop() {
7379
if (future != null) {
7480
future.cancel(false);
7581
}
76-
scheduler.shutdown();
82+
scheduledExecutorService.shutdown();
7783
log.info("{} stopped", name);
7884
}
7985

80-
@UnitOfWork
8186
public void tick() {
8287
try {
83-
Optional<R> next = taskSource.nextTask();
84-
if (next.isEmpty()) {
88+
Optional<R> input = getNextInput();
89+
if (input.isEmpty()) {
8590
return;
8691
}
87-
R record = next.get();
88-
log.debug("{}: found next task record: {}", name, record);
89-
Runnable task = taskFactory.create(record);
90-
task.run();
92+
log.debug("{}: found next task input: {}", name, input.get());
93+
Runnable task = taskFactory.create(input.get());
94+
taskScheduler.schedule(task);
9195
}
9296
catch (Exception e) {
9397
log.error("{}: error while polling or running task", name, e);
9498
}
9599
}
100+
101+
// Must be protected for UnitOfWork to function.
102+
@UnitOfWork
103+
protected Optional<R> getNextInput() {
104+
return taskSource.nextInput();
105+
}
96106
}

src/main/java/nl/knaw/dans/lib/util/pollingtaskexec/TaskFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package nl.knaw.dans.lib.util.pollingtaskexec;
1717

1818
/**
19-
* Factory interface for creating {@link Runnable} tasks from a given record.
19+
* Factory interface for creating {@link Runnable} tasks from a record.
2020
* This interface allows the decoupling of task creation logic from task execution logic.
2121
*
2222
* @param <R> the type of the record used to create a {@link Runnable} task
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (C) 2021 DANS - Data Archiving and Networked Services (info@dans.knaw.nl)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package nl.knaw.dans.lib.util.pollingtaskexec;
17+
18+
public interface TaskScheduler {
19+
20+
/**
21+
* Schedules the task for execution.
22+
*
23+
* @param task the task to schedule
24+
*/
25+
void schedule(Runnable task);
26+
}

src/main/java/nl/knaw/dans/lib/util/pollingtaskexec/TaskSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
import java.util.Optional;
1919

2020
/**
21-
* Represents a source of tasks from which tasks can be fetched one at a time. Implementations of this interface are responsible for providing the logic to retrieve the next available task or
22-
* returning an empty result if no tasks are available.
21+
* Represents a source of tasks from which tasks can be fetched. Implementations of this interface are responsible for providing the logic to retrieve the next available input that represents one
22+
* task to be executed, returning an empty Optional if no tasks are available. When invoked by {@code PollingTaskExecutor}, this method is run within a {@code @UnitOfWork} so that any updates it
23+
* makes to the database are visible to the resulting tasks.
2324
*
24-
* @param <R> the type of the tasks managed by this source
25+
* @param <R> the type of the input used to create or schedule a task
2526
*/
2627
public interface TaskSource<R> {
27-
Optional<R> nextTask();
28+
Optional<R> nextInput();
2829
}

src/test/java/nl/knaw/dans/lib/util/pollingtaskexec/PollingTaskExecutorTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,34 +82,34 @@ void tick_should_poll_source_create_task_and_run_it() {
8282
String record = "test-record";
8383
Runnable task = mock(Runnable.class);
8484

85-
when(taskSource.nextTask()).thenReturn(Optional.of(record));
85+
when(taskSource.nextInput()).thenReturn(Optional.of(record));
8686
when(taskFactory.create(record)).thenReturn(task);
8787

8888
executor.tick();
8989

90-
verify(taskSource).nextTask();
90+
verify(taskSource).nextInput();
9191
verify(taskFactory).create(record);
9292
verify(task).run();
9393
}
9494

9595
@Test
9696
void tick_should_do_nothing_if_source_is_empty() {
97-
when(taskSource.nextTask()).thenReturn(Optional.empty());
97+
when(taskSource.nextInput()).thenReturn(Optional.empty());
9898

9999
executor.tick();
100100

101-
verify(taskSource).nextTask();
101+
verify(taskSource).nextInput();
102102
verify(taskFactory, never()).create(any());
103103
}
104104

105105
@Test
106106
void tick_should_catch_and_log_exceptions() {
107-
when(taskSource.nextTask()).thenThrow(new RuntimeException("test exception"));
107+
when(taskSource.nextInput()).thenThrow(new RuntimeException("test exception"));
108108

109109
// Should not throw exception
110110
executor.tick();
111111

112-
verify(taskSource).nextTask();
112+
verify(taskSource).nextInput();
113113
}
114114

115115
@Test
@@ -119,12 +119,12 @@ void copy_constructor_should_copy_fields() {
119119
// Verify it works as expected by running a tick on the copy
120120
String record = "test-record";
121121
Runnable task = mock(Runnable.class);
122-
when(taskSource.nextTask()).thenReturn(Optional.of(record));
122+
when(taskSource.nextInput()).thenReturn(Optional.of(record));
123123
when(taskFactory.create(record)).thenReturn(task);
124124

125125
copy.tick();
126126

127-
verify(taskSource).nextTask();
127+
verify(taskSource).nextInput();
128128
verify(taskFactory).create(record);
129129
verify(task).run();
130130
}

0 commit comments

Comments
 (0)