Skip to content

Commit 4308f23

Browse files
committed
More robust inbox
1 parent dc79633 commit 4308f23

1 file changed

Lines changed: 54 additions & 1 deletion

File tree

  • src/main/java/nl/knaw/dans/lib/util/inbox

src/main/java/nl/knaw/dans/lib/util/inbox/Inbox.java

Lines changed: 54 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,8 @@
3838
import java.util.concurrent.CountDownLatch;
3939
import java.util.concurrent.ExecutorService;
4040
import java.util.concurrent.Executors;
41+
import java.util.concurrent.ThreadFactory;
42+
import java.util.concurrent.atomic.AtomicInteger;
4143

4244
/**
4345
* <p>
@@ -204,6 +206,57 @@ private void startFileAlterationMonitor() throws Exception {
204206

205207
observer.addListener(this);
206208
monitor.addObserver(observer);
209+
210+
// Set a ThreadFactory that auto-restarts on RuntimeException (but not on Error)
211+
monitor.setThreadFactory(new AutoRestartingThreadFactory("InboxMonitor"));
207212
monitor.start();
208213
}
209-
}
214+
215+
/**
216+
* ThreadFactory that creates threads which restart the delegate Runnable when it crashes with a RuntimeException.
217+
* Errors (like OutOfMemoryError) are not caught to avoid masking fatal problems.
218+
*/
219+
static final class AutoRestartingThreadFactory implements ThreadFactory {
220+
private final String namePrefix;
221+
private final AtomicInteger threadCount = new AtomicInteger(1);
222+
private final long resetBackoffAfterMillis = 3_600_000; // 1 hour
223+
224+
AutoRestartingThreadFactory(final String namePrefix) {
225+
this.namePrefix = namePrefix;
226+
}
227+
228+
@Override
229+
public Thread newThread(final Runnable r) {
230+
var t = new Thread(() -> {
231+
long backoffMillis = 500;
232+
final long maxBackoffMillis = 30_000;
233+
while (true) {
234+
var runStart = System.currentTimeMillis();
235+
try {
236+
r.run();
237+
// If run() returned normally, exit.
238+
return;
239+
} catch (RuntimeException ex) {
240+
var runDuration = System.currentTimeMillis() - runStart;
241+
// If runnable ran stably for at least resetBackoffAfterMillis, reset backoff
242+
if (runDuration >= resetBackoffAfterMillis) {
243+
backoffMillis = 500;
244+
}
245+
log.error("Thread '{}' crashed with RuntimeException after {} ms: {}. Restarting thread.", Thread.currentThread().getName(), runDuration, ex.getMessage());
246+
log.debug("Stack trace for RuntimeException that caused thread '{}' to crash", Thread.currentThread().getName(), ex);
247+
try {
248+
Thread.sleep(backoffMillis);
249+
} catch (InterruptedException ie) {
250+
Thread.currentThread().interrupt();
251+
return;
252+
}
253+
backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
254+
// Loop to restart.
255+
}
256+
}
257+
}, namePrefix + "-" + threadCount.getAndIncrement());
258+
t.setDaemon(true);
259+
return t;
260+
}
261+
}
262+
}

0 commit comments

Comments
 (0)