Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,31 @@ public static boolean isProvider(URL url) {
PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY));
}

public static int getHeartbeat(URL url) {
String dubbo = url.getParameter(Constants.DUBBO_VERSION_KEY);
return url.getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
Comment thread
chickenlj marked this conversation as resolved.
Outdated
}

public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
if (idleTimeout == 0) {
String reconnect = url.getParameter(Constants.RECONNECT_KEY);
if (StringUtils.isNotEmpty(reconnect)) {
try {
idleTimeout = Integer.parseInt(reconnect);
} catch (NumberFormatException e) {
// ignore
}
}
}
return idleTimeout;
}

/**
* Check if the given value matches the given pattern. The pattern supports wildcard "*".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,33 @@
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.Constants.CLIENT_KEY;
import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY;
import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat;
import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout;

/**
* DefaultMessageClient
*/
public class HeaderExchangeClient implements ExchangeClient {

private final Client client;
private final ExchangeChannel channel;
private int heartbeat;
private int idleTimeout;

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
private HeartbeatTimerTask heartBeatTimerTask;

private ReconnectTimerTask reconnectTimerTask;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);

this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
startIdleCheckTask();
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}

Expand Down Expand Up @@ -145,6 +140,7 @@ public void startClose() {
@Override
public void reset(URL url) {
client.reset(url);
// FIXME, should cancel and restart timer tasks if parameters in the new URL are different?
}

@Override
Expand Down Expand Up @@ -178,25 +174,34 @@ public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}

private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);

this.heartBeatTimerTask = heartBeatTimerTask;
this.reconnectTimerTask = reconnectTimerTask;
private void startHeartBeatTask(URL url) {
if (shouldHeartbeat(url)) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int heartbeat = getHeartbeat(url);
long heartbeatTick = calculateLeastDuration(heartbeat);
this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
}
}

// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
private void startReconnectTask(URL url) {
if (shouldReconnect(url)) {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
int idleTimeout = getIdleTimeout(url);
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
}

private void doClose() {
heartBeatTimerTask.cancel();
reconnectTimerTask.cancel();
if (heartBeatTimerTask != null) {
heartBeatTimerTask.cancel();
}

if (reconnectTimerTask != null) {
reconnectTimerTask.cancel();
}
}

/**
Expand All @@ -210,6 +215,38 @@ private long calculateLeastDuration(int time) {
}
}

private boolean shouldHeartbeat(URL url) {
Comment thread
beiwei30 marked this conversation as resolved.
Outdated
String transporter = url.getParameter(CLIENT_KEY, url.getParameter(TRANSPORTER_KEY, "netty"));
return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4");
}

private boolean shouldReconnect(URL url) {
boolean reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
if (!reconnect) {
String param = url.getParameter(Constants.RECONNECT_KEY, "true");
if ("false".equalsIgnoreCase(param)) {
Comment thread
chickenlj marked this conversation as resolved.
Outdated
reconnect = false;
} else if ("true".equalsIgnoreCase(param)) {
reconnect = true;
} else {
int value;
try {
value = Integer.parseInt(param);
} catch (Exception e) {
throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " +
"input is:" + param);
}

if (value < 0) {
throw new IllegalArgumentException("reconnect param must be non-negative integer or false/true, " +
"input is:" + param);
}
reconnect = true;
}
}
return reconnect;
}

@Override
public String toString() {
return "HeaderExchangeClient [channel=" + channel + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
Expand All @@ -40,6 +41,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.unmodifiableCollection;
import static org.apache.dubbo.common.Constants.SERVER_KEY;
import static org.apache.dubbo.common.Constants.TRANSPORTER_KEY;

/**
* ExchangeServerImpl
Expand All @@ -49,8 +52,6 @@ public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());

private final Server server;
private int heartbeat;
private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
Expand All @@ -61,13 +62,7 @@ public class HeaderExchangeServer implements ExchangeServer {
public HeaderExchangeServer(Server server) {
Assert.notNull(server, "server == null");
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (idleTimeout < heartbeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}

startIdleCheckTask();
startIdleCheckTask(getUrl());
}

public Server getServer() {
Expand Down Expand Up @@ -154,7 +149,9 @@ private void doClose() {
}

private void cancelCloseTask() {
closeTimerTask.cancel();
if (closeTimerTask != null) {
closeTimerTask.cancel();
}
}

@Override
Expand Down Expand Up @@ -210,21 +207,13 @@ public ChannelHandler getChannelHandler() {
public void reset(URL url) {
server.reset(url);
try {
if (url.hasParameter(Constants.HEARTBEAT_KEY)
|| url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
if (t < h * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
if (h != heartbeat || t != idleTimeout) {
heartbeat = h;
idleTimeout = t;

// we need cancel the exist closeTimeout first.
cancelCloseTask();
startIdleCheckTask();
}
int currHeartbeat = UrlUtils.getHeartbeat(getUrl());
int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl());
int heartbeat = UrlUtils.getHeartbeat(url);
int idleTimeout = UrlUtils.getIdleTimeout(url);
if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
cancelCloseTask();
startIdleCheckTask(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
Expand Down Expand Up @@ -266,15 +255,21 @@ private long calculateLeastDuration(int time) {
}
}

private void startIdleCheckTask() {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;
private void startIdleCheckTask(URL url) {
if (shouldStartCloseTimer(url)) {
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
int idleTimeout = UrlUtils.getIdleTimeout(url);
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;

// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
// init task and start timer.
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}
}

private boolean shouldStartCloseTimer(URL url) {
Comment thread
beiwei30 marked this conversation as resolved.
Outdated
String transporter = url.getParameter(SERVER_KEY, url.getParameter(TRANSPORTER_KEY, "netty"));
return !transporter.equalsIgnoreCase("netty") && !transporter.equalsIgnoreCase("netty4");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,18 @@ protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();

// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
if (!channel.isConnected()) {
Comment thread
beiwei30 marked this conversation as resolved.
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// check pong at client
if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
((Client) channel).reconnect();
Expand Down
Loading