diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index e0e6e11ccc9d..2a68c62c982c 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -332,6 +332,13 @@ compile true + + org.apache.dubbo + dubbo-serialization-avro + ${project.version} + compile + true + org.apache.dubbo dubbo-serialization-protostuff @@ -523,6 +530,7 @@ org.apache.dubbo:dubbo-serialization-hessian2 org.apache.dubbo:dubbo-serialization-fst org.apache.dubbo:dubbo-serialization-kryo + org.apache.dubbo:dubbo-serialization-avro org.apache.dubbo:dubbo-serialization-jdk org.apache.dubbo:dubbo-serialization-protostuff org.apache.dubbo:dubbo-configcenter-api diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index 3d28b4304652..6381732a9b59 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -314,6 +314,11 @@ dubbo-serialization-protostuff ${project.version} + + org.apache.dubbo + dubbo-serialization-avro + ${project.version} + org.apache.dubbo dubbo-compatible diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index f8880494b24e..22b43ae296b3 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -1,19 +1,18 @@ - @@ -117,6 +116,7 @@ 4.0.1 0.42 2.48-jdk-6 + 1.8.2 1.1.1 1.20 3.8.1 @@ -307,6 +307,11 @@ fst ${fst_version} + + org.apache.avro + avro + ${avro_version} + io.protostuff protostuff-core diff --git a/dubbo-dependencies/pom.xml b/dubbo-dependencies/pom.xml index a1a4f73d69cf..188d1e7b8966 100644 --- a/dubbo-dependencies/pom.xml +++ b/dubbo-dependencies/pom.xml @@ -1,19 +1,18 @@ - diff --git a/dubbo-distribution/pom.xml b/dubbo-distribution/pom.xml index d60548d3b179..b818291bc21f 100644 --- a/dubbo-distribution/pom.xml +++ b/dubbo-distribution/pom.xml @@ -240,6 +240,11 @@ dubbo-serialization-protostuff ${project.version} + + org.apache.dubbo + dubbo-serialization-avro + ${project.version} + org.apache.dubbo dubbo diff --git a/dubbo-serialization/dubbo-serialization-avro/pom.xml b/dubbo-serialization/dubbo-serialization-avro/pom.xml new file mode 100644 index 000000000000..27fd5378c7ac --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-avro/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-serialization + 2.7.2-SNAPSHOT + + dubbo-serialization-avro + jar + ${project.artifactId} + The avro serialization module of dubbo project + + false + + + + + org.apache.dubbo + dubbo-serialization-api + ${project.parent.version} + + + org.apache.avro + avro + + + \ No newline at end of file diff --git a/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectInput.java b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectInput.java new file mode 100644 index 000000000000..083d2140a451 --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectInput.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.avro; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.util.Utf8; +import org.apache.dubbo.common.serialize.ObjectInput; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; + +public class AvroObjectInput implements ObjectInput { + private static DecoderFactory decoderFactory = DecoderFactory.get(); + private BinaryDecoder decoder; + + public AvroObjectInput(InputStream in) { + decoder = decoderFactory.binaryDecoder(in, null); + } + + @Override + public boolean readBool() throws IOException { + return decoder.readBoolean(); + } + + @Override + public byte readByte() throws IOException { + byte[] bytes = new byte[1]; + decoder.readFixed(bytes); + return bytes[0]; + } + + @Override + public short readShort() throws IOException { + return (short) decoder.readInt(); + } + + @Override + public int readInt() throws IOException { + return decoder.readInt(); + } + + @Override + public long readLong() throws IOException { + return decoder.readLong(); + } + + @Override + public float readFloat() throws IOException { + return decoder.readFloat(); + } + + @Override + public double readDouble() throws IOException { + return decoder.readDouble(); + } + + @Override + public String readUTF() throws IOException { + Utf8 result = new Utf8(); + result = decoder.readString(result); + return result.toString(); + } + + @Override + public byte[] readBytes() throws IOException { + String resultStr = decoder.readString(); + return resultStr.getBytes("utf8"); + } + + /** + * will lost all attribute + */ + @Override + public Object readObject() throws IOException, ClassNotFoundException { + ReflectDatumReader reader = new ReflectDatumReader<>(Object.class); + return reader.read(null, decoder); + } + + @Override + @SuppressWarnings(value = {"unchecked"}) + public T readObject(Class cls) throws IOException, ClassNotFoundException { + //Map interface class change to HashMap implement + if (cls == Map.class) { + cls = (Class) HashMap.class; + } + + ReflectDatumReader reader = new ReflectDatumReader<>(cls); + return reader.read(null, decoder); + } + + @Override + public T readObject(Class cls, Type type) throws IOException, ClassNotFoundException { + ReflectDatumReader reader = new ReflectDatumReader<>(cls); + return reader.read(null, decoder); + } + +} diff --git a/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectOutput.java b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectOutput.java new file mode 100644 index 000000000000..d043dd068436 --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroObjectOutput.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.avro; + +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.dubbo.common.serialize.ObjectOutput; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +public class AvroObjectOutput implements ObjectOutput { + private static EncoderFactory encoderFactory = EncoderFactory.get(); + private BinaryEncoder encoder; + + public AvroObjectOutput(OutputStream out) { + encoder = encoderFactory.binaryEncoder(out, null); + } + + @Override + public void writeBool(boolean v) throws IOException { + encoder.writeBoolean(v); + } + + @Override + public void writeByte(byte v) throws IOException { + encoder.writeFixed(new byte[]{v}); + } + + @Override + public void writeShort(short v) throws IOException { + encoder.writeInt(v); + } + + @Override + public void writeInt(int v) throws IOException { + encoder.writeInt(v); + } + + @Override + public void writeLong(long v) throws IOException { + encoder.writeLong(v); + } + + @Override + public void writeFloat(float v) throws IOException { + encoder.writeFloat(v); + } + + @Override + public void writeDouble(double v) throws IOException { + encoder.writeDouble(v); + } + + @Override + public void writeUTF(String v) throws IOException { + encoder.writeString(new Utf8(v)); + } + + @Override + public void writeBytes(byte[] v) throws IOException { + encoder.writeString(new String(v, "utf8")); + } + + @Override + public void writeBytes(byte[] v, int off, int len) throws IOException { + byte[] v2 = Arrays.copyOfRange(v, off, off + len); + encoder.writeString(new String(v2, "utf8")); + } + + @Override + public void flushBuffer() throws IOException { + encoder.flush(); + } + + @Override + @SuppressWarnings(value = {"rawtypes", "unchecked"}) + public void writeObject(Object obj) throws IOException { + if (obj == null) { + encoder.writeNull(); + return; + } + ReflectDatumWriter dd = new ReflectDatumWriter<>(obj.getClass()); + dd.write(obj, encoder); + } + +} diff --git a/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroSerialization.java b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroSerialization.java new file mode 100644 index 000000000000..8c5466235513 --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-avro/src/main/java/org/apache/dubbo/common/serialize/avro/AvroSerialization.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.avro; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.Serialization; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class AvroSerialization implements Serialization { + + @Override + public byte getContentTypeId() { + return 10; + } + + @Override + public String getContentType() { + return "avro/binary"; + } + + @Override + public ObjectOutput serialize(URL url, OutputStream output) throws IOException { + return new AvroObjectOutput(output); + } + + @Override + public ObjectInput deserialize(URL url, InputStream input) throws IOException { + return new AvroObjectInput(input); + } + +} diff --git a/dubbo-serialization/dubbo-serialization-avro/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization b/dubbo-serialization/dubbo-serialization-avro/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization new file mode 100644 index 000000000000..d8cba3e1ba91 --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-avro/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization @@ -0,0 +1 @@ +avro=org.apache.dubbo.common.serialize.avro.AvroSerialization \ No newline at end of file diff --git a/dubbo-serialization/dubbo-serialization-test/pom.xml b/dubbo-serialization/dubbo-serialization-test/pom.xml index 32e98196aa66..ee6e353d3dec 100644 --- a/dubbo-serialization/dubbo-serialization-test/pom.xml +++ b/dubbo-serialization/dubbo-serialization-test/pom.xml @@ -67,5 +67,10 @@ dubbo-serialization-api ${project.parent.version} + + org.apache.dubbo + dubbo-serialization-avro + ${project.parent.version} + diff --git a/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroObjectInputOutputTest.java b/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroObjectInputOutputTest.java new file mode 100644 index 000000000000..431b29f08302 --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroObjectInputOutputTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.avro; + + +import org.apache.dubbo.common.serialize.model.Person; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.nullValue; + + +public class AvroObjectInputOutputTest { + private AvroObjectInput avroObjectInput; + private AvroObjectOutput avroObjectOutput; + + private PipedOutputStream pos; + private PipedInputStream pis; + + @BeforeEach + public void setup() throws IOException { + pis = new PipedInputStream(); + pos = new PipedOutputStream(); + pis.connect(pos); + + avroObjectOutput = new AvroObjectOutput(pos); + avroObjectInput = new AvroObjectInput(pis); + } + + @AfterEach + public void clean() throws IOException { + if (pos != null) { + pos.close(); + pos = null; + } + if (pis != null) { + pis.close(); + pis = null; + } + } + + @Test + public void testWriteReadBool() throws IOException, InterruptedException { + avroObjectOutput.writeBool(true); + avroObjectOutput.flushBuffer(); + pos.close(); + + boolean result = avroObjectInput.readBool(); + assertThat(result, is(true)); + } + + @Test + public void testWriteReadByte() throws IOException { + avroObjectOutput.writeByte((byte) 'a'); + avroObjectOutput.flushBuffer(); + pos.close(); + + Byte result = avroObjectInput.readByte(); + + assertThat(result, is((byte) 'a')); + } + + @Test + public void testWriteReadBytes() throws IOException { + avroObjectOutput.writeBytes("123456".getBytes()); + avroObjectOutput.flushBuffer(); + pos.close(); + + byte[] result = avroObjectInput.readBytes(); + + assertThat(result, is("123456".getBytes())); + } + + @Test + public void testWriteReadShort() throws IOException { + avroObjectOutput.writeShort((short) 1); + avroObjectOutput.flushBuffer(); + pos.close(); + + short result = avroObjectInput.readShort(); + + assertThat(result, is((short) 1)); + } + + @Test + public void testWriteReadInt() throws IOException { + avroObjectOutput.writeInt(1); + avroObjectOutput.flushBuffer(); + pos.close(); + + Integer result = avroObjectInput.readInt(); + + assertThat(result, is(1)); + } + + @Test + public void testReadDouble() throws IOException { + avroObjectOutput.writeDouble(3.14d); + avroObjectOutput.flushBuffer(); + pos.close(); + + Double result = avroObjectInput.readDouble(); + + assertThat(result, is(3.14d)); + } + + @Test + public void testReadLong() throws IOException { + avroObjectOutput.writeLong(10L); + avroObjectOutput.flushBuffer(); + pos.close(); + + Long result = avroObjectInput.readLong(); + + assertThat(result, is(10L)); + } + + @Test + public void testWriteReadFloat() throws IOException { + avroObjectOutput.writeFloat(1.66f); + avroObjectOutput.flushBuffer(); + pos.close(); + + Float result = avroObjectInput.readFloat(); + + assertThat(result, is(1.66F)); + } + + @Test + public void testWriteReadUTF() throws IOException { + avroObjectOutput.writeUTF("wording"); + avroObjectOutput.flushBuffer(); + pos.close(); + + String result = avroObjectInput.readUTF(); + + assertThat(result, is("wording")); + } + + @Test + public void testWriteReadObject() throws IOException, ClassNotFoundException { + Person p = new Person(); + p.setAge(30); + p.setName("abc"); + + avroObjectOutput.writeObject(p); + avroObjectOutput.flushBuffer(); + pos.close(); + + Person result = avroObjectInput.readObject(Person.class); + + assertThat(result, not(nullValue())); + assertThat(result.getName(), is("abc")); + assertThat(result.getAge(), is(30)); + } + + @Test + public void testWriteReadObjectWithoutClass() throws IOException, ClassNotFoundException { + Person p = new Person(); + p.setAge(30); + p.setName("abc"); + + avroObjectOutput.writeObject(p); + avroObjectOutput.flushBuffer(); + pos.close(); + + //这里会丢失所有信息 + Object result = avroObjectInput.readObject(); + + assertThat(result, not(nullValue())); +// assertThat(result.getName(), is("abc")); +// assertThat(result.getAge(), is(30)); + } +} diff --git a/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroSerializationTest.java b/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroSerializationTest.java new file mode 100644 index 000000000000..79e42bdeeb6f --- /dev/null +++ b/dubbo-serialization/dubbo-serialization-test/src/test/java/org/apache/dubbo/common/serialize/avro/AvroSerializationTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.avro; + +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +public class AvroSerializationTest { + private AvroSerialization avroSerialization; + + @BeforeEach + public void setUp() { + this.avroSerialization = new AvroSerialization(); + } + + @Test + public void testContentType() { + assertThat(avroSerialization.getContentType(), is("avro/binary")); + } + + @Test + public void testContentTypeId() { + assertThat(avroSerialization.getContentTypeId(), is((byte) 10)); + } + + @Test + public void testObjectOutput() throws IOException { + ObjectOutput objectOutput = avroSerialization.serialize(null, mock(OutputStream.class)); + assertThat(objectOutput, Matchers.instanceOf(AvroObjectOutput.class)); + } + + @Test + public void testObjectInput() throws IOException { + ObjectInput objectInput = avroSerialization.deserialize(null, mock(InputStream.class)); + assertThat(objectInput, Matchers.instanceOf(AvroObjectInput.class)); + } +} diff --git a/dubbo-serialization/pom.xml b/dubbo-serialization/pom.xml index e6fcd52a610a..3fd40ea6ef92 100644 --- a/dubbo-serialization/pom.xml +++ b/dubbo-serialization/pom.xml @@ -36,6 +36,7 @@ dubbo-serialization-fst dubbo-serialization-jdk dubbo-serialization-protostuff + dubbo-serialization-avro dubbo-serialization-test diff --git a/dubbo-test/pom.xml b/dubbo-test/pom.xml index db5bc5c20f45..2d92d237b33e 100644 --- a/dubbo-test/pom.xml +++ b/dubbo-test/pom.xml @@ -198,6 +198,10 @@ org.apache.dubbo dubbo-serialization-kryo + + org.apache.dubbo + dubbo-serialization-avro + org.apache.dubbo dubbo-serialization-jdk