Skip to content

Commit 12be5ea

Browse files
Merge pull request #4002 from confluentinc/pr_merge_from_8_0_x_to_8_1_x
Merge Conflict Resolution (from 8.0.x to 8.1.x)
2 parents 25cf009 + 290c8fa commit 12be5ea

File tree

6 files changed

+580
-23
lines changed

6 files changed

+580
-23
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,9 +1117,23 @@ public Mode deleteSubjectMode(String subject)
11171117

11181118
public Mode deleteSubjectMode(Map<String, String> requestProperties, String subject)
11191119
throws IOException, RestClientException {
1120+
return deleteSubjectMode(requestProperties, subject, false);
1121+
}
1122+
1123+
public Mode deleteSubjectMode(Map<String, String> requestProperties, String subject,
1124+
boolean recursive)
1125+
throws IOException, RestClientException {
1126+
UriBuilder builder = subject != null
1127+
? UriBuilder.fromPath("/mode/{subject}")
1128+
: UriBuilder.fromPath("/mode");
1129+
1130+
if (recursive) {
1131+
builder.queryParam("recursive", "true");
1132+
}
1133+
11201134
String path = subject != null
1121-
? UriBuilder.fromPath("/mode/{subject}").build(subject).toString()
1122-
: "/mode";
1135+
? builder.build(subject).toString()
1136+
: builder.build().toString();
11231137

11241138
Mode response = httpRequest(path, "DELETE", null, requestProperties,
11251139
DELETE_SUBJECT_MODE_RESPONSE_TYPE);

client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,19 @@ public static String normalizeContext(String context) {
361361
return DEFAULT_CONTEXT.equals(context) ? "" : CONTEXT_DELIMITER + context + CONTEXT_DELIMITER;
362362
}
363363

364+
/**
365+
* Checks if the given qualified subject represents a context (i.e., has no subject part).
366+
* A context has the format ":.context:" with an empty subject part, or null for default context.
367+
*
368+
* @param tenant the tenant
369+
* @param qualifiedSubject the subject with a tenant prefix
370+
* @return true if the qualified subject is a context, false otherwise
371+
*/
372+
public static boolean isContext(String tenant, String qualifiedSubject) {
373+
QualifiedSubject qs = QualifiedSubject.create(tenant, qualifiedSubject);
374+
return qs == null || qs.getSubject().isEmpty();
375+
}
376+
364377
/**
365378
* Checks if the given qualified subject is the default context for the given tenant.
366379
* The default context is the context with name "." and an empty subject.
@@ -473,4 +486,3 @@ public int compareTo(QualifiedSubject that) {
473486
}
474487
}
475488
}
476-

core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/ModeResource.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,14 @@ public Mode getTopLevelMode(
256256
@DELETE
257257
@DocumentedName("deleteGlobalMode")
258258
@Operation(summary = "Delete global mode",
259-
description = "Deletes the global mode and reverts to the default mode.",
259+
description = "Deletes the global mode and reverts to the default mode. "
260+
+ "If recursive is true, also deletes mode for all subjects in the default context.",
260261
responses = {
261262
@ApiResponse(responseCode = "200",
262-
description = "Operation succeeded. Returns old mode.",
263+
description = "Operation succeeded. Returns old mode."
264+
+ "If recursive is enabled and mode is not found, returns 200 "
265+
+ "and returns null mode as it would delete mode for all subjects "
266+
+ "under it",
263267
content = @Content(schema = @Schema(implementation = Mode.class))),
264268
@ApiResponse(responseCode = "422",
265269
description = "Unprocessable Entity. "
@@ -273,19 +277,26 @@ public Mode getTopLevelMode(
273277
@PerformanceMetric("mode.delete-global")
274278
public void deleteGlobalMode(
275279
final @Suspended AsyncResponse asyncResponse,
276-
@Context HttpHeaders headers) {
277-
log.info("Deleting global mode");
278-
deleteSubjectMode(asyncResponse, headers, null);
280+
@Context HttpHeaders headers,
281+
@Parameter(description =
282+
"Whether to recursively delete mode for all subjects in the default context")
283+
@QueryParam("recursive") boolean recursive) {
284+
log.info("Deleting global mode, recursive={}", recursive);
285+
deleteSubjectMode(asyncResponse, headers, null, recursive);
279286
}
280287

281288
@DELETE
282289
@Path("/{subject}")
283290
@DocumentedName("deleteSubjectMode")
284291
@Operation(summary = "Delete subject mode",
285292
description = "Deletes the specified subject-level mode and reverts to "
286-
+ "the global default.",
293+
+ "the global default. "
294+
+ "If the subject is a context and recursive is true, also deletes mode "
295+
+ "for all subjects under that context.",
287296
responses = {
288-
@ApiResponse(responseCode = "200", description = "Operation succeeded. Returns old mode.",
297+
@ApiResponse(responseCode = "200", description = "Operation succeeded. Returns old mode."
298+
+ "If recursive is enabled and mode is not found, returns 200 and returns null"
299+
+ "mode as it would delete mode for all subjects under it",
289300
content = @Content(schema = @Schema(implementation = Mode.class))),
290301
@ApiResponse(responseCode = "404",
291302
description = "Not Found. Error code 40401 indicates subject not found.",
@@ -300,8 +311,12 @@ public void deleteSubjectMode(
300311
final @Suspended AsyncResponse asyncResponse,
301312
@Context HttpHeaders headers,
302313
@Parameter(description = "Name of the subject", required = true)
303-
@PathParam("subject") String subject) {
304-
log.info("Deleting mode for subject {}", subject);
314+
@PathParam("subject") String subject,
315+
@Parameter(description =
316+
"Whether to recursively delete mode for all subjects under the context, "
317+
+ "if the subject is a context")
318+
@QueryParam("recursive") boolean recursive) {
319+
log.info("Deleting mode for subject {}, recursive={}", subject, recursive);
305320

306321
if (QualifiedSubject.isDefaultContext(schemaRegistry.tenant(), subject)) {
307322
subject = null;
@@ -313,14 +328,19 @@ public void deleteSubjectMode(
313328
Mode deleteModeResponse;
314329
try {
315330
deletedMode = schemaRegistry.getMode(subject);
316-
if (deletedMode == null) {
331+
if (deletedMode == null && !recursive) {
332+
// If recursive is not enabled and mode is not found, return 404
333+
// If recursive is enabled and mode is not found, continue to delete mode for all
334+
// subjects under it
317335
throw Errors.subjectNotFoundException(subject);
318336
}
319337

320338
Map<String, String> headerProperties = requestHeaderBuilder.buildRequestHeaders(
321339
headers, schemaRegistry.config().whitelistHeaders());
322-
schemaRegistry.deleteSubjectModeOrForward(subject, headerProperties);
323-
deleteModeResponse = new Mode(deletedMode.name());
340+
341+
// Delete mode for the context/subject itself (and recursively if requested)
342+
schemaRegistry.deleteSubjectModeOrForward(subject, recursive, headerProperties);
343+
deleteModeResponse = new Mode(deletedMode != null ? deletedMode.name() : null);
324344
} catch (OperationNotPermittedException e) {
325345
throw Errors.operationNotPermittedException(e.getMessage());
326346
} catch (SchemaRegistryStoreException e) {

core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package io.confluent.kafka.schemaregistry.storage;
1717

18+
import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.CONTEXT_DELIMITER;
19+
import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.CONTEXT_PREFIX;
1820
import static io.confluent.kafka.schemaregistry.utils.QualifiedSubject.DEFAULT_CONTEXT;
1921

2022
import com.google.common.annotations.VisibleForTesting;
@@ -64,6 +66,7 @@
6466
import java.util.ArrayList;
6567
import java.util.Collections;
6668
import java.util.Iterator;
69+
import java.util.LinkedHashSet;
6770
import java.util.List;
6871
import java.util.Locale;
6972
import java.util.Map;
@@ -994,13 +997,15 @@ private void forwardSetModeRequestToLeader(
994997

995998
private void forwardDeleteSubjectModeRequestToLeader(
996999
String subject,
1000+
boolean recursive,
9971001
Map<String, String> headerProperties)
9981002
throws SchemaRegistryRequestForwardingException {
9991003
UrlList baseUrl = leaderRestService.getBaseUrls();
10001004

1001-
log.debug("Forwarding delete subject mode request {} to {}", subject, baseUrl);
1005+
log.debug("Forwarding delete subject mode request {} to {} with recursive={}",
1006+
subject, baseUrl, recursive);
10021007
try {
1003-
leaderRestService.deleteSubjectMode(headerProperties, subject);
1008+
leaderRestService.deleteSubjectMode(headerProperties, subject, recursive);
10041009
} catch (IOException e) {
10051010
throw new SchemaRegistryRequestForwardingException(
10061011
String.format(
@@ -1248,31 +1253,49 @@ public void setModeOrForward(String subject, ModeUpdateRequest mode, boolean for
12481253

12491254
public void deleteSubjectMode(String subject)
12501255
throws SchemaRegistryStoreException, OperationNotPermittedException {
1256+
deleteSubjectMode(subject, false);
1257+
}
1258+
1259+
public void deleteSubjectMode(String subject, boolean recursive)
1260+
throws SchemaRegistryStoreException, OperationNotPermittedException {
12511261
if (!allowModeChanges) {
12521262
throw new OperationNotPermittedException("Mode changes are not allowed");
12531263
}
12541264
try {
12551265
kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);
12561266
deleteMode(subject);
1267+
1268+
// If recursive and subject is a context, delete modes for all subjects under it
1269+
if (recursive && QualifiedSubject.isContext(tenant(), subject)) {
1270+
log.info("Recursively deleting mode for all subjects under context: {}", subject);
1271+
try {
1272+
deleteModesForSubjectsUnderContext(subject);
1273+
} catch (SchemaRegistryException e) {
1274+
throw new SchemaRegistryStoreException(
1275+
"Failed to recursively delete modes for subjects under context", e);
1276+
}
1277+
}
12571278
} catch (StoreException e) {
12581279
throw new SchemaRegistryStoreException("Failed to delete subject config value from store",
12591280
e);
12601281
}
12611282
}
12621283

1263-
public void deleteSubjectModeOrForward(String subject, Map<String, String> headerProperties)
1284+
public void deleteSubjectModeOrForward(String subject, boolean recursive,
1285+
Map<String, String> headerProperties)
12641286
throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException,
12651287
OperationNotPermittedException, UnknownLeaderException {
12661288
kafkaStore.lockFor(subject).lock();
12671289
try {
12681290
if (isLeader()) {
1269-
deleteSubjectMode(subject);
1291+
// Delete the subject/context mode itself (and recursively if requested)
1292+
deleteSubjectMode(subject, recursive);
12701293
} else {
1271-
// forward delete subject config request to the leader
1294+
// forward delete subject mode request to the leader (with recursive flag)
12721295
if (leaderIdentity != null) {
1273-
forwardDeleteSubjectModeRequestToLeader(subject, headerProperties);
1296+
forwardDeleteSubjectModeRequestToLeader(subject, recursive, headerProperties);
12741297
} else {
1275-
throw new UnknownLeaderException("Delete config request failed since leader is "
1298+
throw new UnknownLeaderException("Delete mode request failed since leader is "
12761299
+ "unknown");
12771300
}
12781301
}
@@ -1281,6 +1304,68 @@ public void deleteSubjectModeOrForward(String subject, Map<String, String> heade
12811304
}
12821305
}
12831306

1307+
/**
1308+
* List all subjects that have a mode configured under the given prefix.
1309+
* This is different from listSubjectsWithPrefix which only returns subjects with schemas.
1310+
*
1311+
* @param prefix The subject prefix to match (e.g., ":.context:" for a context)
1312+
* @return Set of subject names that have modes configured under the prefix
1313+
* @throws SchemaRegistryException if there's an error accessing the store
1314+
*/
1315+
private Set<String> listSubjectsWithModePrefix(String prefix)
1316+
throws SchemaRegistryException {
1317+
try {
1318+
ModeKey startKey = new ModeKey(prefix + Character.MIN_VALUE);
1319+
ModeKey endKey = new ModeKey(prefix + Character.MAX_VALUE);
1320+
1321+
Set<String> subjects = new LinkedHashSet<>();
1322+
try (CloseableIterator<SchemaRegistryValue> iterator =
1323+
kafkaStore.getAll(startKey, endKey)) {
1324+
while (iterator.hasNext()) {
1325+
SchemaRegistryValue value = iterator.next();
1326+
if (value instanceof ModeValue) {
1327+
ModeValue modeValue = (ModeValue) value;
1328+
String subject = modeValue.getSubject();
1329+
// Only include subjects that match our prefix
1330+
if (subject != null && subject.startsWith(prefix)) {
1331+
subjects.add(subject);
1332+
}
1333+
}
1334+
}
1335+
}
1336+
return subjects;
1337+
} catch (StoreException e) {
1338+
throw new SchemaRegistryStoreException(
1339+
"Error while retrieving subjects with modes under prefix: " + prefix, e);
1340+
}
1341+
}
1342+
1343+
private void deleteModesForSubjectsUnderContext(String context)
1344+
throws SchemaRegistryException {
1345+
// Context is already normalized and includes the trailing delimiter
1346+
// For default context: context would be like ":tenant:"
1347+
// For named context: context would be like ":.production:"
1348+
String subjectPrefix = context != null ? context :
1349+
QualifiedSubject.normalize(tenant(), CONTEXT_PREFIX + CONTEXT_DELIMITER);
1350+
1351+
// Get all subjects with modes under this context
1352+
// This includes subjects that have modes set but no schemas registered
1353+
Set<String> subjects = listSubjectsWithModePrefix(subjectPrefix);
1354+
1355+
log.info("Found {} subjects with modes under context '{}' for recursive mode deletion with"
1356+
+ " subjectPrefix={}", subjects.size(), context, subjectPrefix);
1357+
1358+
// Delete mode for each subject (locally on leader)
1359+
int successCount = 0;
1360+
for (String subjectName : subjects) {
1361+
log.debug("Deleting mode for subject: {}", subjectName);
1362+
deleteSubjectMode(subjectName);
1363+
successCount++;
1364+
}
1365+
1366+
log.info("Recursive mode deletion completed successfully for {} subjects", successCount);
1367+
}
1368+
12841369
@Override
12851370
public KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
12861371
return this.kafkaStore;

core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,9 @@ Set<String> subjects(String subject, boolean lookupDeletedSubjects) throws
263263
void deleteSubjectMode(String subject) throws SchemaRegistryStoreException,
264264
OperationNotPermittedException;
265265

266+
void deleteSubjectMode(String subject, boolean recursive) throws SchemaRegistryStoreException,
267+
OperationNotPermittedException;
268+
266269
RuleSetHandler getRuleSetHandler();
267270

268271
void setRuleSetHandler(RuleSetHandler ruleSetHandler);
@@ -297,7 +300,8 @@ default String getKafkaClusterId() {
297300
return null;
298301
}
299302

300-
default void deleteSubjectModeOrForward(String subject, Map<String, String> headerProperties)
303+
default void deleteSubjectModeOrForward(String subject, boolean recursive,
304+
Map<String, String> headerProperties)
301305
throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException,
302306
OperationNotPermittedException, UnknownLeaderException {}
303307

0 commit comments

Comments
 (0)