Skip to content

Commit e92e4fa

Browse files
Merge branch '8.0.x' into 8.1.x by rayokota
2 parents 10e5d31 + 59d2bb0 commit e92e4fa

File tree

2 files changed

+50
-19
lines changed

2 files changed

+50
-19
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -544,12 +544,16 @@ private RegisterSchemaResponse registerWithResponse(
544544
? registerAndGetId(subject, schema, version, id, normalize, propagateSchemaTags)
545545
: registerAndGetId(subject, schema, normalize, propagateSchemaTags);
546546
schemaResponseMap.put(schema, retrievedResponse);
547-
String context = toQualifiedContext(subject);
548-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
549-
context, () -> CacheBuilder.newBuilder()
550-
.maximumSize(cacheCapacity)
551-
.build());
552-
idSchemaMap.put(retrievedResponse.getId(), schema);
547+
if (retrievedResponse.getSchema() != null) {
548+
String context = toQualifiedContext(subject);
549+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
550+
context, () -> CacheBuilder.newBuilder()
551+
.maximumSize(cacheCapacity)
552+
.build());
553+
ParsedSchema retrievedSchema =
554+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
555+
idSchemaMap.put(retrievedResponse.getId(), retrievedSchema);
556+
}
553557
return retrievedResponse;
554558
}
555559
} catch (ExecutionException e) {
@@ -827,14 +831,20 @@ public int getId(String subject, ParsedSchema schema, boolean normalize)
827831
return cachedId;
828832
}
829833

830-
final int retrievedId = getIdFromRegistry(subject, schema, normalize);
834+
final RegisterSchemaResponse retrievedResponse =
835+
getIdWithResponseFromRegistry(subject, schema, normalize, false);
836+
final int retrievedId = retrievedResponse.getId();
831837
schemaIdMap.put(schema, retrievedId);
832-
String context = toQualifiedContext(subject);
833-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
834-
context, () -> CacheBuilder.newBuilder()
835-
.maximumSize(cacheCapacity)
836-
.build());
837-
idSchemaMap.put(retrievedId, schema);
838+
if (retrievedResponse.getSchema() != null) {
839+
String context = toQualifiedContext(subject);
840+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
841+
context, () -> CacheBuilder.newBuilder()
842+
.maximumSize(cacheCapacity)
843+
.build());
844+
ParsedSchema retrievedSchema =
845+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
846+
idSchemaMap.put(retrievedId, retrievedSchema);
847+
}
838848
return retrievedId;
839849
}
840850
} catch (ExecutionException e) {
@@ -909,12 +919,16 @@ public RegisterSchemaResponse getIdWithResponse(
909919
final RegisterSchemaResponse retrievedResponse =
910920
getIdWithResponseFromRegistry(subject, schema, normalize, false);
911921
schemaResponseMap.put(schema, retrievedResponse);
912-
String context = toQualifiedContext(subject);
913-
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
914-
context, () -> CacheBuilder.newBuilder()
915-
.maximumSize(cacheCapacity)
916-
.build());
917-
idSchemaMap.put(retrievedResponse.getId(), schema);
922+
if (retrievedResponse.getSchema() != null) {
923+
String context = toQualifiedContext(subject);
924+
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
925+
context, () -> CacheBuilder.newBuilder()
926+
.maximumSize(cacheCapacity)
927+
.build());
928+
ParsedSchema retrievedSchema =
929+
parseSchemaOrElseThrow(new Schema(null, retrievedResponse));
930+
idSchemaMap.put(retrievedResponse.getId(), retrievedSchema);
931+
}
918932
return retrievedResponse;
919933
}
920934
} catch (ExecutionException e) {

client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class CachedSchemaRegistryClientTest {
6969
private static final int CACHE_CAPACITY = 5;
7070
private static final String SCHEMA_STR_0 = avroSchemaString(0);
7171
private static final AvroSchema AVRO_SCHEMA_0 = avroSchema(0);
72+
private static final AvroSchema AVRO_SCHEMA_0_WITH_METADATA =
73+
AVRO_SCHEMA_0.copy(new Metadata(null, ImmutableMap.of("confluent:version", "1"), null), null);
7274
private static final AvroSchema SCHEMA_WITH_DECIMAL = new AvroSchema(
7375
"{\n"
7476
+ " \"type\": \"record\",\n"
@@ -258,6 +260,21 @@ public void testRegisterFollowedByLookupWillSkipCache() throws Exception {
258260
verify(restService);
259261
}
260262

263+
@Test
264+
public void testRegisterFollowedByIdLookupWillReturnRegisteredSchema() throws Exception {
265+
expect(restService.registerSchema(anyObject(RegisterSchemaRequest.class),
266+
eq(SUBJECT_0), anyBoolean()))
267+
.andReturn(new RegisterSchemaResponse(new Schema(SUBJECT_0, 1, ID_25, AVRO_SCHEMA_0_WITH_METADATA)))
268+
.once();
269+
270+
replay(restService);
271+
272+
assertEquals(ID_25, client.register(SUBJECT_0, AVRO_SCHEMA_0, VERSION_1, ID_25));
273+
assertEquals(AVRO_SCHEMA_0_WITH_METADATA, client.getSchemaById(ID_25));
274+
275+
verify(restService);
276+
}
277+
261278
@Test
262279
public void testRegisterOverCapacity() throws Exception {
263280
expect(restService.registerSchema(anyObject(RegisterSchemaRequest.class),

0 commit comments

Comments
 (0)