Skip to content

Commit d42b93d

Browse files
authored
fixes #10079, address notification issue with service discovery multi subscription (#10080)
1 parent c35b2ec commit d42b93d

File tree

3 files changed

+178
-58
lines changed

3 files changed

+178
-58
lines changed

dubbo-common/src/main/java/org/apache/dubbo/common/URL.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,8 +1402,8 @@ public String getProtocolServiceKey() {
14021402
}
14031403
this.protocolServiceKey = getServiceKey();
14041404
/*
1405-
Special treatment if this is a consumer subscription url instance with no protocol specified - starts with 'consumer://'
1406-
If the specific protocol is specified on the consumer side, then this method will return as normal.
1405+
Special treatment for urls begins with 'consumer://', that is, a consumer subscription url instance with no protocol specified.
1406+
If protocol is specified on the consumer side, then this method will return as normal.
14071407
*/
14081408
if (!CONSUMER.equals(getProtocol())) {
14091409
this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());

dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java

Lines changed: 57 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -204,33 +204,30 @@ public synchronized void addListenerAndNotify(String serviceKey, NotifyListener
204204
return;
205205
}
206206

207-
Set<String> protocolServiceKeys = getProtocolServiceKeyList(serviceKey, listener);
208-
for (String protocolServiceKey : protocolServiceKeys) {
209-
// Add to global listeners
210-
if (!this.listeners.containsKey(serviceKey)) {
211-
// synchronized method, no need to use DCL
212-
this.listeners.put(serviceKey, new ConcurrentHashSet<>());
213-
}
214-
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
215-
notifyListeners.add(new NotifyListenerWithKey(protocolServiceKey, listener));
216-
}
217-
207+
Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
208+
// {@code protocolServiceKeysToConsume} will be specific protocols configured in reference config or default protocols supported by framework.
209+
Set<String> protocolServiceKeysToConsume = getProtocolServiceKeyList(serviceKey, listener);
210+
// Add current listener to serviceKey set, there will have more than one listener when multiple references of one same service is configured.
211+
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
212+
notifyListeners.add(listenerWithKey);
213+
214+
// Aggregate address and notify on subscription.
218215
List<URL> urls;
219-
if (protocolServiceKeys.size() > 1) {
216+
if (protocolServiceKeysToConsume.size() > 1) {
220217
urls = new ArrayList<>();
221-
for (NotifyListenerWithKey notifyListenerWithKey : this.listeners.get(serviceKey)) {
222-
String protocolKey = notifyListenerWithKey.getProtocolServiceKey();
223-
List<URL> urlsOfProtocol = getAddresses(protocolKey, listener.getConsumerUrl());
218+
for (String protocolServiceKey : protocolServiceKeysToConsume) {
219+
List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, listener.getConsumerUrl());
224220
if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
221+
logger.info(String.format("Found %s urls of protocol service key %s ", urlsOfProtocol.size(), protocolServiceKey));
225222
urls.addAll(urlsOfProtocol);
226223
}
227224
}
228225
} else {
229-
String protocolKey = this.listeners.get(serviceKey).iterator().next().getProtocolServiceKey();
230-
urls = getAddresses(protocolKey, listener.getConsumerUrl());
226+
urls = getAddresses(protocolServiceKeysToConsume.iterator().next(), listener.getConsumerUrl());
231227
}
232228

233229
if (CollectionUtils.isNotEmpty(urls)) {
230+
logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", serviceKey, listener, urls.size()));
234231
listener.notify(urls);
235232
}
236233
}
@@ -240,18 +237,16 @@ public synchronized void removeListener(String serviceKey, NotifyListener notify
240237
return;
241238
}
242239

243-
for (String protocolServiceKey : getProtocolServiceKeyList(serviceKey, notifyListener)) {
244-
// synchronized method, no need to use DCL
245-
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
246-
if (notifyListeners != null) {
247-
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, notifyListener);
248-
// Remove from global listeners
249-
notifyListeners.remove(listenerWithKey);
240+
// synchronized method, no need to use DCL
241+
Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
242+
if (notifyListeners != null) {
243+
NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, notifyListener);
244+
// Remove from global listeners
245+
notifyListeners.remove(listenerWithKey);
250246

251-
// ServiceKey has no listener, remove set
252-
if (notifyListeners.size() == 0) {
253-
this.listeners.remove(serviceKey);
254-
}
247+
// ServiceKey has no listener, remove set
248+
if (notifyListeners.size() == 0) {
249+
this.listeners.remove(serviceKey);
255250
}
256251
}
257252
}
@@ -385,32 +380,32 @@ protected List<URL> getAddresses(String serviceProtocolKey, URL consumerURL) {
385380
* race condition is protected by onEvent/doOnEvent
386381
*/
387382
protected void notifyAddressChanged() {
383+
// 1 different services
388384
listeners.forEach((serviceKey, listenerSet) -> {
389-
if (listenerSet != null) {
390-
if (listenerSet.size() == 1) {
391-
NotifyListenerWithKey listenerWithKey = listenerSet.iterator().next();
392-
String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
393-
NotifyListener notifyListener = listenerWithKey.getNotifyListener();
385+
// 2 multiple subscription listener of the same service
386+
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
387+
NotifyListener notifyListener = listenerWithKey.getNotifyListener();
388+
if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 2.1 if one specific protocol is specified
389+
String protocolServiceKey = listenerWithKey.getProtocolServiceKeys().iterator().next();
394390
//FIXME, group wildcard match
395391
List<URL> urls = toUrlsWithEmpty(getAddresses(protocolServiceKey, notifyListener.getConsumerUrl()));
396-
logger.info("Notify service " + serviceKey + " with urls " + urls.size());
392+
logger.info("Notify service " + protocolServiceKey + " with urls " + urls.size());
397393
notifyListener.notify(urls);
398-
} else {
394+
} else {// 2.2 multiple protocols or no protocol(using default protocols) set
399395
List<URL> urls = new ArrayList<>();
400-
NotifyListener notifyListener = null;
401-
for (NotifyListenerWithKey listenerWithKey : listenerSet) {
402-
String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
403-
notifyListener = listenerWithKey.getNotifyListener();
396+
int effectiveProtocolNum = 0;
397+
for (String protocolServiceKey : listenerWithKey.getProtocolServiceKeys()) {
404398
List<URL> tmpUrls = getAddresses(protocolServiceKey, notifyListener.getConsumerUrl());
405399
if (CollectionUtils.isNotEmpty(tmpUrls)) {
400+
logger.info("Found " + urls.size() + " urls of protocol service key " + protocolServiceKey);
401+
effectiveProtocolNum++;
406402
urls.addAll(tmpUrls);
407403
}
408404
}
409-
if (notifyListener != null) {
410-
logger.info("Notify service " + serviceKey + " with urls " + urls.size());
411-
urls = toUrlsWithEmpty(urls);
412-
notifyListener.notify(urls);
413-
}
405+
406+
logger.info("Notify service " + serviceKey + " with " + urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
407+
urls = toUrlsWithEmpty(urls);
408+
notifyListener.notify(urls);
414409
}
415410
}
416411
});
@@ -477,7 +472,7 @@ public int hashCode() {
477472
* Calculate the protocol list that the consumer cares about.
478473
*
479474
* @param serviceKey possible input serviceKey includes
480-
* 1. {group}/{interface}:{version}:consumer
475+
* 1. {group}/{interface}:{version}, if protocol is not specified
481476
* 2. {group}/{interface}:{version}:{user specified protocols}
482477
* @param listener listener also contains the user specified protocols
483478
* @return protocol list with the format {group}/{interface}:{version}:{protocol}
@@ -530,16 +525,26 @@ public void run() {
530525
}
531526

532527
public static class NotifyListenerWithKey {
533-
private final String protocolServiceKey;
528+
private final String serviceKey;
529+
private final Set<String> protocolServiceKeys;
534530
private final NotifyListener notifyListener;
535531

536-
public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
537-
this.protocolServiceKey = protocolServiceKey;
532+
public NotifyListenerWithKey(String protocolServiceKey, Set<String> protocolServiceKeys, NotifyListener notifyListener) {
533+
this.serviceKey = protocolServiceKey;
534+
this.protocolServiceKeys = (protocolServiceKeys == null ? new ConcurrentHashSet<>() : protocolServiceKeys);
538535
this.notifyListener = notifyListener;
539536
}
540537

541-
public String getProtocolServiceKey() {
542-
return protocolServiceKey;
538+
public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
539+
this(protocolServiceKey, null, notifyListener);
540+
}
541+
542+
public String getServiceKey() {
543+
return serviceKey;
544+
}
545+
546+
public Set<String> getProtocolServiceKeys() {
547+
return protocolServiceKeys;
543548
}
544549

545550
public NotifyListener getNotifyListener() {
@@ -555,12 +560,12 @@ public boolean equals(Object o) {
555560
return false;
556561
}
557562
NotifyListenerWithKey that = (NotifyListenerWithKey) o;
558-
return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener);
563+
return Objects.equals(serviceKey, that.serviceKey) && Objects.equals(notifyListener, that.notifyListener);
559564
}
560565

561566
@Override
562567
public int hashCode() {
563-
return Objects.hash(protocolServiceKey, notifyListener);
568+
return Objects.hash(serviceKey, notifyListener);
564569
}
565570
}
566571
}

0 commit comments

Comments
 (0)