-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathAutoDisposeSingleThreadScheduler.java
More file actions
140 lines (121 loc) · 4.9 KB
/
AutoDisposeSingleThreadScheduler.java
File metadata and controls
140 lines (121 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
* Copyright 2022 Jim Carroll
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dempsy.util.executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* This is a scheduler that will schedule a task to run in the future (or now) and when that task completes it shuts itself down. It can be thought of almost as
* a daemon task scheduler.
*/
public final class AutoDisposeSingleThreadScheduler {
/**
* The daemon status of the scheduled thread is defaulted to this value.
*/
public final static boolean DEFAULT_DAEMON_STATUS = false;
private final String baseThreadName;
private long pendingCalls = 0L;
private static final AtomicLong sequence = new AtomicLong(0);
private ScheduledExecutorService scheduler = null;
private final boolean daemon;
public AutoDisposeSingleThreadScheduler(final String baseThreadName) {
this(baseThreadName, DEFAULT_DAEMON_STATUS);
}
public AutoDisposeSingleThreadScheduler(final String baseThreadName, final boolean daemon) {
this.baseThreadName = baseThreadName;
this.daemon = daemon;
}
/**
* This object is returned from {@link AutoDisposeSingleThreadScheduler#schedule(Runnable, long, TimeUnit)} and can be used to cancel the task - best
* effort.
*/
public class Cancelable implements Runnable {
private boolean cancelled = false;
private final ScheduledFuture<?> future;
private final Runnable proxied;
// called only with a lock on the outer instance.
private Cancelable(final Runnable runnable, final long timeout, final TimeUnit units) {
this.proxied = runnable;
this.future = getScheduledExecutor().schedule(this, timeout, units);
pendingCalls++;
}
/**
* Attempt to cancel the Runnable that was submitted to {@link AutoDisposeSingleThreadScheduler#schedule(Runnable, long, TimeUnit)}
*/
public void cancel() {
synchronized(AutoDisposeSingleThreadScheduler.this) {
future.cancel(false);
cancelled = true;
decrement();
}
}
@Override
public void run() {
// running the proxied can resubmit the task ... so we dispose afterward
synchronized(AutoDisposeSingleThreadScheduler.this) {
if(cancelled)
return;
}
try {
proxied.run();
} finally {
synchronized(AutoDisposeSingleThreadScheduler.this) {
if(!cancelled) // if it was cancelled then it was already decremented
decrement();
}
}
}
/**
* Is the Runnable that was submitted to {@link AutoDisposeSingleThreadScheduler#schedule(Runnable, long, TimeUnit)} comleted?
*/
public boolean isDone() {
return future.isDone();
}
private void decrement() {
synchronized(AutoDisposeSingleThreadScheduler.this) {
pendingCalls--;
if(pendingCalls <= 0)
disposeOfScheduler();
}
}
}
/**
* Schedule the given Runnable to run at the given time period from now. The scheduler resources will be cleaned up once the task runs.
*/
public synchronized Cancelable schedule(final Runnable runnable, final long timeout, final TimeUnit units) {
return new Cancelable(runnable, timeout, units);
}
private synchronized final ScheduledExecutorService getScheduledExecutor() {
if(scheduler == null) {
if(baseThreadName != null || daemon) {
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, baseThreadName + "-" + sequence.getAndIncrement());
thread.setDaemon(daemon);
return thread;
});
} else
scheduler = Executors.newSingleThreadScheduledExecutor();
}
return scheduler;
}
private synchronized final void disposeOfScheduler() {
if(scheduler != null)
scheduler.shutdown();
scheduler = null;
}
}