Skip to content

Commit 003e400

Browse files
chickenljJeff-Lv
authored andcommitted
Async optimization (#3738)
* Result implement CF * Result implement CF * Result implement CF * Add AsyncRpcResult * Fix bugs and refactor Filter * Try to add onSend onError for Filter * invoke different filter method according to result status. * make generic work with async call, including add $invokeAsync * refactor legacy Filter implementation to work with onResponse. * demo changes * Fixes #3620, provider attachment lose on consumer side, fix this by reverting RpcContext copy * AsyncRpcResult should always holds an Invocation instance * refactor filter signature * reimplement embedded Filters * use ProviderModel modification in 3.x * Fix address notification processing workflow after merging 3.x branch * Fix UT * Fix UT * Unit test of JValidator; Clean code of JValidator (#3723) * Fixes #3625 (#3730) use constant to replace magic number * Fix conflict when merging master and 3.x * Fix conflict when merging master and 3.x * Result interface itself has Future status. * Fix DefaultFuture UT * Wrap all protocol Invoker with AsyncToSyncInvoker & Fix UT * Add license * fix UT * Fix ut in MonitorFilterTest * avoid duplicate async to sync wrapper * return async result in CacheFilter. * fix UT in CacheFilterTest * Add generic condition check to GenericFilter callback. * Fix UT * Get generic from RpcContext if the value in Invocation is empty. * Fix RSocketProtocol to meet AbstractProtocol adjustment * rename RpcResult to AppResponse to help avoid confusion with AsyncRpcResult. * RSocket module switch to AsyncRpcResult
1 parent 91554bc commit 003e400

File tree

141 files changed

+1794
-1582
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

141 files changed

+1794
-1582
lines changed

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvoker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.dubbo.common.timer.Timer;
2525
import org.apache.dubbo.common.timer.TimerTask;
2626
import org.apache.dubbo.common.utils.NamedThreadFactory;
27+
import org.apache.dubbo.rpc.AsyncRpcResult;
2728
import org.apache.dubbo.rpc.Invocation;
2829
import org.apache.dubbo.rpc.Invoker;
2930
import org.apache.dubbo.rpc.Result;
3031
import org.apache.dubbo.rpc.RpcException;
31-
import org.apache.dubbo.rpc.RpcResult;
3232
import org.apache.dubbo.rpc.cluster.Directory;
3333
import org.apache.dubbo.rpc.cluster.LoadBalance;
3434

@@ -99,7 +99,7 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
9999
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
100100
+ e.getMessage() + ", ", e);
101101
addFailed(loadbalance, invocation, invokers, invoker);
102-
return new RpcResult(); // ignore
102+
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
103103
}
104104
}
105105

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/FailsafeClusterInvoker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import org.apache.dubbo.common.logger.Logger;
2020
import org.apache.dubbo.common.logger.LoggerFactory;
21+
import org.apache.dubbo.rpc.AsyncRpcResult;
2122
import org.apache.dubbo.rpc.Invocation;
2223
import org.apache.dubbo.rpc.Invoker;
2324
import org.apache.dubbo.rpc.Result;
2425
import org.apache.dubbo.rpc.RpcException;
25-
import org.apache.dubbo.rpc.RpcResult;
2626
import org.apache.dubbo.rpc.cluster.Directory;
2727
import org.apache.dubbo.rpc.cluster.LoadBalance;
2828

@@ -50,7 +50,7 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
5050
return invoker.invoke(invocation);
5151
} catch (Throwable e) {
5252
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
53-
return new RpcResult(); // ignore
53+
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
5454
}
5555
}
5656
}

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

3838
/**
39+
* NOTICE! This implementation does not work well with async call.
40+
*
3941
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
4042
*
4143
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
@@ -66,7 +68,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
6668
} else {
6769
selected = new ArrayList<>();
6870
for (int i = 0; i < forks; i++) {
69-
// TODO. Add some comment here, refer chinese version for more details.
7071
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
7172
if (!selected.contains(invoker)) {
7273
//Avoid add the same invoker several times.

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import org.apache.dubbo.common.logger.LoggerFactory;
2424
import org.apache.dubbo.common.utils.ConfigUtils;
2525
import org.apache.dubbo.common.utils.NamedThreadFactory;
26+
import org.apache.dubbo.rpc.AsyncRpcResult;
2627
import org.apache.dubbo.rpc.Invocation;
2728
import org.apache.dubbo.rpc.Invoker;
2829
import org.apache.dubbo.rpc.Result;
2930
import org.apache.dubbo.rpc.RpcException;
3031
import org.apache.dubbo.rpc.RpcInvocation;
31-
import org.apache.dubbo.rpc.RpcResult;
3232
import org.apache.dubbo.rpc.cluster.Directory;
3333
import org.apache.dubbo.rpc.cluster.LoadBalance;
3434
import org.apache.dubbo.rpc.cluster.Merger;
@@ -41,12 +41,13 @@
4141
import java.util.HashMap;
4242
import java.util.List;
4343
import java.util.Map;
44-
import java.util.concurrent.Callable;
4544
import java.util.concurrent.ExecutorService;
4645
import java.util.concurrent.Executors;
47-
import java.util.concurrent.Future;
48-
import java.util.concurrent.TimeUnit;
4946

47+
/**
48+
* NOTICE! Does not work with async call.
49+
* @param <T>
50+
*/
5051
@SuppressWarnings("unchecked")
5152
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {
5253

@@ -86,26 +87,19 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
8687
returnType = null;
8788
}
8889

89-
Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
90+
Map<String, Result> results = new HashMap<>();
9091
for (final Invoker<T> invoker : invokers) {
91-
Future<Result> future = executor.submit(new Callable<Result>() {
92-
@Override
93-
public Result call() throws Exception {
94-
return invoker.invoke(new RpcInvocation(invocation, invoker));
95-
}
96-
});
97-
results.put(invoker.getUrl().getServiceKey(), future);
92+
results.put(invoker.getUrl().getServiceKey(), invoker.invoke(new RpcInvocation(invocation, invoker)));
9893
}
9994

10095
Object result = null;
10196

10297
List<Result> resultList = new ArrayList<Result>(results.size());
10398

104-
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
105-
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
106-
Future<Result> future = entry.getValue();
99+
for (Map.Entry<String, Result> entry : results.entrySet()) {
100+
Result asyncResult = entry.getValue();
107101
try {
108-
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
102+
Result r = asyncResult.get();
109103
if (r.hasException()) {
110104
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
111105
" failed: " + r.getException().getMessage(),
@@ -119,13 +113,13 @@ public Result call() throws Exception {
119113
}
120114

121115
if (resultList.isEmpty()) {
122-
return new RpcResult((Object) null);
116+
return AsyncRpcResult.newDefaultAsyncResult(invocation);
123117
} else if (resultList.size() == 1) {
124118
return resultList.iterator().next();
125119
}
126120

127121
if (returnType == void.class) {
128-
return new RpcResult((Object) null);
122+
return AsyncRpcResult.newDefaultAsyncResult(invocation);
129123
}
130124

131125
if (merger.startsWith(".")) {
@@ -173,7 +167,7 @@ public Result call() throws Exception {
173167
throw new RpcException("There is no merger to merge result.");
174168
}
175169
}
176-
return new RpcResult(result);
170+
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
177171
}
178172

179173

dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import org.apache.dubbo.common.logger.LoggerFactory;
2323
import org.apache.dubbo.common.utils.CollectionUtils;
2424
import org.apache.dubbo.common.utils.StringUtils;
25+
import org.apache.dubbo.rpc.AsyncRpcResult;
2526
import org.apache.dubbo.rpc.Invocation;
2627
import org.apache.dubbo.rpc.Invoker;
2728
import org.apache.dubbo.rpc.Result;
2829
import org.apache.dubbo.rpc.RpcException;
2930
import org.apache.dubbo.rpc.RpcInvocation;
30-
import org.apache.dubbo.rpc.RpcResult;
3131
import org.apache.dubbo.rpc.cluster.Directory;
3232
import org.apache.dubbo.rpc.support.MockInvoker;
3333

@@ -113,7 +113,7 @@ private Result doMockInvoke(Invocation invocation, RpcException e) {
113113
result = minvoker.invoke(invocation);
114114
} catch (RpcException me) {
115115
if (me.isBiz()) {
116-
result = new RpcResult(me.getCause());
116+
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
117117
} else {
118118
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
119119
}

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/StickyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import org.apache.dubbo.common.Constants;
2121
import org.apache.dubbo.common.URL;
2222
import org.apache.dubbo.common.extension.ExtensionLoader;
23+
import org.apache.dubbo.rpc.AppResponse;
2324
import org.apache.dubbo.rpc.Invocation;
2425
import org.apache.dubbo.rpc.Invoker;
2526
import org.apache.dubbo.rpc.Result;
2627
import org.apache.dubbo.rpc.RpcException;
2728
import org.apache.dubbo.rpc.RpcInvocation;
28-
import org.apache.dubbo.rpc.RpcResult;
2929
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
3030

3131
import org.junit.jupiter.api.Assertions;
@@ -48,7 +48,7 @@ public class StickyTest {
4848
private Invoker<StickyTest> invoker2 = mock(Invoker.class);
4949
private RpcInvocation invocation;
5050
private Directory<StickyTest> dic;
51-
private Result result = new RpcResult();
51+
private Result result = new AppResponse();
5252
private StickyClusterInvoker<StickyTest> clusterinvoker = null;
5353
private URL url = URL.valueOf("test://test:11/test?"
5454
+ "&loadbalance=roundrobin"

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ public Map<String, String> getAttachments() {
5151
return attachments;
5252
}
5353

54+
@Override
55+
public void setAttachment(String key, String value) {
56+
57+
}
58+
59+
@Override
60+
public void setAttachmentIfAbsent(String key, String value) {
61+
62+
}
63+
5464
public Invoker<?> getInvoker() {
5565
return null;
5666
}

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import org.apache.dubbo.common.Constants;
2020
import org.apache.dubbo.common.URL;
2121
import org.apache.dubbo.common.extension.ExtensionLoader;
22+
import org.apache.dubbo.rpc.AppResponse;
2223
import org.apache.dubbo.rpc.Invocation;
2324
import org.apache.dubbo.rpc.Invoker;
2425
import org.apache.dubbo.rpc.Result;
2526
import org.apache.dubbo.rpc.RpcException;
2627
import org.apache.dubbo.rpc.RpcInvocation;
27-
import org.apache.dubbo.rpc.RpcResult;
2828
import org.apache.dubbo.rpc.cluster.Directory;
2929
import org.apache.dubbo.rpc.cluster.LoadBalance;
3030
import org.apache.dubbo.rpc.cluster.RouterFactory;
@@ -52,7 +52,7 @@ public class FileRouterEngineTest {
5252
Invoker<FileRouterEngineTest> invoker2 = mock(Invoker.class);
5353
Invocation invocation;
5454
StaticDirectory<FileRouterEngineTest> dic;
55-
Result result = new RpcResult();
55+
Result result = new AppResponse();
5656
private RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
5757

5858
@BeforeAll

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailSafeClusterInvokerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.common.utils.LogUtil;
21+
import org.apache.dubbo.rpc.AppResponse;
2122
import org.apache.dubbo.rpc.Invoker;
2223
import org.apache.dubbo.rpc.Result;
2324
import org.apache.dubbo.rpc.RpcContext;
2425
import org.apache.dubbo.rpc.RpcInvocation;
25-
import org.apache.dubbo.rpc.RpcResult;
2626
import org.apache.dubbo.rpc.cluster.Directory;
2727
import org.apache.dubbo.rpc.cluster.filter.DemoService;
2828

@@ -48,7 +48,7 @@ public class FailSafeClusterInvokerTest {
4848
Invoker<DemoService> invoker = mock(Invoker.class);
4949
RpcInvocation invocation = new RpcInvocation();
5050
Directory<DemoService> dic;
51-
Result result = new RpcResult();
51+
Result result = new AppResponse();
5252

5353
/**
5454
* @throws java.lang.Exception

dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailbackClusterInvokerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.apache.dubbo.common.URL;
2121
import org.apache.dubbo.common.utils.DubboAppender;
2222
import org.apache.dubbo.common.utils.LogUtil;
23+
import org.apache.dubbo.rpc.AppResponse;
2324
import org.apache.dubbo.rpc.Invoker;
2425
import org.apache.dubbo.rpc.Result;
2526
import org.apache.dubbo.rpc.RpcContext;
2627
import org.apache.dubbo.rpc.RpcInvocation;
27-
import org.apache.dubbo.rpc.RpcResult;
2828
import org.apache.dubbo.rpc.cluster.Directory;
2929

3030
import org.apache.log4j.Level;
@@ -59,7 +59,7 @@ public class FailbackClusterInvokerTest {
5959
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
6060
RpcInvocation invocation = new RpcInvocation();
6161
Directory<FailbackClusterInvokerTest> dic;
62-
Result result = new RpcResult();
62+
Result result = new AppResponse();
6363

6464
/**
6565
* @throws java.lang.Exception

0 commit comments

Comments
 (0)