3030import java .util .List ;
3131import java .util .Properties ;
3232import java .util .concurrent .ExecutionException ;
33+ import java .util .function .Function ;
3334import org .apache .avro .Schema ;
3435import org .apache .avro .Schema .Type ;
3536import org .apache .avro .generic .GenericContainer ;
@@ -253,12 +254,21 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
253254 protected GenericContainerWithVersion deserializeWithSchemaAndVersion (
254255 String topic , boolean isKey , Headers headers , byte [] payload )
255256 throws SerializationException , InvalidConfigurationException {
256- return deserializeWithSchemaAndVersion (topic , isKey , headers , payload , null );
257+ return deserializeWithSchemaAndVersion (topic , isKey , headers , payload , ( Schema ) null );
257258 }
258259
259260 protected GenericContainerWithVersion deserializeWithSchemaAndVersion (
260261 String topic , boolean isKey , Headers headers , byte [] payload , Schema readerSchema )
261262 throws SerializationException , InvalidConfigurationException {
263+ return deserializeWithSchemaAndVersion (
264+ topic , isKey , headers , payload ,
265+ writerAvroSchema -> readerSchema != null ? new AvroSchema (readerSchema ) : null );
266+ }
267+
268+ protected GenericContainerWithVersion deserializeWithSchemaAndVersion (
269+ String topic , boolean isKey , Headers headers , byte [] payload ,
270+ Function <AvroSchema , AvroSchema > writerToReaderSchemaFunc )
271+ throws SerializationException , InvalidConfigurationException {
262272 // Even if the caller requests schema & version, if the payload is null we cannot include it.
263273 // The caller must handle this case.
264274 if (payload == null ) {
@@ -276,8 +286,8 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
276286 // explicit from the Connector).
277287 DeserializationContext context = new DeserializationContext (topic , isKey , headers , payload );
278288 AvroSchema schema = context .schemaForDeserialize ();
279- AvroSchema readerAvroSchema = readerSchema != null
280- ? new AvroSchema ( readerSchema )
289+ AvroSchema readerAvroSchema = writerToReaderSchemaFunc != null
290+ ? writerToReaderSchemaFunc . apply ( schema )
281291 : specificAvroReaderSchema != null
282292 ? new AvroSchema (specificAvroReaderSchema )
283293 : null ;
0 commit comments