Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
Expand All @@ -39,7 +38,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -92,8 +90,7 @@ public class NacosRegistry extends FailbackRegistry {

/**
* The separator for service name
*
* @revert change a constant to be configurable, it's designed for Windows file name that is compatible with old
* Change a constant to be configurable, it's designed for Windows file name that is compatible with old
* Nacos binary release(< 0.6.1)
*/
private static final String SERVICE_NAME_SEPARATOR = System.getProperty("nacos.service.name.separator", ":");
Expand Down Expand Up @@ -122,7 +119,7 @@ public class NacosRegistry extends FailbackRegistry {
public NacosRegistry(URL url, NamingService namingService) {
super(url);
this.namingService = namingService;
this.nacosListeners = new ConcurrentHashMap<String, EventListener>();
this.nacosListeners = new ConcurrentHashMap<>();
}

@Override
Expand All @@ -132,15 +129,12 @@ public boolean isAvailable() {

@Override
public List<URL> lookup(final URL url) {
final List<URL> urls = new LinkedList<URL>();
execute(new NamingServiceCallback() {
@Override
public void callback(NamingService namingService) throws NacosException {
List<String> serviceNames = getServiceNames(url, null);
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName);
urls.addAll(buildURLs(url, instances));
}
final List<URL> urls = new LinkedList<>();
execute(namingService -> {
List<String> serviceNames = getServiceNames(url, null);
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName);
urls.addAll(buildURLs(url, instances));
}
});
return urls;
Expand All @@ -150,21 +144,15 @@ public void callback(NamingService namingService) throws NacosException {
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(new NamingServiceCallback() {
public void callback(NamingService namingService) throws NacosException {
namingService.registerInstance(serviceName, instance);
}
});
execute(namingService -> namingService.registerInstance(serviceName, instance));
}

@Override
public void doUnregister(final URL url) {
execute(new NamingServiceCallback() {
public void callback(NamingService namingService) throws NacosException {
String serviceName = getServiceName(url);
Instance instance = createInstance(url);
namingService.deregisterInstance(serviceName, instance.getIp(), instance.getPort());
}
execute(namingService -> {
String serviceName = getServiceName(url);
Instance instance = createInstance(url);
namingService.deregisterInstance(serviceName, instance.getIp(), instance.getPort());
});
}

Expand All @@ -175,14 +163,11 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
}

private void doSubscribe(final URL url, final NotifyListener listener, final List<String> serviceNames) {
execute(new NamingServiceCallback() {
@Override
public void callback(NamingService namingService) throws NacosException {
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName);
notifySubscriber(url, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
execute(namingService -> {
for (String serviceName : serviceNames) {
List<Instance> instances = namingService.getAllInstances(serviceName);
notifySubscriber(url, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
});
}
Expand Down Expand Up @@ -223,26 +208,20 @@ private boolean isAdminProtocol(URL url) {
private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
List<String> serviceNames = getAllServiceNames();
filterData(serviceNames, new NacosDataFilter<String>() {
@Override
public boolean accept(String serviceName) {
boolean accepted = false;
for (String category : ALL_SUPPORTED_CATEGORIES) {
String prefix = category + SERVICE_NAME_SEPARATOR;
if (StringUtils.startsWith(serviceName, prefix)) {
accepted = true;
break;
}
}
return accepted;
scheduledExecutorService.scheduleAtFixedRate(() -> {
List<String> serviceNames = getAllServiceNames();
filterData(serviceNames, serviceName -> {
boolean accepted = false;
for (String category : ALL_SUPPORTED_CATEGORIES) {
String prefix = category + SERVICE_NAME_SEPARATOR;
if (StringUtils.startsWith(serviceName, prefix)) {
accepted = true;
break;
}
});
doSubscribe(url, listener, serviceNames);
}
}
return accepted;
});
doSubscribe(url, listener, serviceNames);
}, LOOKUP_INTERVAL, LOOKUP_INTERVAL, TimeUnit.SECONDS);
}
}
Expand All @@ -261,34 +240,31 @@ private List<String> getServiceNamesForOps(URL url) {

private List<String> getAllServiceNames() {

final List<String> serviceNames = new LinkedList<String>();

execute(new NamingServiceCallback() {
@Override
public void callback(NamingService namingService) throws NacosException {

int pageIndex = 1;
ListView<String> listView = namingService.getServicesOfServer(pageIndex, PAGINATION_SIZE);
// First page data
List<String> firstPageData = listView.getData();
// Append first page into list
serviceNames.addAll(firstPageData);
// the total count
int count = listView.getCount();
// the number of pages
int pageNumbers = count / PAGINATION_SIZE;
int remainder = count % PAGINATION_SIZE;
// remain
if (remainder > 0) {
pageNumbers += 1;
}
// If more than 1 page
while (pageIndex < pageNumbers) {
listView = namingService.getServicesOfServer(++pageIndex, PAGINATION_SIZE);
serviceNames.addAll(listView.getData());
}

final List<String> serviceNames = new LinkedList<>();

execute(namingService -> {

int pageIndex = 1;
ListView<String> listView = namingService.getServicesOfServer(pageIndex, PAGINATION_SIZE);
// First page data
List<String> firstPageData = listView.getData();
// Append first page into list
serviceNames.addAll(firstPageData);
// the total count
int count = listView.getCount();
// the number of pages
int pageNumbers = count / PAGINATION_SIZE;
int remainder = count % PAGINATION_SIZE;
// remain
if (remainder > 0) {
pageNumbers += 1;
}
// If more than 1 page
while (pageIndex < pageNumbers) {
listView = namingService.getServicesOfServer(++pageIndex, PAGINATION_SIZE);
serviceNames.addAll(listView.getData());
}

});

return serviceNames;
Expand All @@ -304,61 +280,50 @@ private void filterServiceNames(List<String> serviceNames, URL url) {

final String targetGroup = url.getParameter(GROUP_KEY);

filterData(serviceNames, new NacosDataFilter<String>() {
@Override
public boolean accept(String serviceName) {
// split service name to segments
// (required) segments[0] = category
// (required) segments[1] = serviceInterface
// (required) segments[2] = version
// (optional) segments[3] = group
String[] segments = StringUtils.split(serviceName, SERVICE_NAME_SEPARATOR);
int length = segments.length;
if (length < 3) { // must present 3 segments or more
return false;
}

String category = segments[CATEGORY_INDEX];
if (!ArrayUtils.contains(categories, category)) { // no match category
return false;
}

String serviceInterface = segments[SERVICE_INTERFACE_INDEX];
if (!WILDCARD.equals(targetServiceInterface) &&
!StringUtils.equals(targetServiceInterface, serviceInterface)) { // no match service interface
return false;
}
filterData(serviceNames, serviceName -> {
// split service name to segments
// (required) segments[0] = category
// (required) segments[1] = serviceInterface
// (required) segments[2] = version
// (optional) segments[3] = group
String[] segments = StringUtils.split(serviceName, SERVICE_NAME_SEPARATOR);
int length = segments.length;
if (length < 3) { // must present 3 segments or more
return false;
}

String version = segments[SERVICE_VERSION_INDEX];
if (!WILDCARD.equals(targetVersion) &&
!StringUtils.equals(targetVersion, version)) { // no match service version
return false;
}
String category = segments[CATEGORY_INDEX];
if (!ArrayUtils.contains(categories, category)) { // no match category
return false;
}

String group = length > 3 ? segments[SERVICE_GROUP_INDEX] : null;
if (group != null && !WILDCARD.equals(targetGroup)
&& !StringUtils.equals(targetGroup, group)) { // no match service group
return false;
}
String serviceInterface = segments[SERVICE_INTERFACE_INDEX];
if (!WILDCARD.equals(targetServiceInterface) &&
!StringUtils.equals(targetServiceInterface, serviceInterface)) { // no match service interface
return false;
}

return true;
String version = segments[SERVICE_VERSION_INDEX];
if (!WILDCARD.equals(targetVersion) &&
!StringUtils.equals(targetVersion, version)) { // no match service version
return false;
}

String group = length > 3 ? segments[SERVICE_GROUP_INDEX] : null;
// no match service group
return group == null || WILDCARD.equals(targetGroup)
|| StringUtils.equals(targetGroup, group);
});
}

private <T> void filterData(Collection<T> collection, NacosDataFilter<T> filter) {
Iterator<T> iterator = collection.iterator();
while (iterator.hasNext()) {
T data = iterator.next();
if (!filter.accept(data)) { // remove if not accept
iterator.remove();
}
}
// remove if not accept
collection.removeIf(data -> !filter.accept(data));
}

private List<String> doGetServiceNames(URL url) {
String[] categories = getCategories(url);
List<String> serviceNames = new ArrayList<String>(categories.length);
List<String> serviceNames = new ArrayList<>(categories.length);
for (String category : categories) {
final String serviceName = getServiceName(url, category);
serviceNames.add(serviceName);
Expand All @@ -370,7 +335,7 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
if (instances.isEmpty()) {
return Collections.emptyList();
}
List<URL> urls = new LinkedList<URL>();
List<URL> urls = new LinkedList<>();
for (Instance instance : instances) {
URL url = buildURL(instance);
if (UrlUtils.isMatch(consumerURL, url)) {
Expand All @@ -383,12 +348,10 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
if (!nacosListeners.containsKey(serviceName)) {
EventListener eventListener = new EventListener() {
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
};
namingService.subscribe(serviceName, eventListener);
Expand All @@ -404,7 +367,7 @@ public void onEvent(Event event) {
* @param instances all {@link Instance instances}
*/
private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
List<Instance> healthyInstances = new LinkedList<Instance>(instances);
List<Instance> healthyInstances = new LinkedList<>(instances);
// Healthy Instances
filterHealthyInstances(healthyInstances);
List<URL> urls = buildURLs(url, healthyInstances);
Expand All @@ -426,12 +389,11 @@ private URL buildURL(Instance instance) {
Map<String, String> metadata = instance.getMetadata();
String protocol = metadata.get(Constants.PROTOCOL_KEY);
String path = metadata.get(Constants.PATH_KEY);
URL url = new URL(protocol,
return new URL(protocol,
instance.getIp(),
instance.getPort(),
path,
instance.getMetadata());
return url;
}

private Instance createInstance(URL url) {
Expand All @@ -445,7 +407,7 @@ private Instance createInstance(URL url) {
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setMetadata(new HashMap<String, String>(newURL.getParameters()));
instance.setMetadata(new HashMap<>(newURL.getParameters()));
return instance;
}

Expand Down Expand Up @@ -480,14 +442,10 @@ private void execute(NamingServiceCallback callback) {
}

private void filterHealthyInstances(Collection<Instance> instances) {
filterData(instances, new NacosDataFilter<Instance>() {
@Override
public boolean accept(Instance data) {
return data.isEnabled();
}
});
filterData(instances, Instance::isEnabled);
}

@SafeVarargs
private static <T> T[] of(T... values) {
return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected Registry createRegistry(URL url) {

private NamingService buildNamingService(URL url) {
Properties nacosProperties = buildNacosProperties(url);
NamingService namingService = null;
NamingService namingService;
try {
namingService = NacosFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
Expand Down