Skip to content

Commit 140c5b9

Browse files
committed
RPC: Fix Python get_object_from_java missing END_OF_OBJECT on batch boundary
When serialized data items are an exact multiple of the handler's batchSize, END_OF_OBJECT ends up alone in a separate batch that receiver.receive() never pulls. Drain the pending batch after receive() completes — analogous to Java/JS's explicit q.take(). Add integration test with small batchSize to exercise the fix.
1 parent dc3188d commit 140c5b9

2 files changed

Lines changed: 153 additions & 5 deletions

File tree

rewrite-python/rewrite/src/rewrite/rpc/server.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,17 +196,21 @@ def pull_batch() -> List[Dict[str, Any]]:
196196
# Receive and deserialize the object (applies diffs to before state)
197197
try:
198198
obj = receiver.receive(before, q)
199+
200+
# After receive() completes, END_OF_OBJECT may still be pending in a
201+
# separate batch (happens when data items are an exact multiple of the
202+
# handler's batchSize). Drain it — analogous to Java's explicit
203+
# q.take() after receive().
204+
if not received_end:
205+
pull_batch()
206+
if not received_end:
207+
raise RuntimeError(f"Did not receive END_OF_OBJECT marker for object {obj_id}")
199208
except Exception:
200209
# Reset our tracking of the remote state so the next interaction
201210
# forces a full object sync (ADD) instead of a delta (CHANGE).
202211
remote_objects.pop(obj_id, None)
203212
raise
204213

205-
# Verify we received the complete object (END_OF_OBJECT was in the final batch)
206-
# This matches Java's RewriteRpc.java line 474-475 which explicitly checks for END_OF_OBJECT
207-
if not received_end:
208-
raise RuntimeError(f"Did not receive END_OF_OBJECT marker for object {obj_id}")
209-
210214
if obj is not None:
211215
# Update our understanding of what Java has
212216
remote_objects[obj_id] = obj
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2026 the original author or authors.
3+
* <p>
4+
* Licensed under the Moderne Source Available License (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* https://docs.moderne.io/licensing/moderne-source-available-license
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.openrewrite.python;
17+
18+
import org.junit.jupiter.api.AfterEach;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.Timeout;
22+
import org.junit.jupiter.api.io.TempDir;
23+
import org.openrewrite.ExecutionContext;
24+
import org.openrewrite.InMemoryExecutionContext;
25+
import org.openrewrite.Recipe;
26+
import org.openrewrite.RecipeRun;
27+
import org.openrewrite.Result;
28+
import org.openrewrite.SourceFile;
29+
import org.openrewrite.internal.InMemoryLargeSourceSet;
30+
import org.openrewrite.python.rpc.PythonRewriteRpc;
31+
32+
import java.io.IOException;
33+
import java.nio.file.Files;
34+
import java.nio.file.Path;
35+
import java.util.List;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.Collectors;
38+
39+
import static org.assertj.core.api.Assertions.assertThat;
40+
41+
/**
42+
* Integration test that installs recipes from rewrite-migrate-python and
43+
* runs PythonBestPractices against real projects.
44+
* <p>
45+
* This exercises the full RPC lifecycle including recipe installation,
46+
* project parsing, recipe execution (Visit + GetObject + Print), and
47+
* verifies that the diff-based RPC protocol stays in sync across
48+
* multiple recipe steps.
49+
*/
50+
class PythonBestPracticesIntegTest {
51+
52+
static final Path RECIPES_PATH = Path.of("/Users/knut/git/moderneinc/rewrite-migrate-python/.worktrees/nimble-hawk/recipes-python");
53+
54+
@TempDir
55+
Path tempDir;
56+
57+
@BeforeEach
58+
void before() {
59+
PythonRewriteRpc.setFactory(PythonRewriteRpc.builder()
60+
.log(tempDir.resolve("python-rpc.log"))
61+
.traceRpcMessages()
62+
);
63+
}
64+
65+
@AfterEach
66+
void after() throws IOException {
67+
PythonRewriteRpc.shutdownCurrent();
68+
PythonRewriteRpc.setFactory(PythonRewriteRpc.builder());
69+
Path logFile = tempDir.resolve("python-rpc.log");
70+
if (Files.exists(logFile)) {
71+
String log = Files.readString(logFile);
72+
if (log.contains("ERROR")) {
73+
System.out.println("=== Python RPC Log (errors found) ===");
74+
System.out.println(log);
75+
}
76+
}
77+
}
78+
79+
@Test
80+
@Timeout(value = 300, unit = TimeUnit.SECONDS)
81+
void moreItertools() {
82+
runPythonBestPractices(Path.of("/Users/knut/moderne/working-set-python-static-analysis-2/more-itertools/more-itertools"));
83+
}
84+
85+
@Test
86+
@Timeout(value = 300, unit = TimeUnit.SECONDS)
87+
void marshmallow() {
88+
runPythonBestPractices(Path.of("/Users/knut/moderne/working-set-python-static-analysis-2/marshmallow-code/marshmallow"));
89+
}
90+
91+
/**
92+
* Runs with a small batch size to force END_OF_OBJECT into a separate batch.
93+
* This exercises the fix in Python's get_object_from_java that drains
94+
* the END_OF_OBJECT batch after receive() completes.
95+
*/
96+
@Test
97+
@Timeout(value = 600, unit = TimeUnit.SECONDS)
98+
void marshmallowSmallBatch() {
99+
PythonRewriteRpc rpc = PythonRewriteRpc.getOrStart();
100+
rpc.batchSize(7);
101+
runPythonBestPractices(rpc, Path.of("/Users/knut/moderne/working-set-python-static-analysis-2/marshmallow-code/marshmallow"));
102+
}
103+
104+
private void runPythonBestPractices(Path projectPath) {
105+
runPythonBestPractices(PythonRewriteRpc.getOrStart(), projectPath);
106+
}
107+
108+
private void runPythonBestPractices(PythonRewriteRpc rpc, Path projectPath) {
109+
110+
// Install recipes from local path
111+
rpc.installRecipes(RECIPES_PATH.toFile());
112+
113+
// Prepare the composite recipe
114+
Recipe recipe = rpc.prepareRecipe("org.openrewrite.python.cleanup.PythonBestPractices");
115+
116+
// Parse the project
117+
ExecutionContext ctx = new InMemoryExecutionContext(Throwable::printStackTrace);
118+
List<SourceFile> sources = rpc.parseProject(projectPath, ctx)
119+
.collect(Collectors.toList());
120+
assertThat(sources).isNotEmpty();
121+
System.out.println("Parsed " + sources.size() + " source files");
122+
123+
// Run the recipe against each source file individually
124+
int changed = 0;
125+
for (SourceFile source : sources) {
126+
System.out.println("Processing: " + source.getSourcePath());
127+
System.out.flush();
128+
129+
RecipeRun run = recipe.run(
130+
new InMemoryLargeSourceSet(List.of(source)), ctx);
131+
132+
for (Result result : run.getChangeset().getAllResults()) {
133+
String diff = result.diff(null);
134+
if (!diff.isEmpty()) {
135+
changed++;
136+
System.out.println(" Changed: " + result.getAfter().getSourcePath());
137+
}
138+
}
139+
}
140+
141+
System.out.println("Total sources: " + sources.size());
142+
System.out.println("Total changed: " + changed);
143+
}
144+
}

0 commit comments

Comments
 (0)