@@ -50,20 +50,19 @@ public class DefaultFuture extends CompletableFuture<Object> {
5050
5151 private static final Map <Long , DefaultFuture > FUTURES = new ConcurrentHashMap <>();
5252
53- private static final Map <Long , Timeout > PENDING_TASKS = new ConcurrentHashMap <>();
54-
5553 public static final Timer TIME_OUT_TIMER = new HashedWheelTimer (
5654 new NamedThreadFactory ("dubbo-future-timeout" , true ),
5755 30 ,
5856 TimeUnit .MILLISECONDS );
5957
6058 // invoke id.
61- private final long id ;
59+ private final Long id ;
6260 private final Channel channel ;
6361 private final Request request ;
6462 private final int timeout ;
6563 private final long start = System .currentTimeMillis ();
6664 private volatile long sent ;
65+ private Timeout timeoutCheckTask ;
6766
6867 private DefaultFuture (Channel channel , Request request , int timeout ) {
6968 this .channel = channel ;
@@ -79,9 +78,8 @@ private DefaultFuture(Channel channel, Request request, int timeout) {
7978 * check time out of the future
8079 */
8180 private static void timeoutCheck (DefaultFuture future ) {
82- TimeoutCheckTask task = new TimeoutCheckTask (future );
83- Timeout t = TIME_OUT_TIMER .newTimeout (task , future .getTimeout (), TimeUnit .MILLISECONDS );
84- PENDING_TASKS .put (future .getId (), t );
81+ TimeoutCheckTask task = new TimeoutCheckTask (future .getId ());
82+ future .timeoutCheckTask = TIME_OUT_TIMER .newTimeout (task , future .getTimeout (), TimeUnit .MILLISECONDS );
8583 }
8684
8785 /**
@@ -140,15 +138,19 @@ public static void closeChannel(Channel channel) {
140138 }
141139
142140 public static void received (Channel channel , Response response ) {
141+ received (channel , response , false );
142+ }
143+
144+ public static void received (Channel channel , Response response , boolean timeout ) {
143145 try {
144146 DefaultFuture future = FUTURES .remove (response .getId ());
145147 if (future != null ) {
146- future .doReceived (response );
147- Timeout t = PENDING_TASKS .remove (future .getId ());
148- if (t != null ) {
148+ Timeout t = future .timeoutCheckTask ;
149+ if (!timeout ) {
149150 // decrease Time
150151 t .cancel ();
151152 }
153+ future .doReceived (response );
152154 } else {
153155 logger .warn ("The timeout response finally returned at "
154156 + (new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss.SSS" ).format (new Date ()))
@@ -229,14 +231,15 @@ private String getTimeoutMessage(boolean scan) {
229231
230232 private static class TimeoutCheckTask implements TimerTask {
231233
232- private DefaultFuture future ;
234+ private final Long requestID ;
233235
234- TimeoutCheckTask (DefaultFuture future ) {
235- this .future = future ;
236+ TimeoutCheckTask (Long requestID ) {
237+ this .requestID = requestID ;
236238 }
237239
238240 @ Override
239241 public void run (Timeout timeout ) {
242+ DefaultFuture future = DefaultFuture .getFuture (requestID );
240243 if (future == null || future .isDone ()) {
241244 return ;
242245 }
@@ -246,7 +249,7 @@ public void run(Timeout timeout) {
246249 timeoutResponse .setStatus (future .isSent () ? Response .SERVER_TIMEOUT : Response .CLIENT_TIMEOUT );
247250 timeoutResponse .setErrorMessage (future .getTimeoutMessage (true ));
248251 // handle response.
249- DefaultFuture .received (future .getChannel (), timeoutResponse );
252+ DefaultFuture .received (future .getChannel (), timeoutResponse , true );
250253
251254 }
252255 }
0 commit comments