Skip to content

Commit bbdbc31

Browse files
Fix RPC concurrency issues due to shared visitors (#6152)
* Fix RPC concurrency issues due to shared visitors When running multiple RPC operations in parallel, we mustn't use the same visitor instances concurrently, as they aren't stateless. The `cursor` instance field modifications can lead to `IllegalStateException`s getting thrown from `Cursor#getParentOrThrow()`. * Add test case
1 parent 721616d commit bbdbc31

12 files changed

Lines changed: 190 additions & 38 deletions

File tree

rewrite-java/src/main/java/org/openrewrite/java/internal/rpc/JavaReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ public <T> JRightPadded<T> visitRightPadded(JRightPadded<T> right, RpcReceiveQue
662662
.withMarkers(q.receive(right.getMarkers()));
663663
}
664664

665-
private static final JavaTypeReceiver javaTypeReceiver = new JavaTypeReceiver();
665+
private final JavaTypeReceiver javaTypeReceiver = new JavaTypeReceiver();
666666

667667
@Override
668668
public @Nullable JavaType visitType(@Nullable JavaType javaType, RpcReceiveQueue q) {

rewrite-java/src/main/java/org/openrewrite/java/internal/rpc/JavaSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ public void visitSpace(Space space, RpcSendQueue q) {
640640
q.getAndSend(space, Space::getWhitespace);
641641
}
642642

643-
private static final JavaTypeSender javaTypeSender = new JavaTypeSender();
643+
private final JavaTypeSender javaTypeSender = new JavaTypeSender();
644644

645645
@Override
646646
public @Nullable JavaType visitType(@Nullable JavaType javaType, RpcSendQueue q) {

rewrite-java/src/main/java/org/openrewrite/java/tree/JContainer.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import lombok.RequiredArgsConstructor;
2020
import org.jspecify.annotations.Nullable;
2121
import org.openrewrite.internal.ListUtils;
22-
import org.openrewrite.java.internal.rpc.JavaReceiver;
23-
import org.openrewrite.java.internal.rpc.JavaSender;
2422
import org.openrewrite.marker.Markers;
2523

2624
import java.util.List;
@@ -41,8 +39,6 @@
4139
* @param <T> The type of the inner list of elements.
4240
*/
4341
public class JContainer<T> {
44-
private static final JavaSender RPC_SENDER = new JavaSender();
45-
private static final JavaReceiver RPC_RECEIVER = new JavaReceiver();
4642

4743
private transient @Nullable Padding<T> padding;
4844

rewrite-java/src/main/java/org/openrewrite/java/tree/JLeftPadded.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import lombok.Value;
3535
import lombok.With;
3636
import org.jspecify.annotations.Nullable;
37-
import org.openrewrite.java.internal.rpc.JavaReceiver;
38-
import org.openrewrite.java.internal.rpc.JavaSender;
3937
import org.openrewrite.marker.Markers;
4038

4139
import java.util.function.UnaryOperator;
@@ -52,8 +50,6 @@
5250
@EqualsAndHashCode(callSuper = false, onlyExplicitlyIncluded = true)
5351
@With
5452
public class JLeftPadded<T> {
55-
private static final JavaSender RPC_SENDER = new JavaSender();
56-
private static final JavaReceiver RPC_RECEIVER = new JavaReceiver();
5753

5854
Space before;
5955
T element;

rewrite-java/src/main/java/org/openrewrite/java/tree/JRightPadded.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import lombok.Value;
2020
import lombok.With;
2121
import org.jspecify.annotations.Nullable;
22-
import org.openrewrite.java.internal.rpc.JavaReceiver;
23-
import org.openrewrite.java.internal.rpc.JavaSender;
2422
import org.openrewrite.marker.Markers;
2523

2624
import java.util.*;
@@ -37,8 +35,6 @@
3735
@EqualsAndHashCode(callSuper = false, onlyExplicitlyIncluded = true)
3836
@With
3937
public class JRightPadded<T> {
40-
private static JavaSender RPC_SENDER = new JavaSender();
41-
private static JavaReceiver RPC_RECEIVER = new JavaReceiver();
4238

4339
T element;
4440
Space after;

rewrite-javascript/rewrite/src/java/rpc.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -812,14 +812,14 @@ export class JavaSender extends JavaVisitor<RpcSendQueue> {
812812
return container;
813813
}
814814

815-
private static typeVisitor = new TypeSender();
815+
private typeVisitor = new TypeSender();
816816

817817
public override async visitType(javaType: Type | undefined, q: RpcSendQueue): Promise<Type | undefined> {
818818
if (!javaType) {
819819
return undefined;
820820
}
821821

822-
return await JavaSender.typeVisitor.visit(javaType, q);
822+
return await this.typeVisitor.visit(javaType, q);
823823
}
824824
}
825825

@@ -1561,15 +1561,15 @@ export class JavaReceiver extends JavaVisitor<RpcReceiveQueue> {
15611561
return updateIfChanged(container, updates) as J.Container<T>;
15621562
}
15631563

1564-
private static typeVisitor = new TypeReceiver();
1564+
private typeVisitor = new TypeReceiver();
15651565

15661566
public override async visitType(javaType: Type | undefined, q: RpcReceiveQueue): Promise<Type | undefined> {
15671567
if (!javaType) {
15681568
return undefined;
15691569
} else if (javaType.kind === Type.Kind.Unknown) {
15701570
return Type.unknownType;
15711571
}
1572-
return await JavaReceiver.typeVisitor.visit(javaType, q);
1572+
return await this.typeVisitor.visit(javaType, q);
15731573
}
15741574
}
15751575

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
* <p>
4+
* Licensed under the Moderne Source Available License (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* https://docs.moderne.io/licensing/moderne-source-available-license
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.openrewrite.javascript.rpc;
17+
18+
import org.intellij.lang.annotations.Language;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.io.TempDir;
21+
import org.openrewrite.ExecutionContext;
22+
import org.openrewrite.InMemoryExecutionContext;
23+
import org.openrewrite.Parser;
24+
import org.openrewrite.SourceFile;
25+
import org.openrewrite.javascript.JavaScriptParser;
26+
27+
import java.nio.file.Path;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.concurrent.*;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
/**
36+
* Test to reproduce thread-safety issues when multiple threads each have their own
37+
* RewriteRpc instance and call print() in parallel.
38+
*/
39+
class ParallelPrintTest {
40+
@TempDir
41+
Path tempDir;
42+
43+
@Test
44+
void parallelPrintWithSeparateRpcInstances() throws Exception {
45+
// JavaScript code to parse
46+
@Language("js")
47+
String jsCode = """
48+
class Calculator {
49+
constructor(name) {
50+
this.name = name;
51+
this.history = [];
52+
}
53+
54+
add(a, b) {
55+
const result = a + b;
56+
this.history.push({ op: 'add', a, b, result });
57+
return result;
58+
}
59+
60+
subtract(a, b) {
61+
const result = a - b;
62+
this.history.push({ op: 'subtract', a, b, result });
63+
return result;
64+
}
65+
66+
multiply(a, b) {
67+
const result = a * b;
68+
this.history.push({ op: 'multiply', a, b, result });
69+
return result;
70+
}
71+
}
72+
73+
const calc = new Calculator("MyCalc");
74+
console.log(calc.add(5, 3));
75+
""";
76+
77+
int numThreads = 6; // Increased from 4 to stress test more
78+
int printsPerThread = 20; // Increased from 20 to make concurrency issues more likely
79+
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
80+
CountDownLatch startLatch = new CountDownLatch(1);
81+
AtomicInteger successCount = new AtomicInteger(0);
82+
AtomicInteger errorCount = new AtomicInteger(0);
83+
List<Future<String>> futures = new ArrayList<>();
84+
85+
try {
86+
// Create threads that will each get their own RPC instance
87+
for (int i = 0; i < numThreads; i++) {
88+
final int threadNum = i;
89+
futures.add(executor.submit(() -> {
90+
try {
91+
// Wait for all threads to be ready
92+
startLatch.await();
93+
94+
// Each thread sets up its own RPC instance via ThreadLocal
95+
JavaScriptRewriteRpc.setFactory(JavaScriptRewriteRpc.builder()
96+
.recipeInstallDir(tempDir.resolve("thread-" + threadNum))
97+
.log(tempDir.resolve("thread-" + threadNum + ".log"))
98+
);
99+
100+
JavaScriptRewriteRpc rpc;
101+
102+
// Parse the code
103+
JavaScriptParser parser = JavaScriptParser.builder().build();
104+
ExecutionContext ctx = new InMemoryExecutionContext();
105+
106+
Parser.Input input = Parser.Input.fromString(
107+
Path.of("calculator-" + threadNum + ".js"),
108+
jsCode
109+
);
110+
List<SourceFile> sourceFiles = parser.parseInputs(
111+
List.of(input),
112+
null,
113+
ctx
114+
).toList();
115+
116+
assertThat(sourceFiles).hasSize(1);
117+
SourceFile sourceFile = sourceFiles.getFirst();
118+
119+
// Shutdown RPC after parsing to clear remote cache
120+
// This forces GetObject callbacks during print()
121+
JavaScriptRewriteRpc.shutdownCurrent();
122+
123+
// Restart RPC for print operations
124+
rpc = JavaScriptRewriteRpc.getOrStart();
125+
126+
// Call print() multiple times in rapid succession
127+
for (int j = 0; j < printsPerThread; j++) {
128+
String printed = rpc.print(sourceFile);
129+
130+
assertThat(printed).isNotEmpty();
131+
assertThat(printed).contains("Calculator");
132+
successCount.incrementAndGet();
133+
}
134+
135+
// Cleanup
136+
JavaScriptRewriteRpc.shutdownCurrent();
137+
138+
return "";
139+
} catch (Exception e) {
140+
errorCount.incrementAndGet();
141+
throw new RuntimeException("Thread " + threadNum + " failed", e);
142+
}
143+
}));
144+
}
145+
146+
// Start all threads simultaneously
147+
startLatch.countDown();
148+
149+
// Wait for all threads to complete and collect results
150+
for (Future<String> future : futures) {
151+
try {
152+
future.get(30, TimeUnit.SECONDS);
153+
} catch (Exception e) {
154+
errorCount.incrementAndGet();
155+
System.err.println("Error getting future result: " + e.getMessage());
156+
throw new RuntimeException(e);
157+
}
158+
}
159+
160+
// Verify results
161+
assertThat(errorCount.get())
162+
.withFailMessage("Expected no errors but got %d errors. Check output above for details.", errorCount.get())
163+
.isEqualTo(0);
164+
assertThat(successCount.get()).isEqualTo(numThreads * printsPerThread);
165+
166+
} finally {
167+
executor.shutdown();
168+
boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
169+
if (!terminated) {
170+
System.err.println("Warning: Executor did not terminate in time");
171+
}
172+
}
173+
}
174+
}

rewrite-javascript/src/main/java/org/openrewrite/javascript/internal/rpc/JavaScriptContainerRpcCodec.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
@Getter
2626
@SuppressWarnings("rawtypes")
2727
public class JavaScriptContainerRpcCodec extends DynamicDispatchRpcCodec<JContainer> {
28-
private final JavaScriptSender sender = new JavaScriptSender();
29-
private final JavaScriptReceiver receiver = new JavaScriptReceiver();
3028

3129
@Override
3230
public String getSourceFileType() {
@@ -41,12 +39,12 @@ public Class<? extends JContainer> getType() {
4139
@Override
4240
public void rpcSend(JContainer after, RpcSendQueue q) {
4341
//noinspection unchecked
44-
sender.visitContainer(after, q);
42+
new JavaScriptSender().visitContainer(after, q);
4543
}
4644

4745
@Override
4846
public JContainer rpcReceive(JContainer before, RpcReceiveQueue q) {
4947
//noinspection unchecked
50-
return receiver.visitContainer(before, q);
48+
return new JavaScriptReceiver().visitContainer(before, q);
5149
}
5250
}

rewrite-javascript/src/main/java/org/openrewrite/javascript/internal/rpc/JavaScriptLeftPaddedRpcCodec.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
@Getter
2626
@SuppressWarnings("rawtypes")
2727
public class JavaScriptLeftPaddedRpcCodec extends DynamicDispatchRpcCodec<JLeftPadded> {
28-
private final JavaScriptSender sender = new JavaScriptSender();
29-
private final JavaScriptReceiver receiver = new JavaScriptReceiver();
3028

3129
@Override
3230
public String getSourceFileType() {
@@ -41,12 +39,12 @@ public Class<? extends JLeftPadded> getType() {
4139
@Override
4240
public void rpcSend(JLeftPadded after, RpcSendQueue q) {
4341
//noinspection unchecked
44-
sender.visitLeftPadded(after, q);
42+
new JavaScriptSender().visitLeftPadded(after, q);
4543
}
4644

4745
@Override
4846
public JLeftPadded rpcReceive(JLeftPadded before, RpcReceiveQueue q) {
4947
//noinspection unchecked
50-
return receiver.visitLeftPadded(before, q);
48+
return new JavaScriptReceiver().visitLeftPadded(before, q);
5149
}
5250
}

rewrite-javascript/src/main/java/org/openrewrite/javascript/internal/rpc/JavaScriptRightPaddedRpcCodec.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
@Getter
2626
@SuppressWarnings("rawtypes")
2727
public class JavaScriptRightPaddedRpcCodec extends DynamicDispatchRpcCodec<JRightPadded> {
28-
private final JavaScriptSender sender = new JavaScriptSender();
29-
private final JavaScriptReceiver receiver = new JavaScriptReceiver();
3028

3129
@Override
3230
public String getSourceFileType() {
@@ -41,12 +39,12 @@ public Class<? extends JRightPadded> getType() {
4139
@Override
4240
public void rpcSend(JRightPadded after, RpcSendQueue q) {
4341
//noinspection unchecked
44-
sender.visitRightPadded(after, q);
42+
new JavaScriptSender().visitRightPadded(after, q);
4543
}
4644

4745
@Override
4846
public JRightPadded rpcReceive(JRightPadded before, RpcReceiveQueue q) {
4947
//noinspection unchecked
50-
return receiver.visitRightPadded(before, q);
48+
return new JavaScriptReceiver().visitRightPadded(before, q);
5149
}
5250
}

0 commit comments

Comments
 (0)