Skip to content

Commit 020697b

Browse files
uglycowchickenlj
authored andcommitted
Merge pull request #3609, introduce rx support.
1 parent bef8f6d commit 020697b

File tree

20 files changed

+1854
-0
lines changed

20 files changed

+1854
-0
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
-->
17+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
19+
<modelVersion>4.0.0</modelVersion>
20+
<parent>
21+
<groupId>org.apache.dubbo</groupId>
22+
<artifactId>dubbo-rpc</artifactId>
23+
<version>2.7.0-SNAPSHOT</version>
24+
</parent>
25+
<artifactId>dubbo-rpc-rsocket</artifactId>
26+
<packaging>jar</packaging>
27+
<name>${project.artifactId}</name>
28+
<description>The default rpc module of dubbo project</description>
29+
<properties>
30+
<skip_maven_deploy>false</skip_maven_deploy>
31+
</properties>
32+
<dependencies>
33+
<dependency>
34+
<groupId>org.springframework</groupId>
35+
<artifactId>spring-context</artifactId>
36+
<version>4.3.16.RELEASE</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>org.apache.dubbo</groupId>
40+
<artifactId>dubbo-registry-multicast</artifactId>
41+
<version>${project.version}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>io.rsocket</groupId>
45+
<artifactId>rsocket-core</artifactId>
46+
<version>0.11.14</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>io.rsocket</groupId>
50+
<artifactId>rsocket-transport-netty</artifactId>
51+
<version>0.11.14</version>
52+
</dependency>
53+
<dependency>
54+
<groupId>com.alibaba</groupId>
55+
<artifactId>fastjson</artifactId>
56+
<version>1.2.54</version>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.apache.dubbo</groupId>
60+
<artifactId>dubbo-rpc-api</artifactId>
61+
<version>${project.parent.version}</version>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>org.apache.dubbo</groupId>
66+
<artifactId>dubbo-remoting-api</artifactId>
67+
<version>${project.parent.version}</version>
68+
</dependency>
69+
<dependency>
70+
<groupId>org.apache.dubbo</groupId>
71+
<artifactId>dubbo-config-api</artifactId>
72+
<version>${project.version}</version>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.apache.dubbo</groupId>
76+
<artifactId>dubbo-config-spring</artifactId>
77+
<version>${project.version}</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.apache.dubbo</groupId>
81+
<artifactId>dubbo-container-api</artifactId>
82+
<version>${project.parent.version}</version>
83+
<exclusions>
84+
<exclusion>
85+
<groupId>org.eclipse.jetty</groupId>
86+
<artifactId>jetty-server</artifactId>
87+
</exclusion>
88+
<exclusion>
89+
<groupId>org.eclipse.jetty</groupId>
90+
<artifactId>jetty-servlet</artifactId>
91+
</exclusion>
92+
</exclusions>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.apache.dubbo</groupId>
96+
<artifactId>dubbo-serialization-hessian2</artifactId>
97+
<version>${project.parent.version}</version>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.apache.dubbo</groupId>
102+
<artifactId>dubbo-serialization-jdk</artifactId>
103+
<version>${project.parent.version}</version>
104+
<scope>test</scope>
105+
</dependency>
106+
107+
<!--<dependency>-->
108+
<!--<groupId>javax.validation</groupId>-->
109+
<!--<artifactId>validation-api</artifactId>-->
110+
<!--<scope>test</scope>-->
111+
<!--</dependency>-->
112+
<!--<dependency>-->
113+
<!--<groupId>org.hibernate</groupId>-->
114+
<!--<artifactId>hibernate-validator</artifactId>-->
115+
<!--<scope>test</scope>-->
116+
<!--</dependency>-->
117+
<!--<dependency>-->
118+
<!--<groupId>org.glassfish</groupId>-->
119+
<!--<artifactId>javax.el</artifactId>-->
120+
<!--<scope>test</scope>-->
121+
<!--</dependency>-->
122+
</dependencies>
123+
</project>
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.rsocket;
18+
19+
import io.rsocket.Payload;
20+
import org.apache.dubbo.common.serialize.ObjectInput;
21+
import org.apache.dubbo.common.serialize.Serialization;
22+
import org.apache.dubbo.rpc.RpcResult;
23+
import org.reactivestreams.Subscriber;
24+
import org.reactivestreams.Subscription;
25+
26+
import java.io.ByteArrayInputStream;
27+
import java.io.InputStream;
28+
import java.nio.ByteBuffer;
29+
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
31+
32+
public class FutureSubscriber extends CompletableFuture<RpcResult> implements Subscriber<Payload> {
33+
34+
private final Serialization serialization;
35+
36+
private final Class<?> retType;
37+
38+
public FutureSubscriber(Serialization serialization, Class<?> retType) {
39+
this.serialization = serialization;
40+
this.retType = retType;
41+
}
42+
43+
44+
@Override
45+
public void onSubscribe(Subscription subscription) {
46+
subscription.request(1);
47+
}
48+
49+
@Override
50+
public void onNext(Payload payload) {
51+
try {
52+
RpcResult rpcResult = new RpcResult();
53+
ByteBuffer dataBuffer = payload.getData();
54+
byte[] dataBytes = new byte[dataBuffer.remaining()];
55+
dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining());
56+
InputStream dataInputStream = new ByteArrayInputStream(dataBytes);
57+
ObjectInput in = serialization.deserialize(null, dataInputStream);
58+
59+
int flag = in.readByte();
60+
if ((flag & RSocketConstants.FLAG_ERROR) != 0) {
61+
Throwable t = (Throwable) in.readObject();
62+
rpcResult.setException(t);
63+
} else {
64+
Object value = null;
65+
if ((flag & RSocketConstants.FLAG_NULL_VALUE) == 0) {
66+
if (retType == null) {
67+
value = in.readObject();
68+
} else {
69+
value = in.readObject(retType);
70+
}
71+
rpcResult.setValue(value);
72+
}
73+
}
74+
75+
if ((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) != 0) {
76+
Map<String, String> attachment = in.readObject(Map.class);
77+
rpcResult.setAttachments(attachment);
78+
79+
}
80+
81+
this.complete(rpcResult);
82+
83+
84+
} catch (Throwable t) {
85+
this.completeExceptionally(t);
86+
}
87+
}
88+
89+
@Override
90+
public void onError(Throwable throwable) {
91+
this.completeExceptionally(throwable);
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
}
97+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.rsocket;
18+
19+
import com.alibaba.fastjson.JSON;
20+
21+
import java.io.IOException;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.Map;
24+
25+
/**
26+
* @author sixie.xyn on 2019/1/3.
27+
*/
28+
public class MetadataCodec {
29+
30+
public static Map<String, Object> decodeMetadata(byte[] bytes) throws IOException {
31+
return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class);
32+
}
33+
34+
public static byte[] encodeMetadata(Map<String, Object> metadata) throws IOException {
35+
String jsonStr = JSON.toJSONString(metadata);
36+
return jsonStr.getBytes(StandardCharsets.UTF_8);
37+
}
38+
39+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.rsocket;
18+
19+
/**
20+
* @author sixie.xyn on 2019/1/3.
21+
*/
22+
public class RSocketConstants {
23+
24+
public static final String SERVICE_NAME_KEY = "_service_name";
25+
public static final String SERVICE_VERSION_KEY = "_service_version";
26+
public static final String METHOD_NAME_KEY = "_method_name";
27+
public static final String PARAM_TYPE_KEY = "_param_type";
28+
public static final String SERIALIZE_TYPE_KEY = "_serialize_type";
29+
public static final String TIMEOUT_KEY = "_timeout";
30+
31+
32+
public static final int FLAG_ERROR = 0x01;
33+
public static final int FLAG_NULL_VALUE = 0x02;
34+
public static final int FLAG_HAS_ATTACHMENT = 0x04;
35+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.rsocket;
18+
19+
import org.apache.dubbo.rpc.Exporter;
20+
import org.apache.dubbo.rpc.Invoker;
21+
import org.apache.dubbo.rpc.protocol.AbstractExporter;
22+
23+
import java.util.Map;
24+
25+
/**
26+
* @author sixie.xyn on 2019/1/2.
27+
*/
28+
public class RSocketExporter<T> extends AbstractExporter<T> {
29+
30+
private final String key;
31+
32+
private final Map<String, Exporter<?>> exporterMap;
33+
34+
public RSocketExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
35+
super(invoker);
36+
this.key = key;
37+
this.exporterMap = exporterMap;
38+
}
39+
40+
@Override
41+
public void unexport() {
42+
super.unexport();
43+
exporterMap.remove(key);
44+
}
45+
46+
}

0 commit comments

Comments
 (0)