Skip to content

Commit 0f86000

Browse files
lexburnerbeiwei30
authored andcommitted
Improve/heartbeat (#3276)
* add the notice of code style * modify the pic * del teh faq.md, move to dubbo admin * improve:remove the heartbeat on server side * improve:change the scope of timer to static
1 parent 0a2b9bf commit 0f86000

File tree

4 files changed

+100
-79
lines changed

4 files changed

+100
-79
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dubbo.remoting.exchange.support.header;
19+
20+
import org.apache.dubbo.common.logger.Logger;
21+
import org.apache.dubbo.common.logger.LoggerFactory;
22+
import org.apache.dubbo.remoting.Channel;
23+
24+
/**
25+
* CloseTimerTask
26+
*/
27+
public class CloseTimerTask extends AbstractTimerTask {
28+
29+
private static final Logger logger = LoggerFactory.getLogger(CloseTimerTask.class);
30+
31+
private final int idleTimeout;
32+
33+
public CloseTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
34+
super(channelProvider, heartbeatTimeoutTick);
35+
this.idleTimeout = idleTimeout;
36+
}
37+
38+
@Override
39+
protected void doTask(Channel channel) {
40+
try {
41+
Long lastRead = lastRead(channel);
42+
Long lastWrite = lastWrite(channel);
43+
Long now = now();
44+
// check ping & pong at server
45+
if ((lastRead != null && now - lastRead > idleTimeout)
46+
|| (lastWrite != null && now - lastWrite > idleTimeout)) {
47+
logger.warn("Close channel " + channel + ", because idleCheck timeout: "
48+
+ idleTimeout + "ms");
49+
channel.close();
50+
}
51+
} catch (Throwable t) {
52+
logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t);
53+
}
54+
}
55+
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.dubbo.common.Constants;
2020
import org.apache.dubbo.common.URL;
2121
import org.apache.dubbo.common.timer.HashedWheelTimer;
22+
import org.apache.dubbo.common.utils.Assert;
2223
import org.apache.dubbo.common.utils.NamedThreadFactory;
2324
import org.apache.dubbo.remoting.ChannelHandler;
2425
import org.apache.dubbo.remoting.Client;
@@ -39,32 +40,27 @@ public class HeaderExchangeClient implements ExchangeClient {
3940

4041
private final Client client;
4142
private final ExchangeChannel channel;
42-
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
4343
private int heartbeat;
44-
private int heartbeatTimeout;
44+
private int idleTimeout;
4545

46-
private HashedWheelTimer heartbeatTimer;
46+
private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
47+
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
4748

4849
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
49-
if (client == null) {
50-
throw new IllegalArgumentException("client == null");
51-
}
50+
Assert.notNull(client, "Client can't be null");
5251
this.client = client;
5352
this.channel = new HeaderExchangeChannel(client);
5453
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
5554

5655
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
5756
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
58-
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
59-
if (heartbeatTimeout < heartbeat * 2) {
60-
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
57+
this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
58+
if (idleTimeout < heartbeat * 2) {
59+
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
6160
}
6261

6362
if (needHeartbeat) {
64-
long tickDuration = calculateLeastDuration(heartbeat);
65-
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
66-
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
67-
startHeartbeatTimer();
63+
startIdleCheckTask();
6864
}
6965
}
7066

@@ -178,28 +174,20 @@ public boolean hasAttribute(String key) {
178174
return channel.hasAttribute(key);
179175
}
180176

181-
private void startHeartbeatTimer() {
177+
private void startIdleCheckTask() {
182178
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
183179

184180
long heartbeatTick = calculateLeastDuration(heartbeat);
185-
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
181+
long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
186182
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
187-
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
183+
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
188184

189185
// init task and start timer.
190-
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
191-
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
192-
}
193-
194-
private void stopHeartbeatTimer() {
195-
if (heartbeatTimer != null) {
196-
heartbeatTimer.stop();
197-
heartbeatTimer = null;
198-
}
186+
idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
187+
idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
199188
}
200189

201190
private void doClose() {
202-
stopHeartbeatTimer();
203191
}
204192

205193
/**

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dubbo.common.logger.Logger;
2323
import org.apache.dubbo.common.logger.LoggerFactory;
2424
import org.apache.dubbo.common.timer.HashedWheelTimer;
25+
import org.apache.dubbo.common.utils.Assert;
2526
import org.apache.dubbo.common.utils.CollectionUtils;
2627
import org.apache.dubbo.common.utils.NamedThreadFactory;
2728
import org.apache.dubbo.remoting.Channel;
@@ -48,25 +49,23 @@ public class HeaderExchangeServer implements ExchangeServer {
4849
protected final Logger logger = LoggerFactory.getLogger(getClass());
4950

5051
private final Server server;
51-
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
5252
private int heartbeat;
53-
private int heartbeatTimeout;
53+
private int idleTimeout;
5454
private AtomicBoolean closed = new AtomicBoolean(false);
5555

56-
private HashedWheelTimer heartbeatTimer;
56+
private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
57+
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);
5758

5859
public HeaderExchangeServer(Server server) {
59-
if (server == null) {
60-
throw new IllegalArgumentException("server == null");
61-
}
60+
Assert.notNull(server, "server == null");
6261
this.server = server;
6362
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
64-
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
65-
if (heartbeatTimeout < heartbeat * 2) {
66-
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
63+
this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
64+
if (idleTimeout < heartbeat * 2) {
65+
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
6766
}
6867

69-
startHeartbeatTimer();
68+
startIdleCheckTask();
7069
}
7170

7271
public Server getServer() {
@@ -149,7 +148,6 @@ private void doClose() {
149148
if (!closed.compareAndSet(false, true)) {
150149
return;
151150
}
152-
stopHeartbeatTimer();
153151
}
154152

155153
@Override
@@ -210,14 +208,13 @@ public void reset(URL url) {
210208
int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
211209
int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
212210
if (t < h * 2) {
213-
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
211+
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
214212
}
215-
if (h != heartbeat || t != heartbeatTimeout) {
213+
if (h != heartbeat || t != idleTimeout) {
216214
heartbeat = h;
217-
heartbeatTimeout = t;
215+
idleTimeout = t;
218216

219-
stopHeartbeatTimer();
220-
startHeartbeatTimer();
217+
startIdleCheckTask();
221218
}
222219
}
223220
} catch (Throwable t) {
@@ -260,28 +257,14 @@ private long calculateLeastDuration(int time) {
260257
}
261258
}
262259

263-
private void startHeartbeatTimer() {
264-
long tickDuration = calculateLeastDuration(heartbeat);
265-
heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
266-
TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
267-
260+
private void startIdleCheckTask() {
268261
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
269262

270-
long heartbeatTick = calculateLeastDuration(heartbeat);
271-
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
272-
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
273-
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
263+
long idleTimeoutTick = calculateLeastDuration(idleTimeout);
264+
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
274265

275266
// init task and start timer.
276-
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
277-
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
278-
}
279-
280-
private void stopHeartbeatTimer() {
281-
if (heartbeatTimer != null) {
282-
heartbeatTimer.stop();
283-
heartbeatTimer = null;
284-
}
267+
idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
285268
}
286269

287270
}

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,35 +29,30 @@ public class ReconnectTimerTask extends AbstractTimerTask {
2929

3030
private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);
3131

32-
private final int heartbeatTimeout;
32+
private final int idleTimeout;
3333

34-
ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
34+
public ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int idleTimeout) {
3535
super(channelProvider, heartbeatTimeoutTick);
36-
this.heartbeatTimeout = heartbeatTimeout1;
36+
this.idleTimeout = idleTimeout;
3737
}
3838

3939
@Override
4040
protected void doTask(Channel channel) {
41-
Long lastRead = lastRead(channel);
42-
Long now = now();
43-
if (lastRead != null && now - lastRead > heartbeatTimeout) {
44-
if (channel instanceof Client) {
41+
try {
42+
Long lastRead = lastRead(channel);
43+
Long now = now();
44+
// check pong at client
45+
if (lastRead != null && now - lastRead > idleTimeout) {
46+
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
47+
+ idleTimeout + "ms");
4548
try {
46-
logger.warn("Reconnect to remote channel " + channel.getRemoteAddress() + ", because heartbeat read idle time out: "
47-
+ heartbeatTimeout + "ms");
4849
((Client) channel).reconnect();
49-
} catch (Throwable t) {
50-
// do nothing
51-
}
52-
} else {
53-
try {
54-
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
55-
+ heartbeatTimeout + "ms");
56-
channel.close();
57-
} catch (Throwable t) {
58-
logger.warn("Exception when close channel " + channel, t);
50+
} catch (Exception e) {
51+
logger.error(channel + "reconnect failed during idle time.", e);
5952
}
6053
}
54+
} catch (Throwable t) {
55+
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
6156
}
6257
}
6358
}

0 commit comments

Comments
 (0)