Skip to content

Commit 35cea29

Browse files
authored
GH-5809 Circuit breakers against OOM (#5729)
2 parents 5cddddc + 00e31a0 commit 35cea29

35 files changed

Lines changed: 4228 additions & 53 deletions

File tree

core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
*
99
* SPDX-License-Identifier: BSD-3-Clause
1010
*******************************************************************************/
11-
11+
// Some portions generated by Codex
1212
package org.eclipse.rdf4j.common.iteration;
1313

14+
import java.lang.reflect.InvocationTargetException;
15+
import java.lang.reflect.Method;
1416
import java.util.HashSet;
1517
import java.util.Set;
1618
import java.util.function.Supplier;
@@ -20,6 +22,8 @@
2022
*/
2123
public class DistinctIteration<E> extends FilterIteration<E> {
2224

25+
private static final String OPERATOR_NAME = "DISTINCT";
26+
2327
/*-----------*
2428
* Variables *
2529
*-----------*/
@@ -76,13 +80,30 @@ public DistinctIteration(CloseableIteration<? extends E> iter, Supplier<Set<E>>
7680
*/
7781
@Override
7882
protected boolean accept(E object) {
83+
try {
84+
QueryExecutionContextBridge.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
85+
QueryExecutionContextBridge.markHeavy(OPERATOR_NAME);
86+
QueryExecutionContextBridge.checkpoint(OPERATOR_NAME);
87+
} catch (Throwable t) {
88+
excludeSet = null;
89+
throw t;
90+
}
7991
return add(object);
8092
}
8193

8294
@Override
8395
protected void handleClose() {
8496
// help GC by removing link to set
8597
excludeSet = null;
98+
excludeSet = null;
99+
}
100+
101+
/**
102+
* @param object
103+
* @return true if the object is in the excludeSet
104+
*/
105+
private boolean inExcludeSet(E object) {
106+
return excludeSet.contains(object);
86107
}
87108

88109
/**
@@ -91,4 +112,86 @@ protected void handleClose() {
91112
protected boolean add(E object) {
92113
return excludeSet.add(object);
93114
}
115+
116+
private static final class QueryExecutionContextBridge {
117+
118+
private static final String QUERY_EXECUTION_CONTEXT_CLASS = "org.eclipse.rdf4j.http.client.QueryExecutionContext";
119+
120+
private static volatile boolean initialized;
121+
private static volatile Method markHeavyMethod;
122+
private static volatile Method checkpointMethod;
123+
private static volatile Method throwIfHeavyOperatorExecutionDisabledMethod;
124+
125+
private QueryExecutionContextBridge() {
126+
}
127+
128+
private static void markHeavy(String operator) {
129+
Method method = getMethod(true);
130+
if (method != null) {
131+
invoke(method, operator);
132+
}
133+
}
134+
135+
private static void checkpoint(String operator) {
136+
Method method = getMethod(false);
137+
if (method != null) {
138+
invoke(method, operator);
139+
}
140+
}
141+
142+
private static void throwIfHeavyOperatorExecutionDisabled(String operator) {
143+
if (!initialized) {
144+
initialize();
145+
}
146+
if (throwIfHeavyOperatorExecutionDisabledMethod != null) {
147+
invoke(throwIfHeavyOperatorExecutionDisabledMethod, operator);
148+
}
149+
}
150+
151+
private static Method getMethod(boolean markHeavy) {
152+
if (!initialized) {
153+
initialize();
154+
}
155+
return markHeavy ? markHeavyMethod : checkpointMethod;
156+
}
157+
158+
private static synchronized void initialize() {
159+
if (initialized) {
160+
return;
161+
}
162+
try {
163+
Class<?> contextType = Class.forName(QUERY_EXECUTION_CONTEXT_CLASS);
164+
markHeavyMethod = contextType.getMethod("markHeavy", String.class);
165+
checkpointMethod = contextType.getMethod("checkpoint", String.class);
166+
throwIfHeavyOperatorExecutionDisabledMethod = contextType.getMethod(
167+
"throwIfHeavyOperatorExecutionDisabled", String.class);
168+
} catch (ClassNotFoundException | NoSuchMethodException e) {
169+
markHeavyMethod = null;
170+
checkpointMethod = null;
171+
throwIfHeavyOperatorExecutionDisabledMethod = null;
172+
}
173+
initialized = true;
174+
}
175+
176+
private static void invoke(Method method, String operator) {
177+
try {
178+
method.invoke(null, operator);
179+
} catch (IllegalAccessException e) {
180+
throw new IllegalStateException("Unable to access query execution context bridge", e);
181+
} catch (InvocationTargetException e) {
182+
throw propagate(e.getCause());
183+
}
184+
}
185+
186+
private static RuntimeException propagate(Throwable throwable) {
187+
if (throwable instanceof RuntimeException) {
188+
throw (RuntimeException) throwable;
189+
}
190+
if (throwable instanceof Error) {
191+
throw (Error) throwable;
192+
}
193+
throw new IllegalStateException("Unexpected checked exception from query execution context bridge",
194+
throwable);
195+
}
196+
}
94197
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.common.iteration;
13+
14+
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
import static org.junit.jupiter.api.Assertions.assertThrows;
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
17+
18+
import java.util.List;
19+
20+
import org.eclipse.rdf4j.http.client.QueryExecutionContext;
21+
import org.junit.jupiter.api.AfterEach;
22+
import org.junit.jupiter.api.Test;
23+
24+
class DistinctIterationTest {
25+
26+
@AfterEach
27+
void tearDown() {
28+
QueryExecutionContext.reset();
29+
}
30+
31+
@Test
32+
void shouldConsultQueryExecutionContextWhenFilteringDistinctValues() {
33+
QueryExecutionContext.failOnCheckpoint(new RuntimeException("breaker checkpoint"));
34+
35+
DistinctIteration<Integer> iteration = new DistinctIteration<>(
36+
new CloseableIteratorIteration<>(List.of(1, 2, 3).iterator()), java.util.HashSet::new);
37+
38+
RuntimeException exception = assertThrows(RuntimeException.class, iteration::hasNext);
39+
assertEquals("breaker checkpoint", exception.getMessage());
40+
assertTrue(QueryExecutionContext.getMarkHeavyCalls() > 0);
41+
assertTrue(QueryExecutionContext.getCheckpointCalls() > 0);
42+
}
43+
44+
@Test
45+
void shouldAbortDistinctIterationWhenHeavyOperatorExecutionIsDisabled() {
46+
QueryExecutionContext.disableHeavyOperatorExecution(new RuntimeException("critical-breaker-stop"));
47+
48+
DistinctIteration<Integer> iteration = new DistinctIteration<>(
49+
new CloseableIteratorIteration<>(List.of(1, 2, 3).iterator()), java.util.HashSet::new);
50+
51+
RuntimeException exception = assertThrows(RuntimeException.class, iteration::hasNext);
52+
assertEquals("critical-breaker-stop", exception.getMessage());
53+
assertEquals(0, QueryExecutionContext.getMarkHeavyCalls());
54+
assertEquals(0, QueryExecutionContext.getCheckpointCalls());
55+
}
56+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.http.client;
13+
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
16+
public final class QueryExecutionContext {
17+
18+
private static final AtomicInteger MARK_HEAVY_CALLS = new AtomicInteger();
19+
private static final AtomicInteger CHECKPOINT_CALLS = new AtomicInteger();
20+
21+
private static volatile boolean heavyOperatorExecutionEnabled = true;
22+
private static volatile RuntimeException checkpointFailure;
23+
private static volatile RuntimeException heavyOperatorExecutionFailure;
24+
25+
private QueryExecutionContext() {
26+
}
27+
28+
public static void markHeavy(String operator) {
29+
MARK_HEAVY_CALLS.incrementAndGet();
30+
}
31+
32+
public static void checkpoint(String operator) {
33+
CHECKPOINT_CALLS.incrementAndGet();
34+
if (checkpointFailure != null) {
35+
throw checkpointFailure;
36+
}
37+
}
38+
39+
public static void throwIfHeavyOperatorExecutionDisabled(String operator) {
40+
if (!heavyOperatorExecutionEnabled) {
41+
throw heavyOperatorExecutionFailure != null ? heavyOperatorExecutionFailure
42+
: new RuntimeException("heavy operator execution disabled: " + operator);
43+
}
44+
}
45+
46+
public static void failOnCheckpoint(RuntimeException runtimeException) {
47+
checkpointFailure = runtimeException;
48+
}
49+
50+
public static void disableHeavyOperatorExecution(RuntimeException runtimeException) {
51+
heavyOperatorExecutionEnabled = false;
52+
heavyOperatorExecutionFailure = runtimeException;
53+
}
54+
55+
public static void enableHeavyOperatorExecution() {
56+
heavyOperatorExecutionEnabled = true;
57+
heavyOperatorExecutionFailure = null;
58+
}
59+
60+
public static int getMarkHeavyCalls() {
61+
return MARK_HEAVY_CALLS.get();
62+
}
63+
64+
public static int getCheckpointCalls() {
65+
return CHECKPOINT_CALLS.get();
66+
}
67+
68+
public static void reset() {
69+
MARK_HEAVY_CALLS.set(0);
70+
CHECKPOINT_CALLS.set(0);
71+
heavyOperatorExecutionEnabled = true;
72+
checkpointFailure = null;
73+
heavyOperatorExecutionFailure = null;
74+
}
75+
}

0 commit comments

Comments
 (0)