-
Notifications
You must be signed in to change notification settings - Fork 26.5k
merge async changes in 3.x to 2.7 #3997
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
1f52668
596b5d5
8143d4e
64081c6
670690e
0d0c991
db962a1
5971410
d9676b8
5c90c8f
76862a6
93688fd
82ffe21
2e9e367
e02d099
de320ba
7381887
09f2047
3194ef7
2634859
2869f6d
5738a75
b57d818
32a3d07
2b8ce01
56953eb
00aa992
3615281
fc797e2
fa1c373
b155d93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,12 +22,12 @@ | |
| import org.apache.dubbo.common.logger.LoggerFactory; | ||
| import org.apache.dubbo.common.utils.ConfigUtils; | ||
| import org.apache.dubbo.common.utils.NamedThreadFactory; | ||
| import org.apache.dubbo.rpc.AsyncRpcResult; | ||
| import org.apache.dubbo.rpc.Invocation; | ||
| import org.apache.dubbo.rpc.Invoker; | ||
| import org.apache.dubbo.rpc.Result; | ||
| import org.apache.dubbo.rpc.RpcException; | ||
| import org.apache.dubbo.rpc.RpcInvocation; | ||
| import org.apache.dubbo.rpc.RpcResult; | ||
| import org.apache.dubbo.rpc.cluster.Directory; | ||
| import org.apache.dubbo.rpc.cluster.LoadBalance; | ||
| import org.apache.dubbo.rpc.cluster.Merger; | ||
|
|
@@ -40,17 +40,16 @@ | |
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; | ||
| import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; | ||
| import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; | ||
| import static org.apache.dubbo.rpc.Constants.MERGER_KEY; | ||
|
|
||
| /** | ||
| * NOTICE! Does not work with async call. | ||
| * @param <T> | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> { | ||
|
|
||
|
|
@@ -90,26 +89,19 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load | |
| returnType = null; | ||
| } | ||
|
|
||
| Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); | ||
| Map<String, Result> results = new HashMap<>(); | ||
| for (final Invoker<T> invoker : invokers) { | ||
| Future<Result> future = executor.submit(new Callable<Result>() { | ||
| @Override | ||
| public Result call() throws Exception { | ||
| return invoker.invoke(new RpcInvocation(invocation, invoker)); | ||
| } | ||
| }); | ||
| results.put(invoker.getUrl().getServiceKey(), future); | ||
| results.put(invoker.getUrl().getServiceKey(), invoker.invoke(new RpcInvocation(invocation, invoker))); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this way, it is synchronized?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because this class is not for async?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个是不是只是放入invoker,到后面的时候,通过get来等待?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, it's synchronized, which is not the status we expect. Now, I will make it async before making the call.. |
||
| } | ||
|
|
||
| Object result = null; | ||
|
|
||
| List<Result> resultList = new ArrayList<Result>(results.size()); | ||
|
|
||
| int timeout = getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT); | ||
| for (Map.Entry<String, Future<Result>> entry : results.entrySet()) { | ||
| Future<Result> future = entry.getValue(); | ||
| for (Map.Entry<String, Result> entry : results.entrySet()) { | ||
| Result asyncResult = entry.getValue(); | ||
| try { | ||
| Result r = future.get(timeout, TimeUnit.MILLISECONDS); | ||
| Result r = asyncResult.get(); | ||
|
chickenlj marked this conversation as resolved.
|
||
| if (r.hasException()) { | ||
| log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + | ||
| " failed: " + r.getException().getMessage(), | ||
|
|
@@ -123,13 +115,13 @@ public Result call() throws Exception { | |
| } | ||
|
|
||
| if (resultList.isEmpty()) { | ||
| return new RpcResult((Object) null); | ||
| return AsyncRpcResult.newDefaultAsyncResult(invocation); | ||
| } else if (resultList.size() == 1) { | ||
| return resultList.iterator().next(); | ||
| } | ||
|
|
||
| if (returnType == void.class) { | ||
| return new RpcResult((Object) null); | ||
| return AsyncRpcResult.newDefaultAsyncResult(invocation); | ||
| } | ||
|
|
||
| if (merger.startsWith(".")) { | ||
|
|
@@ -177,7 +169,7 @@ public Result call() throws Exception { | |
| throw new RpcException("There is no merger to merge result."); | ||
| } | ||
| } | ||
| return new RpcResult(result); | ||
| return AsyncRpcResult.newDefaultAsyncResult(result, invocation); | ||
| } | ||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.