Skip to content

Commit e9b8681

Browse files
jerrick-zhuchickenlj
authored andcommitted
Merge pull request apache#1827, support generic invoke and attachment for http/hessian protocol.
fixes apache#1768, apache#19
1 parent 907143a commit e9b8681

File tree

24 files changed

+763
-21
lines changed

24 files changed

+763
-21
lines changed

dubbo-config/dubbo-config-api/src/test/java/com/alibaba/dubbo/config/mock/MockProxyFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public <T> T getProxy(Invoker<T> invoker) throws RpcException {
2727
return null;
2828
}
2929

30+
@Override
31+
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
32+
return null;
33+
}
34+
3035
@Override
3136
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
3237
return null;

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/ProxyFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ public interface ProxyFactory {
3636
@Adaptive({Constants.PROXY_KEY})
3737
<T> T getProxy(Invoker<T> invoker) throws RpcException;
3838

39+
/**
40+
* create proxy.
41+
*
42+
* @param invoker
43+
* @return proxy
44+
*/
45+
@Adaptive({Constants.PROXY_KEY})
46+
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
47+
3948
/**
4049
* create invoker.
4150
*

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/filter/GenericFilter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232
import com.alibaba.dubbo.rpc.Invocation;
3333
import com.alibaba.dubbo.rpc.Invoker;
3434
import com.alibaba.dubbo.rpc.Result;
35+
import com.alibaba.dubbo.rpc.RpcContext;
3536
import com.alibaba.dubbo.rpc.RpcException;
3637
import com.alibaba.dubbo.rpc.RpcInvocation;
3738
import com.alibaba.dubbo.rpc.RpcResult;
3839
import com.alibaba.dubbo.rpc.service.GenericException;
40+
import com.alibaba.dubbo.rpc.service.GenericService;
3941
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
4042

4143
import java.io.IOException;
@@ -52,7 +54,7 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
5254
if (inv.getMethodName().equals(Constants.$INVOKE)
5355
&& inv.getArguments() != null
5456
&& inv.getArguments().length == 3
55-
&& !ProtocolUtils.isGeneric(invoker.getUrl().getParameter(Constants.GENERIC_KEY))) {
57+
&& !invoker.getInterface().equals(GenericService.class)) {
5658
String name = ((String) inv.getArguments()[0]).trim();
5759
String[] types = (String[]) inv.getArguments()[1];
5860
Object[] args = (Object[]) inv.getArguments()[2];
@@ -63,6 +65,11 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
6365
args = new Object[params.length];
6466
}
6567
String generic = inv.getAttachment(Constants.GENERIC_KEY);
68+
69+
if (StringUtils.isBlank(generic)) {
70+
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
71+
}
72+
6673
if (StringUtils.isEmpty(generic)
6774
|| ProtocolUtils.isDefaultGenericSerialization(generic)) {
6875
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/filter/GenericImplFilter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,13 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
155155

156156
for (Object arg : args) {
157157
if (!(byte[].class == arg.getClass())) {
158-
error(byte[].class.getName(), arg.getClass().getName());
158+
error(generic, byte[].class.getName(), arg.getClass().getName());
159159
}
160160
}
161161
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
162162
for (Object arg : args) {
163163
if (!(arg instanceof JavaBeanDescriptor)) {
164-
error(JavaBeanDescriptor.class.getName(), arg.getClass().getName());
164+
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
165165
}
166166
}
167167
}
@@ -172,10 +172,10 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
172172
return invoker.invoke(invocation);
173173
}
174174

175-
private void error(String expected, String actual) throws RpcException {
175+
private void error(String generic, String expected, String actual) throws RpcException {
176176
throw new RpcException(
177177
"Generic serialization [" +
178-
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
178+
generic +
179179
"] only support message type " +
180180
expected +
181181
" and your message type is " +

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractProxyProtocol.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
6868
if (exporter != null) {
6969
return exporter;
7070
}
71-
final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
71+
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
7272
exporter = new AbstractExporter<T>(invoker) {
7373
@Override
7474
public void unexport() {
@@ -89,12 +89,12 @@ public void unexport() {
8989

9090
@Override
9191
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
92-
final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url);
92+
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
9393
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
9494
@Override
9595
protected Result doInvoke(Invocation invocation) throws Throwable {
9696
try {
97-
Result result = tagert.invoke(invocation);
97+
Result result = target.invoke(invocation);
9898
Throwable e = result.getException();
9999
if (e != null) {
100100
for (Class<?> rpcException : rpcExceptions) {

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/proxy/AbstractProxyFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alibaba.dubbo.rpc.ProxyFactory;
2323
import com.alibaba.dubbo.rpc.RpcException;
2424
import com.alibaba.dubbo.rpc.service.EchoService;
25+
import com.alibaba.dubbo.rpc.service.GenericService;
2526

2627
/**
2728
* AbstractProxyFactory
@@ -30,6 +31,11 @@ public abstract class AbstractProxyFactory implements ProxyFactory {
3031

3132
@Override
3233
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
34+
return getProxy(invoker, false);
35+
}
36+
37+
@Override
38+
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
3339
Class<?>[] interfaces = null;
3440
String config = invoker.getUrl().getParameter("interfaces");
3541
if (config != null && config.length() > 0) {
@@ -46,6 +52,15 @@ public <T> T getProxy(Invoker<T> invoker) throws RpcException {
4652
if (interfaces == null) {
4753
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
4854
}
55+
56+
if (!invoker.getInterface().equals(GenericService.class) && generic) {
57+
int len = interfaces.length;
58+
Class<?>[] temp = interfaces;
59+
interfaces = new Class<?>[len + 1];
60+
System.arraycopy(temp, 0, interfaces, 0, len);
61+
interfaces[len] = GenericService.class;
62+
}
63+
4964
return getProxy(invoker, interfaces);
5065
}
5166

dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/proxy/wrapper/StubProxyFactoryWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public void setProtocol(Protocol protocol) {
5454
this.protocol = protocol;
5555
}
5656

57+
@Override
58+
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
59+
return proxyFactory.getProxy(invoker, generic);
60+
}
61+
5762
@Override
5863
@SuppressWarnings({"unchecked", "rawtypes"})
5964
public <T> T getProxy(Invoker<T> invoker) throws RpcException {

dubbo-rpc/dubbo-rpc-hessian/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,11 @@
4848
<groupId>org.apache.httpcomponents</groupId>
4949
<artifactId>httpclient</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>com.alibaba</groupId>
53+
<artifactId>dubbo-serialization-jdk</artifactId>
54+
<version>${project.parent.version}</version>
55+
<scope>test</scope>
56+
</dependency>
5157
</dependencies>
5258
</project>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.dubbo.rpc.protocol.hessian;
19+
20+
import com.alibaba.dubbo.common.Constants;
21+
import com.alibaba.dubbo.rpc.RpcContext;
22+
import com.caucho.hessian.client.HessianConnection;
23+
import com.caucho.hessian.client.HessianURLConnectionFactory;
24+
25+
import java.io.IOException;
26+
import java.net.URL;
27+
28+
public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory {
29+
30+
@Override
31+
public HessianConnection open(URL url) throws IOException {
32+
HessianConnection connection = super.open(url);
33+
RpcContext context = RpcContext.getContext();
34+
for (String key : context.getAttachments().keySet()) {
35+
connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
36+
}
37+
38+
return connection;
39+
}
40+
}

dubbo-rpc/dubbo-rpc-hessian/src/main/java/com/alibaba/dubbo/rpc/protocol/hessian/HessianProtocol.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import com.alibaba.dubbo.rpc.RpcException;
2626
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
2727

28+
import com.alibaba.dubbo.rpc.service.GenericService;
29+
import com.alibaba.dubbo.rpc.support.ProtocolUtils;
2830
import com.caucho.hessian.HessianException;
2931
import com.caucho.hessian.client.HessianConnectionException;
32+
import com.caucho.hessian.client.HessianConnectionFactory;
3033
import com.caucho.hessian.client.HessianProxyFactory;
3134
import com.caucho.hessian.io.HessianMethodSerializationException;
3235
import com.caucho.hessian.server.HessianSkeleton;
@@ -37,6 +40,7 @@
3740
import java.io.IOException;
3841
import java.net.SocketTimeoutException;
3942
import java.util.ArrayList;
43+
import java.util.Enumeration;
4044
import java.util.Map;
4145
import java.util.concurrent.ConcurrentHashMap;
4246

@@ -73,19 +77,31 @@ protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcExcept
7377
serverMap.put(addr, server);
7478
}
7579
final String path = url.getAbsolutePath();
76-
HessianSkeleton skeleton = new HessianSkeleton(impl, type);
80+
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
7781
skeletonMap.put(path, skeleton);
82+
83+
final String genericPath = path + "/" + Constants.GENERIC_KEY;
84+
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));
85+
7886
return new Runnable() {
7987
@Override
8088
public void run() {
8189
skeletonMap.remove(path);
90+
skeletonMap.remove(genericPath);
8291
}
8392
};
8493
}
8594

8695
@Override
8796
@SuppressWarnings("unchecked")
8897
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
98+
String generic = url.getParameter(Constants.GENERIC_KEY);
99+
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
100+
if (isGeneric) {
101+
RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
102+
url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
103+
}
104+
89105
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
90106
boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
91107
hessianProxyFactory.setHessian2Request(isHessian2Request);
@@ -96,6 +112,10 @@ protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
96112
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
97113
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
98114
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
115+
} else {
116+
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
117+
factory.setHessianProxyFactory(hessianProxyFactory);
118+
hessianProxyFactory.setConnectionFactory(factory);
99119
}
100120
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
101121
hessianProxyFactory.setConnectTimeout(timeout);
@@ -148,6 +168,16 @@ public void handle(HttpServletRequest request, HttpServletResponse response)
148168
response.setStatus(500);
149169
} else {
150170
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
171+
172+
Enumeration<String> enumeration = request.getHeaderNames();
173+
while (enumeration.hasMoreElements()) {
174+
String key = enumeration.nextElement();
175+
if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
176+
RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
177+
request.getHeader(key));
178+
}
179+
}
180+
151181
try {
152182
skeleton.invoke(request.getInputStream(), response.getOutputStream());
153183
} catch (Throwable e) {

0 commit comments

Comments
 (0)