Skip to content

Commit bb65368

Browse files
committed
DGS-23340 Ensure output schema is cached if it is returned
1 parent a5b71b7 commit bb65368

File tree

2 files changed

+50
-19
lines changed

2 files changed

+50
-19
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -543,12 +543,16 @@ private RegisterSchemaResponse registerWithResponse(
543543
? registerAndGetId(subject, schema, version, id, normalize, propagateSchemaTags)
544544
: registerAndGetId(subject, schema, normalize, propagateSchemaTags);
545545
schemaResponseMap.put(schema, retrievedResponse);
546-
String context = toQualifiedContext(subject);
547-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
548-
context, () -> CacheBuilder.newBuilder()
549-
.maximumSize(cacheCapacity)
550-
.build());
551-
idSchemaMap.put(retrievedResponse.getId(), schema);
546+
if (retrievedResponse.getSchema() != null) {
547+
String context = toQualifiedContext(subject);
548+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
549+
context, () -> CacheBuilder.newBuilder()
550+
.maximumSize(cacheCapacity)
551+
.build());
552+
ParsedSchema retrievedSchema =
553+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
554+
idSchemaMap.put(retrievedResponse.getId(), retrievedSchema);
555+
}
552556
return retrievedResponse;
553557
}
554558
} catch (ExecutionException e) {
@@ -826,14 +830,20 @@ public int getId(String subject, ParsedSchema schema, boolean normalize)
826830
return cachedId;
827831
}
828832

829-
final int retrievedId = getIdFromRegistry(subject, schema, normalize);
833+
final RegisterSchemaResponse retrievedResponse =
834+
getIdWithResponseFromRegistry(subject, schema, normalize, false);
835+
final int retrievedId = retrievedResponse.getId();
830836
schemaIdMap.put(schema, retrievedId);
831-
String context = toQualifiedContext(subject);
832-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
833-
context, () -> CacheBuilder.newBuilder()
834-
.maximumSize(cacheCapacity)
835-
.build());
836-
idSchemaMap.put(retrievedId, schema);
837+
if (retrievedResponse.getSchema() != null) {
838+
String context = toQualifiedContext(subject);
839+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
840+
context, () -> CacheBuilder.newBuilder()
841+
.maximumSize(cacheCapacity)
842+
.build());
843+
ParsedSchema retrievedSchema =
844+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
845+
idSchemaMap.put(retrievedId, retrievedSchema);
846+
}
837847
return retrievedId;
838848
}
839849
} catch (ExecutionException e) {
@@ -908,12 +918,16 @@ public RegisterSchemaResponse getIdWithResponse(
908918
final RegisterSchemaResponse retrievedResponse =
909919
getIdWithResponseFromRegistry(subject, schema, normalize, false);
910920
schemaResponseMap.put(schema, retrievedResponse);
911-
String context = toQualifiedContext(subject);
912-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
913-
context, () -> CacheBuilder.newBuilder()
914-
.maximumSize(cacheCapacity)
915-
.build());
916-
idSchemaMap.put(retrievedResponse.getId(), schema);
921+
if (retrievedResponse.getSchema() != null) {
922+
String context = toQualifiedContext(subject);
923+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
924+
context, () -> CacheBuilder.newBuilder()
925+
.maximumSize(cacheCapacity)
926+
.build());
927+
ParsedSchema retrievedSchema =
928+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
929+
idSchemaMap.put(retrievedResponse.getId(), retrievedSchema);
930+
}
917931
return retrievedResponse;
918932
}
919933
} catch (ExecutionException e) {

client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class CachedSchemaRegistryClientTest {
6969
private static final int CACHE_CAPACITY = 5;
7070
private static final String SCHEMA_STR_0 = avroSchemaString(0);
7171
private static final AvroSchema AVRO_SCHEMA_0 = avroSchema(0);
72+
private static final AvroSchema AVRO_SCHEMA_0_WITH_METADATA =
73+
AVRO_SCHEMA_0.copy(new Metadata(null, ImmutableMap.of("confluent:version", "1"), null), null);
7274
private static final AvroSchema SCHEMA_WITH_DECIMAL = new AvroSchema(
7375
"{\n"
7476
+ " \"type\": \"record\",\n"
@@ -258,6 +260,21 @@ public void testRegisterFollowedByLookupWillSkipCache() throws Exception {
258260
verify(restService);
259261
}
260262

263+
@Test
264+
public void testRegisterFollowedByIdLookupWillReturnRegisteredSchema() throws Exception {
265+
expect(restService.registerSchema(anyObject(RegisterSchemaRequest.class),
266+
eq(SUBJECT_0), anyBoolean()))
267+
.andReturn(new RegisterSchemaResponse(new Schema(SUBJECT_0, 1, ID_25, AVRO_SCHEMA_0_WITH_METADATA)))
268+
.once();
269+
270+
replay(restService);
271+
272+
assertEquals(ID_25, client.register(SUBJECT_0, AVRO_SCHEMA_0, VERSION_1, ID_25));
273+
assertEquals(AVRO_SCHEMA_0_WITH_METADATA, client.getSchemaById(ID_25));
274+
275+
verify(restService);
276+
}
277+
261278
@Test
262279
public void testRegisterOverCapacity() throws Exception {
263280
expect(restService.registerSchema(anyObject(RegisterSchemaRequest.class),

0 commit comments

Comments
 (0)