Skip to content

Commit e7376c1

Browse files
authored
DGS-23542 DGS-23555 Ensure guid is preserved when formatting schema responses (#4163)
1 parent ad2f6c8 commit e7376c1

File tree

10 files changed

+129
-43
lines changed

10 files changed

+129
-43
lines changed

avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ protected byte[] serializeImpl(
137137
io.confluent.kafka.schemaregistry.client.rest.entities.Schema schemaEntity =
138138
new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(
139139
subject, null, useSchemaId, schema);
140-
schemaId = new SchemaId(AvroSchema.TYPE, useSchemaId, schemaEntity.getGuid());
140+
// omit the GUID when useSchemaId is set
141+
schemaId = new SchemaId(AvroSchema.TYPE, useSchemaId, (String) null);
141142
} else if (metadata != null) {
142143
restClientErrorMsg = "Error retrieving latest with metadata '" + metadata + "'";
143144
ExtendedSchema extendedSchema = getLatestWithMetadata(subject);

avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroConfigDeserializerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.avro.generic.GenericData;
3535
import org.apache.avro.generic.GenericRecord;
3636
import org.apache.avro.generic.IndexedRecord;
37+
import org.apache.kafka.common.errors.SerializationException;
3738
import org.apache.kafka.common.header.internals.RecordHeaders;
3839
import org.junit.Test;
3940

@@ -96,7 +97,7 @@ private IndexedRecord createAnnotatedUserRecord() {
9697
return io.confluent.kafka.example.annotated.User.newBuilder().setName("testUser").build();
9798
}
9899

99-
@Test
100+
@Test(expected = SerializationException.class)
100101
public void testKafkaAvroSerializerWithPreRegisteredUseSchemaId()
101102
throws IOException, RestClientException {
102103
Map configs = ImmutableMap.of(
@@ -115,7 +116,5 @@ public void testKafkaAvroSerializerWithPreRegisteredUseSchemaId()
115116
IndexedRecord annotatedUserRecord = createAnnotatedUserRecord();
116117
RecordHeaders headers = new RecordHeaders();
117118
byte[] bytes = avroSerializer.serialize(topic, headers, annotatedUserRecord);
118-
assertEquals(avroRecord, avroDeserializer.deserialize(topic, headers, bytes));
119-
assertEquals(avroRecord, avroDecoder.fromBytes(headers, bytes));
120119
}
121120
}

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 58 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ public class CachedSchemaRegistryClient implements SchemaRegistryClient {
7777

7878
private final RestService restService;
7979
private final int cacheCapacity;
80-
private final Cache<String, Cache<ParsedSchema, RegisterSchemaResponse>> schemaToResponseCache;
81-
private final Cache<String, Cache<ParsedSchema, Integer>> schemaToIdCache;
80+
private final Cache<String, Cache<SchemaAndNormalize, RegisterSchemaResponse>>
81+
schemaToResponseCache;
82+
private final Cache<String, Cache<SchemaAndNormalize, Integer>> schemaToIdCache;
8283
private final Cache<String, Cache<Integer, ParsedSchema>> idToSchemaCache;
8384
private final Cache<String, ParsedSchema> guidToSchemaCache;
84-
private final Cache<String, Cache<ParsedSchema, String>> schemaToGuidCache;
85-
private final Cache<String, Cache<ParsedSchema, Integer>> schemaToVersionCache;
85+
private final Cache<String, Cache<SchemaAndNormalize, String>> schemaToGuidCache;
86+
private final Cache<String, Cache<SchemaAndNormalize, Integer>> schemaToVersionCache;
8687
private final Cache<String, Cache<Integer, Schema>> versionToSchemaCache;
8788
private final Cache<String, SchemaMetadata> latestVersionCache;
8889
private final Cache<SubjectAndMetadata, SchemaMetadata> latestWithMetadataCache;
@@ -524,26 +525,27 @@ private RegisterSchemaResponse registerWithResponse(
524525
boolean normalize, boolean propagateSchemaTags)
525526
throws IOException, RestClientException {
526527
try {
527-
final Cache<ParsedSchema, RegisterSchemaResponse> schemaResponseMap =
528+
final Cache<SchemaAndNormalize, RegisterSchemaResponse> schemaResponseMap =
528529
schemaToResponseCache.get(subject, () -> CacheBuilder.newBuilder()
529530
.maximumSize(cacheCapacity)
530531
.build());
531532

532-
RegisterSchemaResponse cachedResponse = schemaResponseMap.getIfPresent(schema);
533+
SchemaAndNormalize cacheKey = new SchemaAndNormalize(schema, normalize);
534+
RegisterSchemaResponse cachedResponse = schemaResponseMap.getIfPresent(cacheKey);
533535
if (cachedResponse != null && (id < 0 || id == cachedResponse.getId())) {
534536
return cachedResponse;
535537
}
536538

537539
synchronized (this) {
538-
cachedResponse = schemaResponseMap.getIfPresent(schema);
540+
cachedResponse = schemaResponseMap.getIfPresent(cacheKey);
539541
if (cachedResponse != null && (id < 0 || id == cachedResponse.getId())) {
540542
return cachedResponse;
541543
}
542544

543545
final RegisterSchemaResponse retrievedResponse = id >= 0
544546
? registerAndGetId(subject, schema, version, id, normalize, propagateSchemaTags)
545547
: registerAndGetId(subject, schema, normalize, propagateSchemaTags);
546-
schemaResponseMap.put(schema, retrievedResponse);
548+
schemaResponseMap.put(cacheKey, retrievedResponse);
547549
if (retrievedResponse.getSchema() != null) {
548550
String context = toQualifiedContext(subject);
549551
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
@@ -767,24 +769,25 @@ public int getVersion(String subject, ParsedSchema schema)
767769
public int getVersion(String subject, ParsedSchema schema, boolean normalize)
768770
throws IOException, RestClientException {
769771
try {
770-
final Cache<ParsedSchema, Integer> schemaVersionMap = schemaToVersionCache.get(
772+
final Cache<SchemaAndNormalize, Integer> schemaVersionMap = schemaToVersionCache.get(
771773
subject, () -> CacheBuilder.newBuilder()
772774
.maximumSize(cacheCapacity)
773775
.build());
774776

775-
Integer cachedVersion = schemaVersionMap.getIfPresent(schema);
777+
SchemaAndNormalize cacheKey = new SchemaAndNormalize(schema, normalize);
778+
Integer cachedVersion = schemaVersionMap.getIfPresent(cacheKey);
776779
if (cachedVersion != null) {
777780
return cachedVersion;
778781
}
779782

780783
synchronized (this) {
781-
cachedVersion = schemaVersionMap.getIfPresent(schema);
784+
cachedVersion = schemaVersionMap.getIfPresent(cacheKey);
782785
if (cachedVersion != null) {
783786
return cachedVersion;
784787
}
785788

786789
final int retrievedVersion = getVersionFromRegistry(subject, schema, normalize);
787-
schemaVersionMap.put(schema, retrievedVersion);
790+
schemaVersionMap.put(cacheKey, retrievedVersion);
788791
return retrievedVersion;
789792
}
790793
} catch (ExecutionException e) {
@@ -815,26 +818,27 @@ public int getId(String subject, ParsedSchema schema)
815818
public int getId(String subject, ParsedSchema schema, boolean normalize)
816819
throws IOException, RestClientException {
817820
try {
818-
final Cache<ParsedSchema, Integer> schemaIdMap = schemaToIdCache.get(
821+
final Cache<SchemaAndNormalize, Integer> schemaIdMap = schemaToIdCache.get(
819822
subject, () -> CacheBuilder.newBuilder()
820823
.maximumSize(cacheCapacity)
821824
.build());
822825

823-
Integer cachedId = schemaIdMap.getIfPresent(schema);
826+
SchemaAndNormalize cacheKey = new SchemaAndNormalize(schema, normalize);
827+
Integer cachedId = schemaIdMap.getIfPresent(cacheKey);
824828
if (cachedId != null) {
825829
return cachedId;
826830
}
827831

828832
synchronized (this) {
829-
cachedId = schemaIdMap.getIfPresent(schema);
833+
cachedId = schemaIdMap.getIfPresent(cacheKey);
830834
if (cachedId != null) {
831835
return cachedId;
832836
}
833837

834838
final RegisterSchemaResponse retrievedResponse =
835839
getIdWithResponseFromRegistry(subject, schema, normalize, false);
836840
final int retrievedId = retrievedResponse.getId();
837-
schemaIdMap.put(schema, retrievedId);
841+
schemaIdMap.put(cacheKey, retrievedId);
838842
if (retrievedResponse.getSchema() != null) {
839843
String context = toQualifiedContext(subject);
840844
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
@@ -861,24 +865,25 @@ public String getGuid(
861865
String subject, ParsedSchema schema, boolean normalize)
862866
throws IOException, RestClientException {
863867
try {
864-
final Cache<ParsedSchema, String> guidMap = schemaToGuidCache.get(
868+
final Cache<SchemaAndNormalize, String> guidMap = schemaToGuidCache.get(
865869
subject, () -> CacheBuilder.newBuilder()
866870
.maximumSize(cacheCapacity)
867871
.build());
868872

869-
String cachedGuid = guidMap.getIfPresent(schema);
873+
SchemaAndNormalize cacheKey = new SchemaAndNormalize(schema, normalize);
874+
String cachedGuid = guidMap.getIfPresent(cacheKey);
870875
if (cachedGuid != null) {
871876
return cachedGuid;
872877
}
873878

874879
synchronized (this) {
875-
cachedGuid = guidMap.getIfPresent(schema);
880+
cachedGuid = guidMap.getIfPresent(cacheKey);
876881
if (cachedGuid != null) {
877882
return cachedGuid;
878883
}
879884

880885
final String retrievedGuid = getGuidFromRegistry(subject, schema, normalize);
881-
guidMap.put(schema, retrievedGuid);
886+
guidMap.put(cacheKey, retrievedGuid);
882887
guidToSchemaCache.put(retrievedGuid, schema);
883888
return retrievedGuid;
884889
}
@@ -892,12 +897,13 @@ public RegisterSchemaResponse getIdWithResponse(
892897
String subject, ParsedSchema schema, boolean normalize)
893898
throws IOException, RestClientException {
894899
try {
895-
final Cache<ParsedSchema, RegisterSchemaResponse> schemaResponseMap =
900+
final Cache<SchemaAndNormalize, RegisterSchemaResponse> schemaResponseMap =
896901
schemaToResponseCache.get(subject, () -> CacheBuilder.newBuilder()
897902
.maximumSize(cacheCapacity)
898903
.build());
899904

900-
RegisterSchemaResponse cachedResponse = schemaResponseMap.getIfPresent(schema);
905+
SchemaAndNormalize cacheKey = new SchemaAndNormalize(schema, normalize);
906+
RegisterSchemaResponse cachedResponse = schemaResponseMap.getIfPresent(cacheKey);
901907
if (cachedResponse != null) {
902908
// Allow the schema to be looked up again if version is not valid
903909
// This is for backward compatibility with versions before CP 8.0
@@ -907,7 +913,7 @@ public RegisterSchemaResponse getIdWithResponse(
907913
}
908914

909915
synchronized (this) {
910-
cachedResponse = schemaResponseMap.getIfPresent(schema);
916+
cachedResponse = schemaResponseMap.getIfPresent(cacheKey);
911917
if (cachedResponse != null) {
912918
// Allow the schema to be looked up again if version is not valid
913919
// This is for backward compatibility with versions before CP 8.0
@@ -918,7 +924,7 @@ public RegisterSchemaResponse getIdWithResponse(
918924

919925
final RegisterSchemaResponse retrievedResponse =
920926
getIdWithResponseFromRegistry(subject, schema, normalize, false);
921-
schemaResponseMap.put(schema, retrievedResponse);
927+
schemaResponseMap.put(cacheKey, retrievedResponse);
922928
if (retrievedResponse.getSchema() != null) {
923929
String context = toQualifiedContext(subject);
924930
final Cache<Integer, ParsedSchema> idSchemaMap = idToSchemaCache.get(
@@ -972,7 +978,7 @@ public synchronized Integer deleteSchemaVersion(
972978
String version,
973979
boolean isPermanent)
974980
throws IOException, RestClientException {
975-
Cache<ParsedSchema, Integer> versionCache = schemaToVersionCache.getIfPresent(subject);
981+
Cache<SchemaAndNormalize, Integer> versionCache = schemaToVersionCache.getIfPresent(subject);
976982
if (versionCache != null) {
977983
versionCache.asMap().values().remove(Integer.valueOf(version));
978984
}
@@ -1205,6 +1211,33 @@ private static String toQualifiedContext(String subject) {
12051211
return qualifiedSubject != null ? qualifiedSubject.toQualifiedContext() : NO_SUBJECT;
12061212
}
12071213

1214+
static class SchemaAndNormalize {
1215+
private final ParsedSchema schema;
1216+
private final boolean normalize;
1217+
1218+
SchemaAndNormalize(ParsedSchema schema, boolean normalize) {
1219+
this.schema = schema;
1220+
this.normalize = normalize;
1221+
}
1222+
1223+
@Override
1224+
public boolean equals(Object o) {
1225+
if (this == o) {
1226+
return true;
1227+
}
1228+
if (o == null || getClass() != o.getClass()) {
1229+
return false;
1230+
}
1231+
SchemaAndNormalize that = (SchemaAndNormalize) o;
1232+
return normalize == that.normalize && schema.equals(that.schema);
1233+
}
1234+
1235+
@Override
1236+
public int hashCode() {
1237+
return Objects.hash(schema, normalize);
1238+
}
1239+
}
1240+
12081241
static class SubjectAndSchema {
12091242
private final String subject;
12101243
private final ParsedSchema schema;

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Schema.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.security.MessageDigest;
3232
import java.security.NoSuchAlgorithmException;
3333
import java.util.Collections;
34+
import java.util.Comparator;
3435
import java.util.List;
3536
import java.util.Objects;
3637
import java.util.UUID;
@@ -65,7 +66,7 @@ public class Schema implements Comparable<Schema> {
6566
private String subject;
6667
private Integer version;
6768
private Integer id;
68-
private String guid;
69+
private volatile String guid;
6970
private String schemaType;
7071
private List<SchemaReference> references;
7172
private Metadata metadata;
@@ -304,7 +305,7 @@ public void setId(Integer id) {
304305

305306
@JsonProperty("guid")
306307
public String getGuid() {
307-
if (guid == null) {
308+
if (guid == null && schema != null) {
308309
try {
309310
MessageDigest md = MessageDigest.getInstance("MD5");
310311
updateHash(md);
@@ -321,6 +322,11 @@ public String getGuid() {
321322
return guid;
322323
}
323324

325+
@JsonProperty("guid")
326+
public void setGuid(String guid) {
327+
this.guid = guid;
328+
}
329+
324330
@io.swagger.v3.oas.annotations.media.Schema(description = TYPE_DESC, example = TYPE_EXAMPLE)
325331
@JsonProperty("schemaType")
326332
public String getSchemaType() {
@@ -341,6 +347,7 @@ public List<SchemaReference> getReferences() {
341347
@JsonProperty("references")
342348
public void setReferences(List<SchemaReference> references) {
343349
this.references = references;
350+
this.guid = null;
344351
}
345352

346353
@io.swagger.v3.oas.annotations.media.Schema(description = METADATA_DESC)
@@ -352,6 +359,7 @@ public Metadata getMetadata() {
352359
@JsonProperty("metadata")
353360
public void setMetadata(Metadata metadata) {
354361
this.metadata = metadata;
362+
this.guid = null;
355363
}
356364

357365
@io.swagger.v3.oas.annotations.media.Schema(description = RULESET_DESC)
@@ -363,6 +371,7 @@ public RuleSet getRuleSet() {
363371
@JsonProperty("ruleSet")
364372
public void setRuleSet(RuleSet ruleSet) {
365373
this.ruleSet = ruleSet;
374+
this.guid = null;
366375
}
367376

368377
@io.swagger.v3.oas.annotations.media.Schema(description = SCHEMA_DESC, example = SCHEMA_EXAMPLE)
@@ -374,6 +383,7 @@ public String getSchema() {
374383
@JsonProperty("schema")
375384
public void setSchema(String schema) {
376385
this.schema = schema;
386+
this.guid = null;
377387
}
378388

379389
@io.swagger.v3.oas.annotations.media.Schema(description = SCHEMA_TAGS_DESC)
@@ -407,11 +417,6 @@ public void setDeleted(Boolean deleted) {
407417
this.deleted = deleted;
408418
}
409419

410-
@JsonProperty("guid")
411-
public void setGuid(String guid) {
412-
this.guid = guid;
413-
}
414-
415420
@Override
416421
public boolean equals(Object o) {
417422
if (this == o) {
@@ -454,12 +459,14 @@ public String toString() {
454459

455460
@Override
456461
public int compareTo(Schema that) {
457-
int result = this.subject.compareTo(that.subject);
462+
int result = Objects.compare(this.subject, that.subject,
463+
Comparator.nullsFirst(String::compareTo));
458464
if (result != 0) {
459465
return result;
460466
}
461-
result = this.version - that.version;
462-
return result;
467+
return Integer.compare(
468+
this.version != null ? this.version : 0,
469+
that.version != null ? that.version : 0);
463470
}
464471

465472
public void updateHash(MessageDigest md) {

core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,9 @@ public Schema getSchemaByVersion(
176176
}
177177
if (format != null && !format.trim().isEmpty()) {
178178
ParsedSchema parsedSchema = schemaRegistry.parseSchema(schema, false, false);
179+
String originalGuid = schema.getGuid();
179180
schema.setSchema(parsedSchema.formattedString(format));
181+
schema.setGuid(originalGuid);
180182
}
181183
QualifiedSubject qs = QualifiedSubject.create(schemaRegistry.tenant(), schema.getSubject());
182184
boolean isQualifiedSubject = qs != null && !DEFAULT_CONTEXT.equals(qs.getContext());
@@ -484,7 +486,9 @@ public void register(
484486
schemaRegistry.registerOrForward(subjectName, request, normalize, headerProperties);
485487
if (result.getSchema() != null && format != null && !format.trim().isEmpty()) {
486488
ParsedSchema parsedSchema = schemaRegistry.parseSchema(result, false, false);
489+
String originalGuid = result.getGuid();
487490
result.setSchema(parsedSchema.formattedString(format));
491+
result.setGuid(originalGuid);
488492
}
489493
registerSchemaResponse = new RegisterSchemaResponse(result);
490494
} catch (IdDoesNotMatchException e) {

core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ public void lookUpSchemaUnderSubject(
144144
}
145145
if (format != null && !format.trim().isEmpty()) {
146146
ParsedSchema parsedSchema = schemaRegistry.parseSchema(matchingSchema, false, false);
147+
String originalGuid = matchingSchema.getGuid();
147148
matchingSchema.setSchema(parsedSchema.formattedString(format));
149+
matchingSchema.setGuid(originalGuid);
148150
}
149151
} catch (InvalidSchemaException e) {
150152
throw Errors.invalidSchemaException(e);
@@ -203,7 +205,9 @@ public void getLatestWithMetadata(
203205
}
204206
if (format != null && !format.trim().isEmpty()) {
205207
ParsedSchema parsedSchema = schemaRegistry.parseSchema(matchingSchema, false, false);
208+
String originalGuid = matchingSchema.getGuid();
206209
matchingSchema.setSchema(parsedSchema.formattedString(format));
210+
matchingSchema.setGuid(originalGuid);
207211
}
208212
} catch (InvalidSchemaException e) {
209213
throw Errors.invalidSchemaException(e);

0 commit comments

Comments
 (0)