-
Notifications
You must be signed in to change notification settings - Fork 26.5k
Support multiple shared links #2457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
d7092c8
dc602c8
444d70d
99c6e63
32197c5
5ecdbe7
71c1ac2
1467d21
52a3ac7
1cb48fc
d6ddcd5
f376c1e
06e29bc
63962a1
52d69f3
b3fdee6
e75855c
268f8c4
39a433c
560dfb9
cf47e78
13fbc63
f511240
7b907c9
a9e7d87
768b7b5
593f4a3
c75808c
d739ded
9dc2f65
52efa43
33039fc
4a5943e
5aea875
9c19fd4
4cb08bc
8f4efc0
42fd2c7
de4c9be
209f966
4d00308
30557d3
9dc5c1b
dc2bad3
53aa630
5042de1
6b68053
35b9e31
51ba855
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,10 +21,7 @@ | |
| import org.apache.dubbo.common.extension.ExtensionLoader; | ||
| import org.apache.dubbo.common.serialize.support.SerializableClassRegistry; | ||
| import org.apache.dubbo.common.serialize.support.SerializationOptimizer; | ||
| import org.apache.dubbo.common.utils.ConcurrentHashSet; | ||
| import org.apache.dubbo.common.utils.ConfigUtils; | ||
| import org.apache.dubbo.common.utils.NetUtils; | ||
| import org.apache.dubbo.common.utils.StringUtils; | ||
| import org.apache.dubbo.common.utils.*; | ||
|
beiwei30 marked this conversation as resolved.
Outdated
|
||
| import org.apache.dubbo.remoting.Channel; | ||
| import org.apache.dubbo.remoting.RemotingException; | ||
| import org.apache.dubbo.remoting.Transporter; | ||
|
|
@@ -47,11 +44,7 @@ | |
| import org.apache.dubbo.rpc.protocol.AbstractProtocol; | ||
|
|
||
| import java.net.InetSocketAddress; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.*; | ||
|
beiwei30 marked this conversation as resolved.
Outdated
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
|
|
@@ -67,9 +60,8 @@ public class DubboProtocol extends AbstractProtocol { | |
| private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; | ||
| private static DubboProtocol INSTANCE; | ||
| private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger> | ||
| private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger> | ||
| private final Map<String, List<ReferenceCountExchangeClient>> referenceClientMap = new ConcurrentHashMap<>(); // <host:port,Exchanger> | ||
| private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>(); | ||
| private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>(); | ||
| private final Set<String> optimizers = new ConcurrentHashSet<String>(); | ||
| //consumer side export a stub service for dispatching event | ||
| //servicekey-stubmethods | ||
|
|
@@ -364,10 +356,13 @@ private ExchangeClient[] getClients(URL url) { | |
| // whether to share connection | ||
| boolean service_share_connect = false; | ||
| int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); | ||
| List<ReferenceCountExchangeClient> shareClients = null; | ||
| // if not configured, connection is shared, otherwise, one connection for one service | ||
| if (connections == 0) { | ||
| service_share_connect = true; | ||
| connections = 1; | ||
| connections = Integer.parseInt(ConfigUtils.getProperty(Constants.DEFAULT_CONNECTIONS_KEY, | ||
| Constants.DEFAULT_CONNECTIONS)); | ||
| shareClients = getSharedClient(url, connections); | ||
| } | ||
|
|
||
| ExchangeClient[] clients = new ExchangeClient[connections]; | ||
|
|
@@ -384,31 +379,37 @@ private ExchangeClient[] getClients(URL url) { | |
| /** | ||
| * Get shared connection | ||
| */ | ||
| private ExchangeClient getSharedClient(URL url) { | ||
| private List<ReferenceCountExchangeClient> getSharedClient(URL url) { | ||
| String key = url.getAddress(); | ||
| ReferenceCountExchangeClient client = referenceClientMap.get(key); | ||
| if (client != null) { | ||
| if (!client.isClosed()) { | ||
| client.incrementAndGetCount(); | ||
| return client; | ||
| } else { | ||
| referenceClientMap.remove(key); | ||
| } | ||
| List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key); | ||
| if(clients == null) { | ||
| List<ReferenceCountExchangeClient> referenceCountExchangeClients = buildReferenceCountExchangeClientList(url, key, connectNum); | ||
|
beiwei30 marked this conversation as resolved.
Outdated
|
||
| referenceClientMap.put(key, referenceCountExchangeClients); | ||
|
|
||
| clients = referenceCountExchangeClients; | ||
| } | ||
|
|
||
| locks.putIfAbsent(key, new Object()); | ||
| synchronized (locks.get(key)) { | ||
| if (referenceClientMap.containsKey(key)) { | ||
| return referenceClientMap.get(key); | ||
| for (int i = 0; i < clients.size(); i++) { | ||
| ReferenceCountExchangeClient client = clients.get(i); | ||
| if (client.isClosed()){ | ||
| client = buildReferenceCountExchangeClient(url, key); | ||
| clients.set(i, client); | ||
| } | ||
|
|
||
| ExchangeClient exchangeClient = initClient(url); | ||
| client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); | ||
| referenceClientMap.put(key, client); | ||
| ghostClientMap.remove(key); | ||
| locks.remove(key); | ||
| return client; | ||
| client.incrementAndGetCount(); | ||
| } | ||
|
|
||
| return clients; | ||
| } | ||
|
|
||
| private ReferenceCountExchangeClient buildReferenceCountExchangeClient (URL url, String key) { | ||
|
|
||
| ExchangeClient exchangeClient = initClient(url); | ||
|
|
||
| ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Delayed connections are used to reduce the number of long connections. Create a long connection when a call is initiated. <dubbo:protocol name="dubbo" lazy="true" />Note: This configuration only works for dubbo protocols that use long connections.
So, no matter how many ExchangeClients a URL corresponds to, it only needs a ghost connection in the ghostClientMap, which is a LazyConnectExchangeClient. Also, only the shared ExchangeClient will use the ReferenceCountExchangeClient, so the default client used to share a TCP connection is the ReferenceCountExchangeClient, and the separate TCP connection set by connections does not need or use the ReferenceCountExchangeClient. 延迟连接用于减少长连接数。当有调用发起时,再创建长连接。<dubbo:protocol name="dubbo" lazy="true" />注意:该配置只对使用长连接的 dubbo 协议生效。
所以,不管一个URL对应多少个ExchangeClient,它在ghostClientMap中最多只需要一个幽灵连接,即一个LazyConnectExchangeClient。 并且,只有共享的ExchangeClient才会用到ReferenceCountExchangeClient,所以默认的共享1条TCP连接所使用的client就是ReferenceCountExchangeClient,而通过connections设置的单独TCP连接不需要也没有使用ReferenceCountExchangeClient。
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand the mistake, please correct me, thank you |
||
| ghostClientMap.remove(key); | ||
|
|
||
| return client; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -460,15 +461,20 @@ public void destroy() { | |
| } | ||
|
|
||
| for (String key : new ArrayList<String>(referenceClientMap.keySet())) { | ||
| ExchangeClient client = referenceClientMap.remove(key); | ||
| if (client != null) { | ||
| try { | ||
| if (logger.isInfoEnabled()) { | ||
| logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); | ||
| List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key); | ||
|
|
||
| if(CollectionUtils.isNotEmpty(clients)) { | ||
| for (ReferenceCountExchangeClient client : clients) { | ||
| if (client != null) { | ||
| try { | ||
| if (logger.isInfoEnabled()) { | ||
| logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); | ||
| } | ||
| client.close(); | ||
| } catch (Throwable t) { | ||
| logger.warn(t.getMessage(), t); | ||
| } | ||
| } | ||
| client.close(ConfigUtils.getServerShutdownTimeout()); | ||
| } catch (Throwable t) { | ||
| logger.warn(t.getMessage(), t); | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.