Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
Expand Down Expand Up @@ -90,6 +90,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify

private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value

private volatile URL registeredConsumerUrl;

/**
* override rules
* Priority: override>-D>consumer>provider
Expand Down Expand Up @@ -158,6 +160,15 @@ public void destroy() {
if (isDestroyed()) {
return;
}

// unregister.
try {
if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
registry.unregister(getRegisteredConsumerUrl());
}
} catch (Throwable t) {
logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
}
// unsubscribe.
try {
if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
Expand Down Expand Up @@ -565,6 +576,14 @@ public URL getUrl() {
return this.overrideDirectoryUrl;
}

public URL getRegisteredConsumerUrl() {
return registeredConsumerUrl;
}

public void setRegisteredConsumerUrl(URL registeredConsumerUrl) {
this.registeredConsumerUrl = registeredConsumerUrl;
}

@Override
public boolean isAvailable() {
if (isDestroyed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
Expand All @@ -385,7 +386,7 @@ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type
return invoker;
}

private URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
public URL getRegisteredConsumerUrl(final URL consumerUrl, URL registryUrl) {
if (!registryUrl.getParameter(SIMPLE_CONSUMER_CONFIG_KEY, false)) {
return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
CHECK_KEY, String.valueOf(false));
Expand Down Expand Up @@ -598,6 +599,7 @@ protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
}

/**
* exporter proxy, establish the corresponding relationship between the returned exporter and the exporter
* exported by the protocol, and can modify the relationship at the time of override.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ private synchronized void initConnectStatusCheckCommand() {
@Override
public void run() {
try {
if (cancelFutureIfOffline()) return;

if (!isConnected()) {
connect();
} else {
Expand All @@ -173,7 +175,29 @@ public void run() {
}
}
}

private boolean cancelFutureIfOffline() {
/**
* If the provider service is detected offline,
* the client should not attempt to connect again.
*
* issue: https://github.com/apache/incubator-dubbo/issues/3158
*/
if(isClosed()) {
ScheduledFuture<?> future = reconnectExecutorFuture;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you can use the ExecutorUtil#cancelSurtureFuture

if(future != null && !future.isCancelled()){
/**
* Client has been destroyed and
* scheduled task should be cancelled.
*/
future.cancel(true);
}
return true;
}
return false;
}
};

reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -345,14 +369,14 @@ public void reconnect() throws RemotingException {
@Override
public void close() {
try {
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
super.close();
if (executor != null) {
ExecutorUtil.shutdownNow(executor, 100);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
Expand Down