Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target/
*.zip
*.tar
*.tar.gz
.flattened-pom.xml

# eclipse ignore
.settings/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public class DefaultFuture extends CompletableFuture<Object> {

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();
// private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
TimeUnit.MILLISECONDS);

// invoke id.
private final long id;
private final Long id;
private final Channel channel;
private final Request request;
private final int timeout;
Expand Down Expand Up @@ -133,11 +133,11 @@ public static void received(Channel channel, Response response) {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
Timeout t = PENDING_TASKS.remove(future.getId());
if (t != null) {
// decrease Time
t.cancel();
}
// Timeout t = PENDING_TASKS.remove(future.getId());
// if (t != null) {
// // decrease Time
// t.cancel();
// }
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
Expand Down Expand Up @@ -207,10 +207,11 @@ private void doSent() {
* check time out of the future
*/
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future);
Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
PENDING_TASKS.put(future.getId(), t);
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
// PENDING_TASKS.put(future.getId(), t);
}

private String getTimeoutMessage(boolean scan) {
long nowTimestamp = System.currentTimeMillis();
return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side")
Expand All @@ -226,18 +227,19 @@ private String getTimeoutMessage(boolean scan) {

private static class TimeoutCheckTask implements TimerTask {

private DefaultFuture future;
private final Long requestID;

TimeoutCheckTask(DefaultFuture future) {
this.future = future;
TimeoutCheckTask(Long requestID) {
this.requestID = requestID;
}

@Override
public void run(Timeout timeout) {
// remove from pending task
PENDING_TASKS.remove(future.getId());
// PENDING_TASKS.remove(future.getId());

if (future.isDone()) {
DefaultFuture future = FUTURES.remove(requestID);
if (future == null || future.isDone()) {
return;
}
// create exception response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dubbo.rpc.RpcInvocation;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* This class will work as a wrapper wrapping outside of each protocol invoker.
Expand All @@ -51,7 +52,7 @@ public Result invoke(Invocation invocation) throws RpcException {

try {
if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) {
asyncResult.get();
asyncResult.get(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 1000), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
Expand Down