Skip to content

Commit 35f1914

Browse files
beiwei30chickenlj
authored andcommitted
Merge pull request #3341, start to use IdleStateHandler in Netty4.
1 parent 663738f commit 35f1914

File tree

15 files changed

+209
-241
lines changed

15 files changed

+209
-241
lines changed

dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,19 @@ public static boolean isProvider(URL url) {
467467
PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY));
468468
}
469469

470+
public static int getHeartbeat(URL url) {
471+
return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
472+
}
473+
474+
public static int getIdleTimeout(URL url) {
475+
int heartBeat = getHeartbeat(url);
476+
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
477+
if (idleTimeout < heartBeat * 2) {
478+
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
479+
}
480+
return idleTimeout;
481+
}
482+
470483
/**
471484
* Check if the given value matches the given pattern. The pattern supports wildcard "*".
472485
*

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*
2626
* @see org.apache.dubbo.remoting.Transporter#connect(org.apache.dubbo.common.URL, ChannelHandler)
2727
*/
28-
public interface Client extends Endpoint, Channel, Resetable {
28+
public interface Client extends Endpoint, Channel, Resetable, IdleSensible {
2929

3030
/**
3131
* reconnect.
@@ -35,4 +35,4 @@ public interface Client extends Endpoint, Channel, Resetable {
3535
@Deprecated
3636
void reset(org.apache.dubbo.common.Parameters parameters);
3737

38-
}
38+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.dubbo.remoting;
21+
22+
/**
23+
* Indicate whether the implementation (for both server and client) has the ability to sense and handle idle connection.
24+
* If the server has the ability to handle idle connection, it should close the connection when it happens, and if
25+
* the client has the ability to handle idle connection, it should send the heartbeat to the server.
26+
*/
27+
public interface IdleSensible {
28+
/**
29+
* Whether the implementation can sense and handle the idle connection. By default it's false, the implementation
30+
* relies on dedicated timer to take care of idle connection.
31+
*
32+
* @return whether has the ability to handle idle connection
33+
*/
34+
default boolean canHandleIdle() {
35+
return false;
36+
}
37+
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*
2929
* @see org.apache.dubbo.remoting.Transporter#bind(org.apache.dubbo.common.URL, ChannelHandler)
3030
*/
31-
public interface Server extends Endpoint, Resetable {
31+
public interface Server extends Endpoint, Resetable, IdleSensible {
3232

3333
/**
3434
* is bound.
@@ -55,4 +55,4 @@ public interface Server extends Endpoint, Resetable {
5555
@Deprecated
5656
void reset(org.apache.dubbo.common.Parameters parameters);
5757

58-
}
58+
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,38 +33,31 @@
3333
import java.util.Collections;
3434
import java.util.concurrent.TimeUnit;
3535

36+
import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat;
37+
import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout;
38+
3639
/**
3740
* DefaultMessageClient
3841
*/
3942
public class HeaderExchangeClient implements ExchangeClient {
4043

4144
private final Client client;
4245
private final ExchangeChannel channel;
43-
private int heartbeat;
44-
private int idleTimeout;
45-
46-
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
47-
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
4846

47+
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
48+
new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
4949
private HeartbeatTimerTask heartBeatTimerTask;
50-
5150
private ReconnectTimerTask reconnectTimerTask;
5251

53-
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
52+
public HeaderExchangeClient(Client client, boolean startTimer) {
5453
Assert.notNull(client, "Client can't be null");
5554
this.client = client;
5655
this.channel = new HeaderExchangeChannel(client);
57-
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
58-
59-
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
60-
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
61-
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
62-
if (idleTimeout < heartbeat * 2) {
63-
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
64-
}
6556

66-
if (needHeartbeat) {
67-
startIdleCheckTask();
57+
if (startTimer) {
58+
URL url = client.getUrl();
59+
startReconnectTask(url);
60+
startHeartBeatTask(url);
6861
}
6962
}
7063

@@ -145,6 +138,7 @@ public void startClose() {
145138
@Override
146139
public void reset(URL url) {
147140
client.reset(url);
141+
// FIXME, should cancel and restart timer tasks if parameters in the new URL are different?
148142
}
149143

150144
@Override
@@ -178,25 +172,34 @@ public boolean hasAttribute(String key) {
178172
return channel.hasAttribute(key);
179173
}
180174

181-
private void startIdleCheckTask() {
182-
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
183-
184-
long heartbeatTick = calculateLeastDuration(heartbeat);
185-
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
186-
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
187-
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
188-
189-
this.heartBeatTimerTask = heartBeatTimerTask;
190-
this.reconnectTimerTask = reconnectTimerTask;
175+
private void startHeartBeatTask(URL url) {
176+
if (!client.canHandleIdle()) {
177+
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
178+
int heartbeat = getHeartbeat(url);
179+
long heartbeatTick = calculateLeastDuration(heartbeat);
180+
this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
181+
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
182+
}
183+
}
191184

192-
// init task and start timer.
193-
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
194-
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
185+
private void startReconnectTask(URL url) {
186+
if (shouldReconnect(url)) {
187+
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
188+
int idleTimeout = getIdleTimeout(url);
189+
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
190+
this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
191+
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
192+
}
195193
}
196194

197195
private void doClose() {
198-
heartBeatTimerTask.cancel();
199-
reconnectTimerTask.cancel();
196+
if (heartBeatTimerTask != null) {
197+
heartBeatTimerTask.cancel();
198+
}
199+
200+
if (reconnectTimerTask != null) {
201+
reconnectTimerTask.cancel();
202+
}
200203
}
201204

202205
/**
@@ -210,6 +213,10 @@ private long calculateLeastDuration(int time) {
210213
}
211214
}
212215

216+
private boolean shouldReconnect(URL url) {
217+
return url.getParameter(Constants.RECONNECT_KEY, true);
218+
}
219+
213220
@Override
214221
public String toString() {
215222
return "HeaderExchangeClient [channel=" + channel + "]";

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.dubbo.common.utils.Assert;
2626
import org.apache.dubbo.common.utils.CollectionUtils;
2727
import org.apache.dubbo.common.utils.NamedThreadFactory;
28+
import org.apache.dubbo.common.utils.UrlUtils;
2829
import org.apache.dubbo.remoting.Channel;
2930
import org.apache.dubbo.remoting.ChannelHandler;
3031
import org.apache.dubbo.remoting.RemotingException;
@@ -49,8 +50,6 @@ public class HeaderExchangeServer implements ExchangeServer {
4950
protected final Logger logger = LoggerFactory.getLogger(getClass());
5051

5152
private final Server server;
52-
private int heartbeat;
53-
private int idleTimeout;
5453
private AtomicBoolean closed = new AtomicBoolean(false);
5554

5655
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
@@ -61,13 +60,7 @@ public class HeaderExchangeServer implements ExchangeServer {
6160
public HeaderExchangeServer(Server server) {
6261
Assert.notNull(server, "server == null");
6362
this.server = server;
64-
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
65-
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
66-
if (idleTimeout < heartbeat * 2) {
67-
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
68-
}
69-
70-
startIdleCheckTask();
63+
startIdleCheckTask(getUrl());
7164
}
7265

7366
public Server getServer() {
@@ -154,7 +147,9 @@ private void doClose() {
154147
}
155148

156149
private void cancelCloseTask() {
157-
closeTimerTask.cancel();
150+
if (closeTimerTask != null) {
151+
closeTimerTask.cancel();
152+
}
158153
}
159154

160155
@Override
@@ -210,21 +205,13 @@ public ChannelHandler getChannelHandler() {
210205
public void reset(URL url) {
211206
server.reset(url);
212207
try {
213-
if (url.hasParameter(Constants.HEARTBEAT_KEY)
214-
|| url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
215-
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
216-
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
217-
if (t < h * 2) {
218-
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
219-
}
220-
if (h != heartbeat || t != idleTimeout) {
221-
heartbeat = h;
222-
idleTimeout = t;
223-
224-
// we need cancel the exist closeTimeout first.
225-
cancelCloseTask();
226-
startIdleCheckTask();
227-
}
208+
int currHeartbeat = UrlUtils.getHeartbeat(getUrl());
209+
int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl());
210+
int heartbeat = UrlUtils.getHeartbeat(url);
211+
int idleTimeout = UrlUtils.getIdleTimeout(url);
212+
if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
213+
cancelCloseTask();
214+
startIdleCheckTask(url);
228215
}
229216
} catch (Throwable t) {
230217
logger.error(t.getMessage(), t);
@@ -266,15 +253,16 @@ private long calculateLeastDuration(int time) {
266253
}
267254
}
268255

269-
private void startIdleCheckTask() {
270-
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
271-
272-
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
273-
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
274-
this.closeTimerTask = closeTimerTask;
256+
private void startIdleCheckTask(URL url) {
257+
if (!server.canHandleIdle()) {
258+
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
259+
int idleTimeout = UrlUtils.getIdleTimeout(url);
260+
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
261+
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
262+
this.closeTimerTask = closeTimerTask;
275263

276-
// init task and start timer.
277-
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
264+
// init task and start timer.
265+
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
266+
}
278267
}
279-
280268
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,18 @@ protected void doTask(Channel channel) {
4141
try {
4242
Long lastRead = lastRead(channel);
4343
Long now = now();
44+
45+
// Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
46+
if (!channel.isConnected()) {
47+
try {
48+
logger.info("Initial connection to " + channel);
49+
((Client) channel).reconnect();
50+
} catch (Exception e) {
51+
logger.error("Fail to connect to " + channel, e);
52+
}
4453
// check pong at client
45-
if (lastRead != null && now - lastRead > idleTimeout) {
46-
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
54+
} else if (lastRead != null && now - lastRead > idleTimeout) {
55+
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
4756
+ idleTimeout + "ms");
4857
try {
4958
((Client) channel).reconnect();

0 commit comments

Comments
 (0)