diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 6a5544698457..7a860a396fd5 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -54,6 +54,8 @@ public class DefaultFuture implements ResponseFuture { private static final Map FUTURES = new ConcurrentHashMap<>(); + private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); + public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), 30, @@ -86,7 +88,8 @@ private DefaultFuture(Channel channel, Request request, int timeout) { */ private static void timeoutCheck(DefaultFuture future) { TimeoutCheckTask task = new TimeoutCheckTask(future); - TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); + Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); + PENDING_TASKS.put(future.getId(), t); } /** @@ -149,6 +152,11 @@ public static void received(Channel channel, Response response) { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); + Timeout t = PENDING_TASKS.remove(future.getId()); + if (t != null) { + // decrease Time + t.cancel(); + } } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) @@ -238,7 +246,10 @@ private static class TimeoutCheckTask implements TimerTask { @Override public void run(Timeout timeout) { - if (future == null || future.isDone()) { + // remove from pending task + PENDING_TASKS.remove(future.getId()); + + if (future.isDone()) { return; } // create exception response. @@ -248,13 +259,11 @@ public void run(Timeout timeout) { timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); - } } private void invokeCallback(ResponseCallback c) { - ResponseCallback callbackCopy = c; - if (callbackCopy == null) { + if (c == null) { throw new NullPointerException("callback cannot be null."); } Response res = response; @@ -264,21 +273,21 @@ private void invokeCallback(ResponseCallback c) { if (res.getStatus() == Response.OK) { try { - callbackCopy.done(res.getResult()); + c.done(res.getResult()); } catch (Exception e) { logger.error("callback invoke error .result:" + res.getResult() + ",url:" + channel.getUrl(), e); } } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { try { TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); - callbackCopy.caught(te); + c.caught(te); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); } } else { try { RuntimeException re = new RuntimeException(res.getErrorMessage()); - callbackCopy.caught(re); + c.caught(re); } catch (Exception e) { logger.error("callback invoke error ,url:" + channel.getUrl(), e); }