Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,19 @@ public static boolean isProvider(URL url) {
PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY));
}

public static int getHeartbeat(URL url) {
return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
}

public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
return idleTimeout;
}

/**
* Check if the given value matches the given pattern. The pattern supports wildcard "*".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @see org.apache.dubbo.remoting.Transporter#connect(org.apache.dubbo.common.URL, ChannelHandler)
*/
public interface Client extends Endpoint, Channel, Resetable {
public interface Client extends Endpoint, Channel, Resetable, IdleSensible {

/**
* reconnect.
Expand All @@ -35,4 +35,4 @@ public interface Client extends Endpoint, Channel, Resetable {
@Deprecated
void reset(org.apache.dubbo.common.Parameters parameters);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.dubbo.remoting;

/**
* Indicate whether the implementation (for both server and client) has the ability to sense and handle idle connection.
* If the server has the ability to handle idle connection, it should close the connection when it happens, and if
* the client has the ability to handle idle connection, it should send the heartbeat to the server.
*/
public interface IdleSensible {
/**
* Whether the implementation can sense and handle the idle connection. By default it's false, the implementation
* relies on dedicated timer to take care of idle connection.
*
* @return whether has the ability to handle idle connection
*/
default boolean canHandleIdle() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*
* @see org.apache.dubbo.remoting.Transporter#bind(org.apache.dubbo.common.URL, ChannelHandler)
*/
public interface Server extends Endpoint, Resetable {
public interface Server extends Endpoint, Resetable, IdleSensible {

/**
* is bound.
Expand All @@ -55,4 +55,4 @@ public interface Server extends Endpoint, Resetable {
@Deprecated
void reset(org.apache.dubbo.common.Parameters parameters);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,31 @@
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat;
import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout;

/**
* DefaultMessageClient
*/
public class HeaderExchangeClient implements ExchangeClient {

private final Client client;
private final ExchangeChannel channel;
private int heartbeat;
private int idleTimeout;

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
private HeartbeatTimerTask heartBeatTimerTask;

private ReconnectTimerTask reconnectTimerTask;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);

this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
startIdleCheckTask();
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}

Expand Down Expand Up @@ -145,6 +138,7 @@ public void startClose() {
@Override
public void reset(URL url) {
client.reset(url);
// FIXME, should cancel and restart timer tasks if parameters in the new URL are different?
}

@Override
Expand Down Expand Up @@ -178,25 +172,34 @@ public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}

private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);

this.heartBeatTimerTask = heartBeatTimerTask;
this.reconnectTimerTask = reconnectTimerTask;
private void startHeartBeatTask(URL url) {
if (!client.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}

// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
private void startReconnectTask(URL url) {
if (shouldReconnect(url)) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int idleTimeout = getIdleTimeout(url);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
}

private void doClose() {
heartBeatTimerTask.cancel();
reconnectTimerTask.cancel();
if (heartBeatTimerTask != null) {
heartBeatTimerTask.cancel();
}

if (reconnectTimerTask != null) {
reconnectTimerTask.cancel();
}
}

/**
Expand All @@ -210,6 +213,10 @@ private long calculateLeastDuration(int time) {
}
}

private boolean shouldReconnect(URL url) {
return url.getParameter(Constants.RECONNECT_KEY, true);
}

@Override
public String toString() {
return "HeaderExchangeClient [channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -49,8 +50,6 @@ public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private final Server server;
private int heartbeat;
private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
Expand All @@ -61,13 +60,7 @@ public class HeaderExchangeServer implements ExchangeServer {
public HeaderExchangeServer(Server server) {
Assert.notNull(server, "server == null");
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

startIdleCheckTask();
startIdleCheckTask(getUrl());
}

public Server getServer() {
Expand Down Expand Up @@ -154,7 +147,9 @@ private void doClose() {
}

private void cancelCloseTask() {
closeTimerTask.cancel();
if (closeTimerTask != null) {
closeTimerTask.cancel();
}
}

@Override
Expand Down Expand Up @@ -210,21 +205,13 @@ public ChannelHandler getChannelHandler() {
public void reset(URL url) {
server.reset(url);
try {
if (url.hasParameter(Constants.HEARTBEAT_KEY)
|| url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
if (t < h * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
if (h != heartbeat || t != idleTimeout) {
heartbeat = h;
idleTimeout = t;

// we need cancel the exist closeTimeout first.
cancelCloseTask();
startIdleCheckTask();
}
int currHeartbeat = UrlUtils.getHeartbeat(getUrl());
int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl());
int heartbeat = UrlUtils.getHeartbeat(url);
int idleTimeout = UrlUtils.getIdleTimeout(url);
if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
cancelCloseTask();
startIdleCheckTask(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
Expand Down Expand Up @@ -266,15 +253,16 @@ private long calculateLeastDuration(int time) {
}
}

private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;
private void startIdleCheckTask(URL url) {
if (!server.canHandleIdle()) {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
int idleTimeout = UrlUtils.getIdleTimeout(url);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;

// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,18 @@ protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();

// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
if (!channel.isConnected()) {
Comment thread
beiwei30 marked this conversation as resolved.
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// check pong at client
if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
Expand Down
Loading