diff --git a/.gitignore b/.gitignore index a98a8a694570..b4467e7777eb 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ target/ *.zip *.tar *.tar.gz +.flattened-pom.xml # eclipse ignore .settings/ diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index e13f65112b06..89e11718ddf5 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -48,7 +48,7 @@ public class DefaultFuture extends CompletableFuture { private static final Map FUTURES = new ConcurrentHashMap<>(); - private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); +// private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), @@ -56,7 +56,7 @@ public class DefaultFuture extends CompletableFuture { TimeUnit.MILLISECONDS); // invoke id. - private final long id; + private final Long id; private final Channel channel; private final Request request; private final int timeout; @@ -133,11 +133,11 @@ public static void received(Channel channel, Response response) { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); - Timeout t = PENDING_TASKS.remove(future.getId()); - if (t != null) { - // decrease Time - t.cancel(); - } +// Timeout t = PENDING_TASKS.remove(future.getId()); +// if (t != null) { +// // decrease Time +// t.cancel(); +// } } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) @@ -207,10 +207,11 @@ private void doSent() { * check time out of the future */ private static void timeoutCheck(DefaultFuture future) { - TimeoutCheckTask task = new TimeoutCheckTask(future); - Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); - PENDING_TASKS.put(future.getId(), t); + TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); + TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); +// PENDING_TASKS.put(future.getId(), t); } + private String getTimeoutMessage(boolean scan) { long nowTimestamp = System.currentTimeMillis(); return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side") @@ -226,18 +227,19 @@ private String getTimeoutMessage(boolean scan) { private static class TimeoutCheckTask implements TimerTask { - private DefaultFuture future; + private final Long requestID; - TimeoutCheckTask(DefaultFuture future) { - this.future = future; + TimeoutCheckTask(Long requestID) { + this.requestID = requestID; } @Override public void run(Timeout timeout) { // remove from pending task - PENDING_TASKS.remove(future.getId()); +// PENDING_TASKS.remove(future.getId()); - if (future.isDone()) { + DefaultFuture future = FUTURES.remove(requestID); + if (future == null || future.isDone()) { return; } // create exception response. diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java index 5c06b4e4d1d5..9f1725d42a51 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java @@ -27,6 +27,7 @@ import org.apache.dubbo.rpc.RpcInvocation; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; /** * This class will work as a wrapper wrapping outside of each protocol invoker. @@ -51,7 +52,7 @@ public Result invoke(Invocation invocation) throws RpcException { try { if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) { - asyncResult.get(); + asyncResult.get(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 1000), TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);