Skip to content

Commit c84511c

Browse files
beiwei30chickenlj
authored andcommitted
Merge pull request #3044, code review around RouterChain.
1 parent 77891b4 commit c84511c

File tree

10 files changed

+80
-149
lines changed

10 files changed

+80
-149
lines changed

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.dubbo.rpc.RpcException;
2323

2424
import java.util.List;
25-
import java.util.Map;
2625

2726
/**
2827
* Router. (SPI, Prototype, ThreadSafe)
@@ -43,24 +42,25 @@ public interface Router extends Comparable<Router> {
4342
/**
4443
* Filter invokers with current routing rule and only return the invokers that comply with the rule.
4544
*
46-
* @param invokers
45+
* @param invokers invoker list
4746
* @param url refer url
48-
* @param invocation
47+
* @param invocation invocation
4948
* @return routed invokers
5049
* @throws RpcException
5150
*/
5251
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
5352

54-
default <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
55-
return null;
56-
}
5753

5854
/**
59-
* Each router has a reference of the router chain.
55+
* Notify the router the invoker list. Invoker list may change from time to time. This method gives the router a
56+
* chance to prepare before {@link Router#route(List, URL, Invocation)} gets called.
6057
*
61-
* @param routerChain
58+
* @param invokers invoker list
59+
* @param <T> invoker's type
6260
*/
63-
void addRouterChain(RouterChain routerChain);
61+
default <T> void notify(List<Invoker<T>> invokers) {
62+
63+
}
6464

6565
/**
6666
* To decide whether this router need to execute every time an RPC comes or should only execute when addresses or rule change.
@@ -83,4 +83,4 @@ default <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, UR
8383
* @return
8484
*/
8585
int getPriority();
86-
}
86+
}

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java

Lines changed: 40 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.common.extension.ExtensionLoader;
21-
import org.apache.dubbo.common.utils.CollectionUtils;
2221
import org.apache.dubbo.rpc.Invocation;
2322
import org.apache.dubbo.rpc.Invoker;
2423

@@ -33,92 +32,78 @@
3332
public class RouterChain<T> {
3433

3534
// full list of addresses from registry, classified by method name.
36-
private List<Invoker<T>> fullInvokers;
35+
private List<Invoker<T>> invokers = Collections.emptyList();
3736
private URL url;
3837

3938
// containing all routers, reconstruct every time 'route://' urls change.
40-
private List<Router> routers = new CopyOnWriteArrayList<>();
41-
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the instance will never delete or recreate.
42-
private List<Router> residentRouters;
39+
private volatile List<Router> routers = Collections.emptyList();
4340

44-
public static <T> RouterChain<T> buildChain(URL url) {
45-
RouterChain<T> routerChain = new RouterChain<>(url);
46-
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String[]) null);
47-
List<Router> routers = extensionFactories.stream()
48-
.map(factory -> {
49-
Router router = factory.getRouter(url);
50-
router.addRouterChain(routerChain);
51-
return router;
52-
}).collect(Collectors.toList());
53-
routerChain.setResidentRouters(routers);
54-
return routerChain;
55-
}
41+
// Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the
42+
// instance will never delete or recreate.
43+
private List<Router> builtinRouters = Collections.emptyList();
5644

57-
protected RouterChain(List<Router> routers) {
58-
this.routers.addAll(routers);
45+
public static <T> RouterChain<T> buildChain(URL url) {
46+
return new RouterChain<>(url);
5947
}
6048

61-
protected RouterChain(URL url) {
49+
private RouterChain(URL url) {
6250
this.url = url;
51+
52+
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
53+
.getActivateExtension(url, (String[]) null);
54+
55+
List<Router> routers = extensionFactories.stream()
56+
.map(factory -> factory.getRouter(url))
57+
.collect(Collectors.toList());
58+
59+
initWithRouters(routers);
6360
}
6461

6562
/**
6663
* the resident routers must being initialized before address notification.
67-
*
68-
* @param residentRouters
64+
* FIXME: this method should not be public
6965
*/
70-
public void setResidentRouters(List<Router> residentRouters) {
71-
this.residentRouters = residentRouters;
72-
this.routers.addAll(residentRouters);
66+
public void initWithRouters(List<Router> builtinRouters) {
67+
this.builtinRouters = builtinRouters;
68+
this.routers = new CopyOnWriteArrayList<>(builtinRouters);
7369
this.sort();
7470
}
7571

72+
public void addRouter(Router router) {
73+
this.routers.add(router);
74+
}
75+
7676
/**
7777
* If we use route:// protocol in version before 2.7.0, each URL will generate a Router instance,
7878
* so we should keep the routers up to date, that is, each time router URLs changes, we should update the routers list,
79-
* only keep the residentRouters which are available all the time and the latest notified routers which are generated from URLs.
79+
* only keep the builtinRouters which are available all the time and the latest notified routers which are generated from URLs.
8080
*
81-
* @param generatedRouters routers from 'router://' rules in 2.6.x or before.
81+
* @param routers routers from 'router://' rules in 2.6.x or before.
8282
*/
83-
public void setGeneratedRouters(List<Router> generatedRouters) {
83+
public void addRouters(List<Router> routers) {
84+
// FIXME will sort cause concurrent problem? since it's kind of a write operation.
8485
List<Router> newRouters = new CopyOnWriteArrayList<>();
85-
newRouters.addAll(residentRouters);
86-
newRouters.addAll(generatedRouters);
86+
newRouters.addAll(builtinRouters);
87+
newRouters.addAll(routers);
8788
this.routers = newRouters;
88-
// FIXME will sort cause concurrent problem? since it's kind of a write operation.
8989
this.sort();
90-
/* if (fullInvokers != null) {
91-
this.preRoute(fullInvokers, url, null);
90+
/* if (invokers != null) {
91+
this.rebuild(invokers, url, null);
9292
}*/
9393
}
9494

95-
public void sort() {
95+
private void sort() {
9696
Collections.sort(routers);
9797
}
9898

99-
/**
100-
* TODO
101-
*
102-
* Building of router cache can be triggered from within different threads, for example, registry notification and governance notification.
103-
* So this operation should be synchronized.
104-
* @param invokers
105-
* @param url
106-
* @param invocation
107-
*/
108-
public void preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) {
109-
for (Router router : routers) {
110-
router.preRoute(invokers, url, invocation);
111-
}
112-
}
113-
11499
/**
115100
*
116101
* @param url
117102
* @param invocation
118103
* @return
119104
*/
120105
public List<Invoker<T>> route(URL url, Invocation invocation) {
121-
List<Invoker<T>> finalInvokers = fullInvokers;
106+
List<Invoker<T>> finalInvokers = invokers;
122107
for (Router router : routers) {
123108
// if (router.isRuntime()) {
124109
// finalInvokers = router.route(finalInvokers, url, invocation);
@@ -128,29 +113,14 @@ public List<Invoker<T>> route(URL url, Invocation invocation) {
128113
return finalInvokers;
129114
}
130115

131-
/**
132-
* When any of the router's rule changed, notify the router chain to rebuild cache from scratch.
133-
*/
134-
public void notifyRuleChanged() {
135-
if (CollectionUtils.isEmpty(this.fullInvokers)) {
136-
return;
137-
}
138-
preRoute(this.fullInvokers, url, null);
139-
}
140-
141116
/**
142117
* Notify router chain of the initial addresses from registry at the first time.
143118
* Notify whenever addresses in registry change.
144-
*
145-
* @param invokers
146-
* @param url
147119
*/
148-
public void notifyFullInvokers(List<Invoker<T>> invokers, URL url) {
149-
setFullMethodInvokers(invokers);
150-
preRoute(invokers, url, null);
151-
}
152-
153-
public void setFullMethodInvokers(List<Invoker<T>> fullInvokers) {
154-
this.fullInvokers = (fullInvokers == null ? Collections.emptyList() : fullInvokers);
120+
public void setInvokers(List<Invoker<T>> invokers) {
121+
if (invokers != null) {
122+
this.invokers = invokers;
123+
routers.forEach(router -> router.notify(invokers));
124+
}
155125
}
156126
}

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void setRouterChain(RouterChain<T> routerChain) {
9898
protected void addRouters(List<Router> routers) {
9999
// copy list
100100
routers = routers == null ? new ArrayList<>() : new ArrayList<>(routers);
101-
routerChain.setGeneratedRouters(routers);
101+
routerChain.addRouters(routers);
102102
}
103103

104104
public URL getConsumerUrl() {

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void destroy() {
8787

8888
public void buildRouterChain() {
8989
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
90-
routerChain.notifyFullInvokers(invokers, getUrl());
90+
routerChain.setInvokers(invokers);
9191
this.setRouterChain(routerChain);
9292
}
9393

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@
1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.configcenter.DynamicConfiguration;
2121
import org.apache.dubbo.rpc.cluster.Router;
22-
import org.apache.dubbo.rpc.cluster.RouterChain;
23-
24-
import java.util.List;
25-
import java.util.concurrent.CopyOnWriteArrayList;
2622

2723
/**
2824
* TODO Extract more code to here if necessary
@@ -31,7 +27,6 @@ public abstract class AbstractRouter implements Router {
3127
protected int priority;
3228
protected boolean force = false;
3329
protected boolean enabled = true;
34-
protected List<RouterChain> routerChains = new CopyOnWriteArrayList<>();
3530
protected URL url;
3631

3732
protected DynamicConfiguration configuration;
@@ -53,12 +48,6 @@ public void setUrl(URL url) {
5348
this.url = url;
5449
}
5550

56-
@Override
57-
public void addRouterChain(RouterChain routerChain) {
58-
this.routerChains.add(routerChain);
59-
}
60-
61-
6251
public void setConfiguration(DynamicConfiguration configuration) {
6352
this.configuration = configuration;
6453
}

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.dubbo.rpc.Invoker;
2929
import org.apache.dubbo.rpc.RpcException;
3030
import org.apache.dubbo.rpc.cluster.Router;
31-
import org.apache.dubbo.rpc.cluster.RouterChain;
3231
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
3332
import org.apache.dubbo.rpc.cluster.router.condition.ConditionRouter;
3433
import org.apache.dubbo.rpc.cluster.router.condition.config.model.BlackWhiteListRule;
@@ -73,8 +72,6 @@ public synchronized void process(ConfigChangeEvent event) {
7372
.getValue(), e);
7473
}
7574
}
76-
77-
routerChains.forEach(RouterChain::notifyRuleChanged);
7875
}
7976

8077
@Override

0 commit comments

Comments
 (0)