@@ -50,8 +50,6 @@ public class DefaultFuture extends CompletableFuture<Object> {
5050
5151 private static final Map <Long , DefaultFuture > FUTURES = new ConcurrentHashMap <>();
5252
53- // private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();
54-
5553 public static final Timer TIME_OUT_TIMER = new HashedWheelTimer (
5654 new NamedThreadFactory ("dubbo-future-timeout" , true ),
5755 30 ,
@@ -64,6 +62,7 @@ public class DefaultFuture extends CompletableFuture<Object> {
6462 private final int timeout ;
6563 private final long start = System .currentTimeMillis ();
6664 private volatile long sent ;
65+ private Timeout timeoutCheckTask ;
6766
6867 private ExecutorService executor ;
6968
@@ -85,6 +84,14 @@ private DefaultFuture(Channel channel, Request request, int timeout) {
8584 CHANNELS .put (id , channel );
8685 }
8786
87+ /**
88+ * check time out of the future
89+ */
90+ private static void timeoutCheck (DefaultFuture future ) {
91+ TimeoutCheckTask task = new TimeoutCheckTask (future .getId ());
92+ future .timeoutCheckTask = TIME_OUT_TIMER .newTimeout (task , future .getTimeout (), TimeUnit .MILLISECONDS );
93+ }
94+
8895 /**
8996 * init a DefaultFuture
9097 * 1.init a DefaultFuture
@@ -142,15 +149,19 @@ public static void closeChannel(Channel channel) {
142149 }
143150
144151 public static void received (Channel channel , Response response ) {
152+ received (channel , response , false );
153+ }
154+
155+ public static void received (Channel channel , Response response , boolean timeout ) {
145156 try {
146157 DefaultFuture future = FUTURES .remove (response .getId ());
147158 if (future != null ) {
159+ Timeout t = future .timeoutCheckTask ;
160+ if (!timeout ) {
161+ // decrease Time
162+ t .cancel ();
163+ }
148164 future .doReceived (response );
149- // Timeout t = PENDING_TASKS.remove(future.getId());
150- // if (t != null) {
151- // // decrease Time
152- // t.cancel();
153- // }
154165 } else {
155166 logger .warn ("The timeout response finally returned at "
156167 + (new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss.SSS" ).format (new Date ()))
@@ -178,6 +189,7 @@ public void cancel() {
178189 this .cancel (true );
179190 }
180191
192+
181193 private void doReceived (Response res ) {
182194 if (res == null ) {
183195 throw new IllegalStateException ("response cannot be null" );
@@ -224,15 +236,6 @@ private void doSent() {
224236 sent = System .currentTimeMillis ();
225237 }
226238
227- /**
228- * check time out of the future
229- */
230- private static void timeoutCheck (DefaultFuture future ) {
231- TimeoutCheckTask task = new TimeoutCheckTask (future .getId ());
232- TIME_OUT_TIMER .newTimeout (task , future .getTimeout (), TimeUnit .MILLISECONDS );
233- // PENDING_TASKS.put(future.getId(), t);
234- }
235-
236239 private String getTimeoutMessage (boolean scan ) {
237240 long nowTimestamp = System .currentTimeMillis ();
238241 return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side" )
@@ -256,10 +259,7 @@ private static class TimeoutCheckTask implements TimerTask {
256259
257260 @ Override
258261 public void run (Timeout timeout ) {
259- // remove from pending task
260- // PENDING_TASKS.remove(future.getId());
261-
262- DefaultFuture future = FUTURES .remove (requestID );
262+ DefaultFuture future = DefaultFuture .getFuture (requestID );
263263 if (future == null || future .isDone ()) {
264264 return ;
265265 }
@@ -271,7 +271,7 @@ public void run(Timeout timeout) {
271271 timeoutResponse .setStatus (future .isSent () ? Response .SERVER_TIMEOUT : Response .CLIENT_TIMEOUT );
272272 timeoutResponse .setErrorMessage (future .getTimeoutMessage (true ));
273273 // handle response.
274- DefaultFuture .received (future .getChannel (), timeoutResponse );
274+ DefaultFuture .received (future .getChannel (), timeoutResponse , true );
275275 });
276276 }
277277 }
0 commit comments