Skip to content

Commit d8dfde6

Browse files
RPC: explicitly fail if RPC receives a message it has no codec for (#7342)
* Defensive mechanism for missing codecs * Defensive mechanism for missing codecs
1 parent 21451ad commit d8dfde6

6 files changed

Lines changed: 51 additions & 4 deletions

File tree

rewrite-core/src/main/java/org/openrewrite/rpc/RpcReceiveQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ public <T> T receive(@Nullable T before, @Nullable UnaryOperator<T> onChange) {
138138
after = codec.rpcReceive(before, this);
139139
} else if (message.getValueType() == null) {
140140
after = message.getValue();
141+
} else if (message.getState() == RpcObjectData.State.ADD && message.getValue() == null) {
142+
throw new IllegalStateException(
143+
"No RPC codec registered on the Java side for '" + message.getValueType() + "'. " +
144+
"The remote side has a codec and sent property messages that will not be consumed, " +
145+
"causing RPC queue desynchronization.");
141146
} else {
142147
after = before;
143148
}

rewrite-core/src/test/java/org/openrewrite/rpc/RpcReceiveQueueTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.*;
2727

2828
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2930

3031
public class RpcReceiveQueueTest {
3132

@@ -106,6 +107,19 @@ void changePropertyType() {
106107
assertThat(((Checksum) received).getAlgorithm()).isEqualTo("SHA-256");
107108
}
108109

110+
@Test
111+
void detectsMissingCodecOnReceiverSide() {
112+
// given
113+
batches.addLast(List.of(
114+
new RpcObjectData(RpcObjectData.State.ADD, "java.lang.StringBuilder", null, null, false)
115+
));
116+
117+
// when / then
118+
assertThatThrownBy(() -> rq.receive(null))
119+
.isInstanceOf(IllegalStateException.class)
120+
.hasMessageContaining("No RPC codec registered on the Java side for 'java.lang.StringBuilder'");
121+
}
122+
109123
private List<RpcObjectData> encode(List<RpcObjectData> batch) {
110124
List<RpcObjectData> encoded = new ArrayList<>();
111125
for (RpcObjectData data : batch) {

rewrite-csharp/csharp/OpenRewrite/Core/Rpc/RpcReceiveQueue.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,12 @@ public RpcObjectData Take()
151151
// Simple value types (enums, primitives) sent with both valueType and value
152152
after = ExtractValue<T>(message.Value);
153153
}
154-
else if (message.ValueType != null)
154+
else if (message.State == ADD && message.ValueType != null)
155155
{
156-
// ValueType set but no value and no codec - keep before
157-
after = before;
156+
throw new InvalidOperationException(
157+
$"No RPC codec registered on the C# side for '{message.ValueType}'. " +
158+
"The remote side has a codec and sent property messages that will not be consumed, " +
159+
"causing RPC queue desynchronization.");
158160
}
159161
else
160162
{

rewrite-javascript/rewrite/src/rpc/queue.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,12 @@ export class RpcReceiveQueue {
348348
after = await codec.rpcReceive(before, this);
349349
} else if (message.value !== undefined) {
350350
after = message.valueType ? {kind: message.valueType, ...message.value} : message.value;
351+
} else if (message.state === RpcObjectState.ADD && message.valueType) {
352+
throw new Error(
353+
`No RPC codec registered on the TypeScript side for '${message.valueType}'. ` +
354+
`The Java side has a codec and sent property messages that will not be consumed, ` +
355+
`causing RPC queue desynchronization.`
356+
);
351357
} else {
352358
after = before;
353359
}

rewrite-javascript/rewrite/test/rpc/queue.test.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {Json} from "../../src/json";
2-
import {asRef, ReferenceMap, RpcReceiveQueue, RpcSendQueue} from "../../src/rpc";
2+
import {asRef, ReferenceMap, RpcReceiveQueue, RpcSendQueue, RpcObjectState} from "../../src/rpc";
3+
import type {RpcObjectData} from "../../src/rpc";
34

45
describe("RPC queues", () => {
56

@@ -50,4 +51,17 @@ describe("RPC queues", () => {
5051
expect(received.value.kind).toBe(Json.Kind.Identifier);
5152
expect(received.value.kind).not.toBe(beforeWrapper.value.kind);
5253
});
54+
55+
test("detects missing codec on receiver side", async () => {
56+
// given
57+
const batch: RpcObjectData[] = [
58+
{state: RpcObjectState.ADD, valueType: "com.example.UnknownMarker", value: undefined},
59+
];
60+
const rq = new RpcReceiveQueue(new Map(), undefined, async () => batch, undefined, false);
61+
62+
// when / then
63+
await expect(rq.receive(undefined)).rejects.toThrow(
64+
"No RPC codec registered on the TypeScript side for 'com.example.UnknownMarker'"
65+
);
66+
});
5367
});

rewrite-python/rewrite/src/rewrite/rpc/receive_queue.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ def _do_change(
192192
after = {'kind': message.value_type, **message.value} if isinstance(message.value, dict) else message.value
193193
else:
194194
after = message.value
195+
elif message.state == RpcObjectState.ADD and message.value_type:
196+
raise RuntimeError(
197+
f"No RPC codec registered on the Python side for '{message.value_type}'. "
198+
"The remote side has a codec and sent property messages that will not be consumed, "
199+
"causing RPC queue desynchronization."
200+
)
195201
else:
196202
after = before
197203

0 commit comments

Comments
 (0)