Skip to content

Commit d27fb1f

Browse files
zonghaishangchickenlj
authored andcommitted
Merge pull request #3295, unregister consumer url when client destroyed (referenceconfig#destroy).
* fix client reconnect offline provider. * refactor cancel future. * fix client reconnect offline provider. * refactor cancel future. * fix client reconnect offline provider. * refactor cancel future. * fix unregister when client destroyed
1 parent 0642b3e commit d27fb1f

File tree

3 files changed

+51
-7
lines changed

3 files changed

+51
-7
lines changed

dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import org.apache.dubbo.common.logger.Logger;
2424
import org.apache.dubbo.common.logger.LoggerFactory;
2525
import org.apache.dubbo.common.utils.Assert;
26+
import org.apache.dubbo.common.utils.CollectionUtils;
2627
import org.apache.dubbo.common.utils.NetUtils;
2728
import org.apache.dubbo.common.utils.StringUtils;
2829
import org.apache.dubbo.common.utils.UrlUtils;
29-
import org.apache.dubbo.common.utils.CollectionUtils;
3030
import org.apache.dubbo.configcenter.DynamicConfiguration;
3131
import org.apache.dubbo.registry.NotifyListener;
3232
import org.apache.dubbo.registry.Registry;
@@ -90,6 +90,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
9090

9191
private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
9292

93+
private volatile URL registeredConsumerUrl;
94+
9395
/**
9496
* override rules
9597
* Priority: override>-D>consumer>provider
@@ -158,6 +160,15 @@ public void destroy() {
158160
if (isDestroyed()) {
159161
return;
160162
}
163+
164+
// unregister.
165+
try {
166+
if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
167+
registry.unregister(getRegisteredConsumerUrl());
168+
}
169+
} catch (Throwable t) {
170+
logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
171+
}
161172
// unsubscribe.
162173
try {
163174
if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
@@ -565,6 +576,14 @@ public URL getUrl() {
565576
return this.overrideDirectoryUrl;
566577
}
567578

579+
public URL getRegisteredConsumerUrl() {
580+
return registeredConsumerUrl;
581+
}
582+
583+
public void setRegisteredConsumerUrl(URL registeredConsumerUrl) {
584+
this.registeredConsumerUrl = registeredConsumerUrl;
585+
}
586+
568587
@Override
569588
public boolean isAvailable() {
570589
if (isDestroyed()) {

dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,8 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
372372
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
373373
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
374374
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
375-
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
375+
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
376+
registry.register(directory.getRegisteredConsumerUrl());
376377
}
377378
directory.buildRouterChain(subscribeUrl);
378379
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
@@ -383,7 +384,7 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
383384
return invoker;
384385
}
385386

386-
private URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
387+
public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
387388
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
388389
return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
389390
CHECK_KEY, String.valueOf(false));

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ private synchronized void initConnectStatusCheckCommand() {
153153
@Override
154154
public void run() {
155155
try {
156+
if (cancelFutureIfOffline()) return;
157+
156158
if (!isConnected()) {
157159
connect();
158160
} else {
@@ -173,7 +175,29 @@ public void run() {
173175
}
174176
}
175177
}
178+
179+
private boolean cancelFutureIfOffline() {
180+
/**
181+
* If the provider service is detected offline,
182+
* the client should not attempt to connect again.
183+
*
184+
* issue: https://github.com/apache/incubator-dubbo/issues/3158
185+
*/
186+
if(isClosed()) {
187+
ScheduledFuture<?> future = reconnectExecutorFuture;
188+
if(future != null && !future.isCancelled()){
189+
/**
190+
* Client has been destroyed and
191+
* scheduled task should be cancelled.
192+
*/
193+
future.cancel(true);
194+
}
195+
return true;
196+
}
197+
return false;
198+
}
176199
};
200+
177201
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
178202
}
179203
}
@@ -345,14 +369,14 @@ public void reconnect() throws RemotingException {
345369
@Override
346370
public void close() {
347371
try {
348-
if (executor != null) {
349-
ExecutorUtil.shutdownNow(executor, 100);
350-
}
372+
super.close();
351373
} catch (Throwable e) {
352374
logger.warn(e.getMessage(), e);
353375
}
354376
try {
355-
super.close();
377+
if (executor != null) {
378+
ExecutorUtil.shutdownNow(executor, 100);
379+
}
356380
} catch (Throwable e) {
357381
logger.warn(e.getMessage(), e);
358382
}

0 commit comments

Comments
 (0)