Commit 127dae3

RPC: Add action field to GetObject for error recovery
Add a nullable `action` field to GetObject requests. When null, the request is a normal data transfer. When set to "revert", the handler restores both remoteObjects and localObjects to the pre-transfer state, and the requester reverts its own remoteObjects tracking to match. This fixes state desynchronization after failed deserialization: the handler stores the pre-transfer baseline in an actionBaseline map at transfer start, optimistically updates remoteObjects when streaming completes, and rolls both maps back on revert. The action field is extensible for future corrective actions (clear, reset, abort) without protocol changes.
1 parent 886de26 commit 127dae3

7 files changed

Lines changed: 308 additions & 42 deletions

doc/adr/0009-getobject-action-field.md

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
# 9. GetObject action field for reliable state synchronization

Date: 2026-02-26

## Status

Accepted (supersedes ADR 8)

## Context

The diff-based GetObject protocol tracks state on both sides: the sender maintains a `remoteObjects` map recording what the receiver last successfully received, and uses this as the baseline for computing diffs on subsequent transfers. This tracking is updated optimistically: the sender assumes the receiver consumed the data successfully once streaming completes.

When the receiver fails mid-deserialization (e.g., `ClassCastException` from an invalid AST node), the two sides go out of sync: the sender thinks the receiver has version N, but the receiver discarded it.

### The Print problem

This manifests concretely with `Print`. After a composite recipe runs, Java computes diffs by printing both the `before` and `after` trees. For RPC-based languages (Python, JavaScript), printing works as follows:

1. Java sends a Print RPC to the remote (Python)
2. Python's `handle_print` calls `get_object_from_java(tree_id)`, sending GetObject back to Java
3. Java's `GetObject.Handler` computes a diff against `remoteObjects[id]` (its belief of what Python has) and streams the result

If a prior Visit failed in the *reverse* direction (Java requesting a modified tree from Python), the cleanup at `RewriteRpc.getObject()` only removes Java's **requester-side** `remoteObjects` entry. Java's **handler-side** `remoteObjects` (used by `GetObject.Handler` when Python requests from Java) may still reflect a state that Python no longer has. The subsequent Print-triggered GetObject computes a diff against the wrong baseline, producing corrupt data or errors.

### Fundamental issue: unilateral state updates

The root cause is that `remoteObjects` is updated unilaterally by the sender without confirmation from the receiver. If the receiver fails to deserialize, the sender has no way to learn this, so the stale state persists and affects all subsequent operations in either direction.
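
To make the failure mode concrete, here is a toy Java model of a diff computed against a stale baseline. It is a sketch under loud assumptions: objects are modelled as lists of strings, the positional `diff` and `apply` helpers are hypothetical, and none of this is the real `RpcSendQueue`/`RpcReceiveQueue` wire format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: real GetObject diffs are streamed RpcObjectData batches, not string lists.
final class StaleBaselineDemo {

    // Positional diff: a null entry means "unchanged, reuse the element you already hold".
    static List<String> diff(List<String> before, List<String> after) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < after.size(); i++) {
            boolean same = before != null && i < before.size()
                    && before.get(i).equals(after.get(i));
            out.add(same ? null : after.get(i));
        }
        return out;
    }

    // Receiver side: rebuild the object from the diff plus whatever it currently holds.
    static List<String> apply(List<String> held, List<String> diff) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < diff.size(); i++) {
            String d = diff.get(i);
            out.add(d != null ? d : held.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> v1 = Arrays.asList("a", "b");
        List<String> v2 = Arrays.asList("a", "B");
        List<String> v3 = Arrays.asList("A", "B");

        // The sender streamed v2 and recorded it, but the receiver failed and still holds v1.
        List<String> d = diff(v2, v3);

        System.out.println(apply(v1, d)); // prints [A, b]: not v3
        System.out.println(apply(v2, d)); // prints [A, B]: what the sender intended
    }
}
```

The diff itself is valid; it is only wrong relative to what the receiver actually holds, which is why the error can surface in a later, unrelated operation.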

## Decision

Add an `action` field to the GetObject request. This nullable string field allows the receiver to send corrective actions back to the handler. When null, the request is a normal data-transfer request.

### The `revert` action

When the receiver fails to deserialize a transferred object, it sends a GetObject request with `action: "revert"`. The handler:

1. Restores `remoteObjects[id]` to the pre-transfer value (stored in an `actionBaseline` map at transfer start)
2. Restores `localObjects[id]` to the same pre-transfer value, which ensures the failed modification is discarded rather than retried with the same broken diff
3. Cancels any in-progress batch send for that ID

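This reverts both sides to a consistent, known-good state. The receiver also restores its own `remoteObjects[id]` tracking to the pre-transfer value (or removes the entry if there was none), so the next transfer diffs against a baseline both sides agree on.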
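
The handler-side bookkeeping can be sketched in a few lines of Java. This is a simplified illustration, not the committed `GetObject.Handler`: the class name `RevertingHandlerSketch` and the `transfer` method are hypothetical, streaming is collapsed into a single call, and step 3 (cancelling an in-progress batch send) is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of the handler's three maps.
final class RevertingHandlerSketch {
    final Map<String, Object> localObjects = new HashMap<>();
    final Map<String, Object> remoteObjects = new HashMap<>();
    // HashMap permits null values; a null baseline means "the receiver had nothing yet".
    final Map<String, Object> actionBaseline = new HashMap<>();

    // Normal transfer: remember the baseline, then optimistically assume receipt.
    Object transfer(String id) {
        Object after = localObjects.get(id);
        actionBaseline.put(id, remoteObjects.get(id));
        remoteObjects.put(id, after);
        return after;
    }

    // action = "revert": restore both maps to the pre-transfer value.
    void revert(String id) {
        if (!actionBaseline.containsKey(id)) {
            return; // nothing recorded for this ID, so nothing to roll back
        }
        Object before = actionBaseline.remove(id);
        if (before != null) {
            remoteObjects.put(id, before);
            localObjects.put(id, before);
        } else {
            remoteObjects.remove(id);
            localObjects.remove(id);
        }
    }

    public static void main(String[] args) {
        RevertingHandlerSketch handler = new RevertingHandlerSketch();
        handler.localObjects.put("tree", "v1");
        handler.transfer("tree");               // receiver now holds v1
        handler.localObjects.put("tree", "v2"); // local modification
        handler.transfer("tree");               // optimistic: remoteObjects says v2
        handler.revert("tree");                 // receiver reported a failure
        System.out.println(handler.remoteObjects.get("tree")); // prints v1
        System.out.println(handler.localObjects.get("tree"));  // prints v1
    }
}
```

The `containsKey` check matters because a stored `null` baseline (first transfer for an ID) and a missing baseline (no transfer recorded) call for different behaviour: the former removes both entries, the latter leaves them alone.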

### Optimistic updates with rollback

Unlike a deferred-commit (ACK/NACK) approach, `remoteObjects` is updated optimistically when streaming completes, so no extra round-trip is needed on the success path. The `actionBaseline` map stores the pre-transfer value so that `revert` can roll it back on the failure path.

### Extensibility

The `action` field is designed to support future corrective actions beyond `revert`:

- `"clear"` / `"remove"`: tell the handler to drop all tracking for this ID (e.g., when the caller knows the object is no longer needed)
- `"abort"`: cancel an in-progress batched transfer mid-stream
- `"reset"`: force a full re-serialization
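
As a rough illustration of why a nullable string leaves room for such additions, the routing below mirrors how the Java handler in this commit treats the field: null means a normal transfer, `"revert"` triggers the rollback, and any other value is acknowledged with an empty response. The `ActionRoutingSketch` class and its `Route` enum are hypothetical names used only for this sketch.

```java
// Hypothetical routing helper; only "revert" is implemented in this commit.
final class ActionRoutingSketch {
    enum Route { TRANSFER, REVERT, NO_OP }

    static Route route(String action) {
        if (action == null) {
            return Route.TRANSFER; // normal data-transfer request
        }
        if ("revert".equals(action)) {
            return Route.REVERT;   // roll back to the pre-transfer baseline
        }
        return Route.NO_OP;        // unrecognized action: empty response, no state change
    }

    public static void main(String[] args) {
        System.out.println(route(null));     // prints TRANSFER
        System.out.println(route("revert")); // prints REVERT
        System.out.println(route("abort"));  // prints NO_OP
    }
}
```

Under this scheme a future action would add one more branch on the handler side, while peers that do not yet understand it would fall through to the no-op path.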

### Protocol flow

**Success path** (no extra round-trip):
1. Handler streams batches, optimistically updates `remoteObjects[id] = after`
2. Receiver processes batches, updates its own `remoteObjects` and `localObjects`
3. Done; no confirmation needed

**Failure path** (one extra round-trip):
1. Handler streams batches, optimistically updates `remoteObjects[id] = after`
2. Receiver fails to deserialize
3. Receiver sends `GetObject(id, sourceFileType, action="revert")`
4. Handler restores `remoteObjects[id]` and `localObjects[id]` from `actionBaseline`
5. Handler returns empty list
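
Seen from the receiver, the failure path is a try/catch around deserialization. The sketch below is an assumption-laden model rather than the real `RewriteRpc.getObject()`: the `Remote` interface, the string payload, and the `deserialize` function are all hypothetical stand-ins for the RPC connection and `RpcReceiveQueue`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical requester-side model of the success and failure paths.
final class RequesterRevertSketch {
    // Stand-in for the RPC connection; a null action is a normal transfer.
    interface Remote {
        String getObject(String id, String action);
    }

    final Map<String, Object> remoteObjects = new HashMap<>();

    Object fetch(Remote remote, String id, Function<String, Object> deserialize) {
        Object before = remoteObjects.get(id);
        try {
            Object received = deserialize.apply(remote.getObject(id, null));
            remoteObjects.put(id, received); // success path: no confirmation sent
            return received;
        } catch (RuntimeException e) {
            try {
                remote.getObject(id, "revert"); // failure path: one extra round-trip
            } catch (RuntimeException revertError) {
                // Best effort: the original failure is the one worth reporting.
            }
            // Mirror the handler's rollback in our own tracking.
            if (before != null) {
                remoteObjects.put(id, before);
            } else {
                remoteObjects.remove(id);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        List<String> actions = new ArrayList<>();
        Remote remote = (id, action) -> {
            actions.add(String.valueOf(action));
            return "payload";
        };
        RequesterRevertSketch requester = new RequesterRevertSketch();
        requester.remoteObjects.put("tree", "v1");
        try {
            requester.fetch(remote, "tree", p -> {
                throw new ClassCastException("invalid AST node");
            });
        } catch (ClassCastException expected) {
            System.out.println(actions);                             // prints [null, revert]
            System.out.println(requester.remoteObjects.get("tree")); // prints v1
        }
    }
}
```

The original exception is rethrown after the rollback so callers still see the deserialization failure; the revert only guarantees that the next transfer starts from an agreed baseline.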

### Relationship to ADR 8 (`reset` flag)

The `reset` flag from ADR 8 is removed. The `revert` action makes it unnecessary: instead of the receiver hinting "I lost sync" on its *next* request, it explicitly tells the handler to roll back immediately after failure.

## Consequences

**Positive:**
- No extra round-trip on the success path (unlike an ACK-based approach)
- On failure, reverts both `remoteObjects` and `localObjects` to a consistent pre-transfer state, preventing cascading errors
- Fixes the Print problem: the handler's `remoteObjects` is rolled back before any Print-triggered GetObject can observe stale state
- Extensible: the `action` field can carry future corrective actions without protocol changes
- Works for all GetObject consumers (Visit, Print, Generate) in both directions

**Negative:**
- Handler must store pre-transfer baselines (`actionBaseline` map) for potential rollback: one extra object reference per transferred object ID, retained until a revert or the next transfer for that ID
- Reverting `localObjects` means the handler discards its local modification on failure, which is a deliberate policy choice: if the receiver can't deserialize it, retrying would just fail again

**Trade-offs:**
- The `actionBaseline` entries persist until overwritten by the next transfer for the same ID, rather than being cleaned up immediately on success. The memory cost is bounded by the number of active object IDs and is comparable to `remoteObjects` itself
- The inline-Visit optimization (bundling tree data with Visit request/response to eliminate GetObject round-trips) remains a complementary performance improvement that could be pursued independently

doc/adr/README.md

Lines changed: 4 additions & 0 deletions
@@ -4,3 +4,7 @@
 * [2. Naming recipes](0002-recipe-naming.md)
 * [3. OSS contributor's guidelines](0003-oss-contributors.md)
 * [4. Library migration recipe conventions](0004-library-migration-conventions.md)
+* [5. Parser and LST conventions](0005-parser-lst-conventions.md)
+* [6. Recipe marketplace CSV format](0006-recipe-marketplace-csv-format.md)
+* [7. JavaScript templating engine enhancements](0007-javascript-templating-enhancements.md)
+* [9. GetObject action field for error recovery](0009-getobject-action-field.md)

rewrite-core/src/main/java/org/openrewrite/rpc/RewriteRpc.java

Lines changed: 28 additions & 4 deletions
@@ -466,13 +466,37 @@ public <T> T getObject(String id, @Nullable String sourceFileType) {

         RpcReceiveQueue q = new RpcReceiveQueue(
                 remoteRefs,
-                () -> send("GetObject", new GetObject(id, sourceFileType), GetObjectResponse.class),
+                () -> send("GetObject", new GetObject(id, sourceFileType, null), GetObjectResponse.class),
                 sourceFileType,
                 log.get()
         );
-        Object remoteObject = q.receive(localObject, null);
-        if (q.take().getState() != END_OF_OBJECT) {
-            throw new IllegalStateException("Expected END_OF_OBJECT");
+        Object before = remoteObjects.get(id);
+        Object remoteObject;
+        try {
+            remoteObject = q.receive(localObject, null);
+            if (q.take().getState() != END_OF_OBJECT) {
+                throw new IllegalStateException("Expected END_OF_OBJECT");
+            }
+        } catch (Exception e) {
+            // Tell the handler to revert both remoteObjects and localObjects
+            // to the pre-transfer state
+            try {
+                send("GetObject", new GetObject(id, sourceFileType, "revert"), GetObjectResponse.class);
+            } catch (Exception revertError) {
+                PrintStream logFile = log.get();
+                if (logFile != null) {
+                    revertError.printStackTrace(logFile);
+                }
+            }
+            // Revert our tracking to match the handler's reverted state.
+            // The handler restored remoteObjects[id] to the pre-transfer
+            // value, so the requester must do the same to stay in sync.
+            if (before != null) {
+                remoteObjects.put(id, before);
+            } else {
+                remoteObjects.remove(id);
+            }
+            throw e;
         }

         //noinspection ConstantValue

rewrite-core/src/main/java/org/openrewrite/rpc/request/GetObject.java

Lines changed: 77 additions & 14 deletions
@@ -23,15 +23,14 @@
 import org.openrewrite.rpc.RpcSendQueue;

 import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;

+import static java.util.Collections.emptyList;
 import static org.openrewrite.rpc.RpcObjectData.State.DELETE;
 import static org.openrewrite.rpc.RpcObjectData.State.END_OF_OBJECT;

@@ -42,6 +41,20 @@ public class GetObject implements RpcRequest {
     @Nullable
     String sourceFileType;

+    /**
+     * An action for the handler to perform instead of a normal data transfer.
+     * When null, this is a normal data-transfer request.
+     * <p>
+     * Supported actions:
+     * <ul>
+     *   <li>"revert" — sent by the receiver after a deserialization failure.
+     *   The handler reverts both {@code remoteObjects} and {@code localObjects}
+     *   for this ID to the pre-transfer state.</li>
+     * </ul>
+     */
+    @Nullable
+    String action;
+
     @RequiredArgsConstructor
     public static class Handler extends JsonRpcMethod<GetObject> {
         private static final ExecutorService forkJoin = ForkJoinPool.commonPool();
@@ -59,32 +72,82 @@ public static class Handler extends JsonRpcMethod<GetObject> {
         private final AtomicReference<PrintStream> log;
         private final Supplier<Boolean> traceGetObject;

-        private final Map<String, BlockingQueue<List<RpcObjectData>>> inProgressGetRpcObjects = new ConcurrentHashMap<>();
+        @RequiredArgsConstructor
+        private static class InProgressSend {
+            final BlockingQueue<List<RpcObjectData>> queue;
+            final @Nullable Object before;
+            final AtomicBoolean cancelled;
+        }
+
+        private final Map<String, InProgressSend> inProgressGetRpcObjects = new ConcurrentHashMap<>();
+
+        /**
+         * Stores the pre-transfer {@code remoteObjects} value for each in-flight
+         * or recently completed transfer. Used by the "revert" action to restore
+         * both {@code remoteObjects} and {@code localObjects} to the state before
+         * the transfer started.
+         */
+        private final Map<String, @Nullable Object> actionBaseline = new HashMap<>();

         @Override
         protected List<RpcObjectData> handle(GetObject request) throws Exception {
+            String action = request.getAction();
+            if (action != null) {
+                if ("revert".equals(action)) {
+                    String id = request.getId();
+                    InProgressSend stale = inProgressGetRpcObjects.remove(id);
+                    if (stale != null) {
+                        stale.cancelled.set(true);
+                    }
+                    if (actionBaseline.containsKey(id)) {
+                        Object before = actionBaseline.remove(id);
+                        if (before != null) {
+                            remoteObjects.put(id, before);
+                            localObjects.put(id, before);
+                        } else {
+                            remoteObjects.remove(id);
+                            localObjects.remove(id);
+                        }
+                    }
+                }
+                return emptyList();
+            }
+
             Object after = localObjects.get(request.getId());

             if (after == null) {
+                // Clean up any stale in-progress send for this ID
+                InProgressSend stale = inProgressGetRpcObjects.remove(request.getId());
+                if (stale != null) {
+                    stale.cancelled.set(true);
+                }
+
                 List<RpcObjectData> deleted = new ArrayList<>(2);
                 deleted.add(new RpcObjectData(DELETE, null, null, null, traceGetObject.get()));
                 deleted.add(new RpcObjectData(END_OF_OBJECT, null, null, null, traceGetObject.get()));
                 return deleted;
             }

-            BlockingQueue<List<RpcObjectData>> q = inProgressGetRpcObjects.computeIfAbsent(request.getId(), id -> {
+            Object currentBefore = remoteObjects.get(request.getId());
+
+            InProgressSend inProgress = inProgressGetRpcObjects.computeIfAbsent(request.getId(), id -> {
+                // Save the pre-transfer baseline for potential revert
+                actionBaseline.put(id, currentBefore);
+
                 BlockingQueue<List<RpcObjectData>> batch = new ArrayBlockingQueue<>(1);
-                Object before = remoteObjects.get(id);
+                AtomicBoolean cancelled = new AtomicBoolean(false);

                 RpcSendQueue sendQueue = new RpcSendQueue(batchSize.get(), batch::put, localRefs, request.getSourceFileType(), traceGetObject.get());
                 forkJoin.submit(() -> {
                     try {
-                        sendQueue.send(after, before, null);
+                        sendQueue.send(after, currentBefore, null);

-                        // All the data has been sent, and the remote should have received
-                        // the full tree, so update our understanding of the remote state
-                        // of this tree.
-                        remoteObjects.put(id, after);
+                        // Optimistically update remoteObjects — the receiver is
+                        // expected to send action="revert" if deserialization fails,
+                        // which will roll this back.
+                        if (!cancelled.get()) {
+                            remoteObjects.put(id, after);
+                        }
                     } catch (Throwable t) {
                         PrintStream logFile = log.get();
                         //noinspection ConstantValue
@@ -97,10 +160,10 @@ protected List<RpcObjectData> handle(GetObject request) throws Exception {
                     }
                     return 0;
                 });
-                return batch;
+                return new InProgressSend(batch, currentBefore, cancelled);
             });

-            List<RpcObjectData> batch = q.take();
+            List<RpcObjectData> batch = inProgress.queue.take();
             if (batch.get(batch.size() - 1).getState() == END_OF_OBJECT) {
                 inProgressGetRpcObjects.remove(request.getId());
             }

rewrite-javascript/rewrite/src/rpc/request/get-object.ts

Lines changed: 26 additions & 1 deletion
@@ -20,7 +20,8 @@ import {extractSourcePath, withMetrics} from "./metrics";

 export class GetObject {
     constructor(private readonly id: string,
-                private readonly sourceFileType?: string) {
+                private readonly sourceFileType?: string,
+                private readonly action?: string) {
     }

     static handle(
@@ -33,6 +34,7 @@ export class GetObject {
         metricsCsv?: string,
     ): void {
         const pendingData = new Map<string, RpcObjectData[]>();
+        const actionBaseline = new Map<string, any>();

         connection.onRequest(
             new rpc.RequestType<GetObject, any, Error>("GetObject"),
@@ -41,6 +43,25 @@ export class GetObject {
                 metricsCsv,
                 (context) => async request => {
                     const objId = request.id;
+
+                    // Handle actions from the receiver
+                    if (request.action) {
+                        if (request.action === 'revert') {
+                            const before = actionBaseline.get(objId);
+                            actionBaseline.delete(objId);
+                            pendingData.delete(objId);
+                            if (before !== undefined) {
+                                remoteObjects.set(objId, before);
+                                localObjects.set(objId, before);
+                            } else {
+                                remoteObjects.delete(objId);
+                                localObjects.delete(objId);
+                            }
+                        }
+                        context.target = '';
+                        return [];
+                    }
+
                     if (!localObjects.has(objId)) {
                         context.target = '';
                         return [
@@ -63,10 +84,14 @@ export class GetObject {
                         const after = obj;
                         const before = remoteObjects.get(objId);

+                        // Save baseline for potential revert
+                        actionBaseline.set(objId, before);
+
                         allData = await new RpcSendQueue(localRefs, request.sourceFileType, trace())
                             .generate(after, before);
                         pendingData.set(objId, allData);

+                        // Optimistic update — receiver sends action="revert" on failure
                         remoteObjects.set(objId, after);
                     }

rewrite-javascript/rewrite/src/rpc/rewrite-rpc.ts

Lines changed: 30 additions & 6 deletions
@@ -133,12 +133,36 @@ export class RewriteRpc {
             );
         }, this.logger, this.traceGetObject.receive);

-        const remoteObject = await q.receive<P>(localObject);
-
-        const eof = (await q.take());
-        if (eof.state !== RpcObjectState.END_OF_OBJECT) {
-            RpcObjectData.logTrace(eof, this.traceGetObject.receive, this.logger);
-            throw new Error(`Expected END_OF_OBJECT but got: ${eof.state}`);
+        const before = this.remoteObjects.get(id);
+        let remoteObject: P;
+        try {
+            remoteObject = await q.receive<P>(localObject);
+
+            const eof = (await q.take());
+            if (eof.state !== RpcObjectState.END_OF_OBJECT) {
+                RpcObjectData.logTrace(eof, this.traceGetObject.receive, this.logger);
+                throw new Error(`Expected END_OF_OBJECT but got: ${eof.state}`);
+            }
+        } catch (e) {
+            // Tell the handler to revert both remoteObjects and localObjects
+            // to the pre-transfer state
+            try {
+                await this.connection.sendRequest(
+                    new rpc.RequestType<GetObject, RpcObjectData[], Error>("GetObject"),
+                    new GetObject(id, sourceFileType, 'revert'),
+                );
+            } catch {
+                // Best-effort revert
+            }
+            // Revert our tracking to match the handler's reverted state.
+            // The handler restored remoteObjects[id] to the pre-transfer
+            // value, so the requester must do the same to stay in sync.
+            if (before !== undefined) {
+                this.remoteObjects.set(id, before);
+            } else {
+                this.remoteObjects.delete(id);
+            }
+            throw e;
         }

         this.remoteObjects.set(id, remoteObject);
