Skip to content

Commit 7c767f8

Browse files
authored
DGS-22814 Allow function to be passed during Avro deserialization (#4004)
1 parent bee7372 commit 7c767f8

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Properties;
3232
import java.util.concurrent.ExecutionException;
33+
import java.util.function.Function;
3334
import org.apache.avro.Schema;
3435
import org.apache.avro.Schema.Type;
3536
import org.apache.avro.generic.GenericContainer;
@@ -253,12 +254,21 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
253254
protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
254255
String topic, boolean isKey, Headers headers, byte[] payload)
255256
throws SerializationException, InvalidConfigurationException {
256-
return deserializeWithSchemaAndVersion(topic, isKey, headers, payload, null);
257+
return deserializeWithSchemaAndVersion(topic, isKey, headers, payload, (Schema) null);
257258
}
258259

259260
protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
260261
String topic, boolean isKey, Headers headers, byte[] payload, Schema readerSchema)
261262
throws SerializationException, InvalidConfigurationException {
263+
return deserializeWithSchemaAndVersion(
264+
topic, isKey, headers, payload,
265+
writerAvroSchema -> readerSchema != null ? new AvroSchema(readerSchema) : null);
266+
}
267+
268+
protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
269+
String topic, boolean isKey, Headers headers, byte[] payload,
270+
Function<AvroSchema, AvroSchema> writerToReaderSchemaFunc)
271+
throws SerializationException, InvalidConfigurationException {
262272
// Even if the caller requests schema & version, if the payload is null we cannot include it.
263273
// The caller must handle this case.
264274
if (payload == null) {
@@ -276,8 +286,8 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
276286
// explicit from the Connector).
277287
DeserializationContext context = new DeserializationContext(topic, isKey, headers, payload);
278288
AvroSchema schema = context.schemaForDeserialize();
279-
AvroSchema readerAvroSchema = readerSchema != null
280-
? new AvroSchema(readerSchema)
289+
AvroSchema readerAvroSchema = writerToReaderSchemaFunc != null
290+
? writerToReaderSchemaFunc.apply(schema)
281291
: specificAvroReaderSchema != null
282292
? new AvroSchema(specificAvroReaderSchema)
283293
: null;

avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.confluent.kafka.serializers;
1818

19+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
20+
import java.util.function.Function;
1921
import org.apache.avro.Schema;
2022
import org.apache.kafka.common.header.Headers;
2123

@@ -128,6 +130,12 @@ public GenericContainerWithVersion deserializeWithSchema(
128130
return deserializeWithSchemaAndVersion(topic, isKey, headers, bytes, readerSchema);
129131
}
130132

133+
public GenericContainerWithVersion deserializeWithSchema(
134+
String topic, Headers headers, byte[] bytes,
135+
Function<AvroSchema, AvroSchema> writerToReaderSchemaFunc) {
136+
return deserializeWithSchemaAndVersion(topic, isKey, headers, bytes, writerToReaderSchemaFunc);
137+
}
138+
131139
@Override
132140
public void close() {
133141
try {

avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,11 @@ public void testKafkaAvroSerializerWithProjection() {
10901090
topic, headers, bytes, User.getClassSchema());
10911091
assertEquals(new AvroSchema(ExtendedUser.SCHEMA$), schemaAndValue.getSchema());
10921092
assertEquals(obj, schemaAndValue.getValue());
1093+
1094+
schemaAndValue = avroDeserializer.deserializeWithSchema(
1095+
topic, headers, bytes, x -> new AvroSchema(User.getClassSchema()));
1096+
assertEquals(new AvroSchema(ExtendedUser.SCHEMA$), schemaAndValue.getSchema());
1097+
assertEquals(obj, schemaAndValue.getValue());
10931098
}
10941099

10951100
@Test

0 commit comments

Comments
 (0)