Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class DefaultFuture implements ResponseFuture {

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
Expand Down Expand Up @@ -86,7 +88,8 @@ private DefaultFuture(Channel channel, Request request, int timeout) {
*/
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future);
TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
PENDING_TASKS.put(future.getId(), t);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid this PENDING_TASKS instance? I think DefaultFuture itself hosts the status of its timeouttask or TIME_OUT_TIMER hosts the status of all timeout tasks can be better.

I need to merge this PR first to do some pressure test, please help to improve if possible.

}

/**
Expand Down Expand Up @@ -149,6 +152,11 @@ public static void received(Channel channel, Response response) {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
Timeout t = PENDING_TASKS.remove(future.getId());
if (t != null) {
// decrease Time
t.cancel();
}
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
Expand Down Expand Up @@ -238,7 +246,10 @@ private static class TimeoutCheckTask implements TimerTask {

@Override
public void run(Timeout timeout) {
if (future == null || future.isDone()) {
// remove from pending task
PENDING_TASKS.remove(future.getId());

if (future.isDone()) {
return;
}
// create exception response.
Expand All @@ -248,13 +259,11 @@ public void run(Timeout timeout) {
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);

}
}

private void invokeCallback(ResponseCallback c) {
ResponseCallback callbackCopy = c;
if (callbackCopy == null) {
if (c == null) {
throw new NullPointerException("callback cannot be null.");
}
Response res = response;
Expand All @@ -264,21 +273,21 @@ private void invokeCallback(ResponseCallback c) {

if (res.getStatus() == Response.OK) {
try {
callbackCopy.done(res.getResult());
c.done(res.getResult());
} catch (Exception e) {
logger.error("callback invoke error .result:" + res.getResult() + ",url:" + channel.getUrl(), e);
}
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
try {
TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
callbackCopy.caught(te);
c.caught(te);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
} else {
try {
RuntimeException re = new RuntimeException(res.getErrorMessage());
callbackCopy.caught(re);
c.caught(re);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
Expand Down