Skip to content

Commit 6cfd64d

Browse files
VWang1111claude
andcommitted
feat: opt out of ReconnectingWebSocketListener wrapper, auto-disable for custom transports
Adds `ClientOptions.Builder.reconnect(boolean)` (default `true`). When `false`, the per-resource WebSocket clients (`listen/v1`, `listen/v2`, `agent/v1`, `speak/v1`) connect once via a plain `okhttp3.WebSocketListener` and propagate failures directly, bypassing `ReconnectingWebSocketListener` and its hardcoded `connectionFuture.get(4000, MILLISECONDS)` ceiling. Auto-disable for custom transports: when `DeepgramClientBuilder.transportFactory(...)` (or its async counterpart) is set, `setAdditional()` now also calls `builder.reconnect(false)`. Custom transports like `SageMakerTransportFactory` already manage their own connection lifecycle and retry semantics; wrapping them in another retry layer creates double-retry storms under burst load. The four per-resource WebSocket client files are added to the "temporarily frozen" section of `.fernignore` so the patch survives the next Fern regen. Unfreeze once the change has been pushed upstream into the Fern generator template. Backwards compatibility: - Existing callers that do NOT use `transportFactory(...)` see zero behavior change. `reconnect()` defaults to `true`, the `ReconnectingWebSocketListener` wrapper is unchanged for the OkHttp WebSocket path (Deepgram cloud, self-hosted Deepgram via raw WS), and the lower-level `ClientOptions.Builder.webSocketFactory(...)` seam is also unaffected. - Existing SageMaker callers (the only known users of `transportFactory(...)`) get the auto-disable. Empirically this eliminates the retry-storm pattern observed at 400 concurrent streams on a 10x ml.g6.2xlarge endpoint: ThrottlingException line-count 68,909 -> 240 (-99.7%) ModelStreamError line-count 30,442 -> 1,128 (-96.3%) Streams hitting Attempt Count: 4 29,030 -> 96 (-99.7%) 4-second `connection timeout` 13 -> 0 Total error log lines 1,322,666 -> 33,972 (-97.4%) Same wall time (313 vs 312 s), same transcript count, dramatically cleaner error log. Edge case worth calling out: - A SageMaker caller who has been tuning retry behavior via `wsClient.reconnectOptions(customOpts)` will find those options silently ignored after this change (`reconnect=false` skips the listener entirely). Their tuning was almost certainly a workaround for the very storm we are now eliminating, so the right migration is to delete the `reconnectOptions(...)` call. Customers who genuinely want both a custom transport and SDK-side reconnect can re-enable explicitly with `.reconnect(true)` after `.transportFactory(...)`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c790dbd commit 6cfd64d

8 files changed

Lines changed: 364 additions & 24 deletions

File tree

.fernignore

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ src/main/java/com/deepgram/core/ClientOptions.java
1919
# Transport abstraction (pluggable transport for SageMaker, etc.)
2020
src/main/java/com/deepgram/core/transport/
2121

22+
# Temporarily frozen — Layer 4 patch: ClientOptions.reconnect(false) opt-out path
23+
# in the per-resource WebSocket clients. When `clientOptions.reconnect()` is false
24+
# (auto-set by `DeepgramClientBuilder.transportFactory(...)`), connect() bypasses
25+
# `ReconnectingWebSocketListener` and uses a plain okhttp3.WebSocketListener.
26+
# Pull these out of the freeze once the change has been pushed upstream into the
27+
# Fern generator template.
28+
src/main/java/com/deepgram/resources/listen/v1/websocket/V1WebSocketClient.java
29+
src/main/java/com/deepgram/resources/listen/v2/websocket/V2WebSocketClient.java
30+
src/main/java/com/deepgram/resources/agent/v1/websocket/V1WebSocketClient.java
31+
src/main/java/com/deepgram/resources/speak/v1/websocket/V1WebSocketClient.java
32+
2233
# Build and project configuration
2334
build.gradle
2435
settings.gradle

src/main/java/com/deepgram/AsyncDeepgramClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ protected void setAdditional(ClientOptions.Builder builder) {
118118
builder.addHeader("x-deepgram-session-id", sid);
119119
if (transportFactory != null) {
120120
builder.webSocketFactory(new TransportWebSocketFactory(transportFactory));
121+
// Custom transports manage their own connection lifecycle and retry semantics.
122+
// Wrapping them in ReconnectingWebSocketListener creates double-retry layering
123+
// that compounds failures under burst load. Auto-disable here; users can
124+
// re-enable explicitly via ClientOptions if they have a reason to.
125+
builder.reconnect(false);
121126
}
122127
}
123128

src/main/java/com/deepgram/DeepgramClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ protected void setAdditional(ClientOptions.Builder builder) {
117117
builder.addHeader("x-deepgram-session-id", sid);
118118
if (transportFactory != null) {
119119
builder.webSocketFactory(new TransportWebSocketFactory(transportFactory));
120+
// Custom transports manage their own connection lifecycle and retry semantics.
121+
// Wrapping them in ReconnectingWebSocketListener creates double-retry layering
122+
// that compounds failures under burst load. Auto-disable here; users can
123+
// re-enable explicitly via ClientOptions if they have a reason to.
124+
builder.reconnect(false);
120125
}
121126
}
122127

src/main/java/com/deepgram/core/ClientOptions.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public final class ClientOptions {
2727

2828
private final Optional<LogConfig> logging;
2929

30+
private final boolean reconnect;
31+
3032
private ClientOptions(
3133
Environment environment,
3234
Map<String, String> headers,
@@ -35,7 +37,8 @@ private ClientOptions(
3537
int timeout,
3638
int maxRetries,
3739
Optional<WebSocketFactory> webSocketFactory,
38-
Optional<LogConfig> logging) {
40+
Optional<LogConfig> logging,
41+
boolean reconnect) {
3942
this.environment = environment;
4043
this.headers = new HashMap<>();
4144
this.headers.putAll(headers);
@@ -53,6 +56,7 @@ private ClientOptions(
5356
this.maxRetries = maxRetries;
5457
this.webSocketFactory = webSocketFactory;
5558
this.logging = logging;
59+
this.reconnect = reconnect;
5660
}
5761

5862
public Environment environment() {
@@ -106,6 +110,18 @@ public Optional<LogConfig> logging() {
106110
return this.logging;
107111
}
108112

113+
/**
114+
* Whether the per-resource WebSocket clients should wrap the underlying WebSocket in a
115+
* {@link ReconnectingWebSocketListener} that retries on failure. Defaults to {@code true}.
116+
* When {@code false}, the WebSocket clients connect once and propagate failures directly
117+
* without retry. Use {@code false} when the underlying transport (e.g. SageMaker bidi
118+
* streaming) already manages its own connection lifecycle and retry semantics, in which
119+
* case wrapping in another retry layer causes double-retry storms under burst load.
120+
*/
121+
public boolean reconnect() {
122+
return this.reconnect;
123+
}
124+
109125
public static Builder builder() {
110126
return new Builder();
111127
}
@@ -127,6 +143,8 @@ public static class Builder {
127143

128144
private Optional<WebSocketFactory> webSocketFactory = Optional.empty();
129145

146+
private boolean reconnect = true;
147+
130148
public Builder environment(Environment environment) {
131149
this.environment = environment;
132150
return this;
@@ -187,6 +205,17 @@ public Builder logging(LogConfig logging) {
187205
return this;
188206
}
189207

208+
/**
209+
* Enable or disable the auto-reconnecting WebSocket wrapper. Defaults to {@code true}.
210+
* Set to {@code false} when using a custom transport (e.g. SageMaker) that already
211+
* manages its own connection lifecycle — wrapping such transports in the reconnect
212+
* listener causes double-retry storms under burst load.
213+
*/
214+
public Builder reconnect(boolean reconnect) {
215+
this.reconnect = reconnect;
216+
return this;
217+
}
218+
190219
public ClientOptions build() {
191220
OkHttpClient.Builder httpClientBuilder =
192221
this.httpClient != null ? this.httpClient.newBuilder() : new OkHttpClient.Builder();
@@ -220,7 +249,8 @@ public ClientOptions build() {
220249
this.timeout.get(),
221250
this.maxRetries,
222251
this.webSocketFactory,
223-
this.logging);
252+
this.logging,
253+
this.reconnect);
224254
}
225255

226256
/**
@@ -235,6 +265,7 @@ public static Builder from(ClientOptions clientOptions) {
235265
builder.headerSuppliers.putAll(clientOptions.headerSuppliers);
236266
builder.maxRetries = clientOptions.maxRetries();
237267
builder.logging = clientOptions.logging();
268+
builder.reconnect = clientOptions.reconnect();
238269
return builder;
239270
}
240271
}

src/main/java/com/deepgram/resources/agent/v1/websocket/V1WebSocketClient.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import okhttp3.Request;
4343
import okhttp3.Response;
4444
import okhttp3.WebSocket;
45+
import okhttp3.WebSocketListener;
4546
import okio.ByteString;
4647

4748
/**
@@ -73,6 +74,12 @@ public class V1WebSocketClient implements AutoCloseable {
7374

7475
private ReconnectingWebSocketListener reconnectingListener;
7576

77+
// Direct (non-reconnecting) WebSocket — used when clientOptions.reconnect() is false
78+
// (e.g. when a custom transportFactory is configured). In that mode, the underlying
79+
// transport owns its own connection lifecycle, so we connect once and propagate
80+
// failures directly without wrapping in ReconnectingWebSocketListener.
81+
private volatile WebSocket directWebSocket;
82+
7683
private volatile Consumer<AgentV1ReceiveFunctionCallResponse> onFunctionCallResponseHandler;
7784

7885
private volatile Consumer<AgentV1PromptUpdated> promptUpdatedHandler;
@@ -144,6 +151,58 @@ public CompletableFuture<Void> connect() {
144151
clientOptions.headers(null).forEach(requestBuilder::addHeader);
145152
final Request request = requestBuilder.build();
146153
this.readyState = WebSocketReadyState.CONNECTING;
154+
155+
if (!clientOptions.reconnect()) {
156+
// Direct mode — no ReconnectingWebSocketListener wrapper. Used when a custom
157+
// transportFactory manages its own retry/lifecycle (e.g. SageMaker bidi streaming).
158+
WebSocketListener directListener = new WebSocketListener() {
159+
@Override
160+
public void onOpen(WebSocket webSocket, Response response) {
161+
directWebSocket = webSocket;
162+
readyState = WebSocketReadyState.OPEN;
163+
if (onConnectedHandler != null) {
164+
onConnectedHandler.run();
165+
}
166+
connectionFuture.complete(null);
167+
}
168+
169+
@Override
170+
public void onMessage(WebSocket webSocket, String text) {
171+
handleIncomingMessage(text);
172+
}
173+
174+
@Override
175+
public void onMessage(WebSocket webSocket, ByteString bytes) {
176+
if (agentV1AudioHandler != null) {
177+
agentV1AudioHandler.accept(bytes);
178+
}
179+
}
180+
181+
@Override
182+
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
183+
readyState = WebSocketReadyState.CLOSED;
184+
if (onErrorHandler != null) {
185+
onErrorHandler.accept(new RuntimeException(t));
186+
}
187+
connectionFuture.completeExceptionally(t);
188+
}
189+
190+
@Override
191+
public void onClosed(WebSocket webSocket, int code, String reason) {
192+
readyState = WebSocketReadyState.CLOSED;
193+
if (onDisconnectedHandler != null) {
194+
onDisconnectedHandler.accept(new DisconnectReason(code, reason));
195+
}
196+
}
197+
};
198+
if (clientOptions.webSocketFactory().isPresent()) {
199+
this.directWebSocket = clientOptions.webSocketFactory().get().create(request, directListener);
200+
} else {
201+
this.directWebSocket = okHttpClient.newWebSocket(request, directListener);
202+
}
203+
return connectionFuture;
204+
}
205+
147206
ReconnectingWebSocketListener.ReconnectOptions reconnectOpts = this.reconnectOptions != null
148207
? this.reconnectOptions
149208
: ReconnectingWebSocketListener.ReconnectOptions.builder().build();
@@ -201,7 +260,12 @@ protected void onWebSocketClosed(WebSocket webSocket, int code, String reason) {
201260
* Disconnects the WebSocket connection and releases resources.
202261
*/
203262
public void disconnect() {
204-
reconnectingListener.disconnect();
263+
if (reconnectingListener != null) {
264+
reconnectingListener.disconnect();
265+
} else if (directWebSocket != null) {
266+
directWebSocket.close(1000, "Client disconnecting");
267+
directWebSocket = null;
268+
}
205269
if (timeoutExecutor != null) {
206270
timeoutExecutor.shutdownNow();
207271
timeoutExecutor = null;
@@ -300,8 +364,12 @@ public CompletableFuture<Void> sendMedia(ByteString message) {
300364
CompletableFuture<Void> future = new CompletableFuture<>();
301365
try {
302366
assertSocketIsOpen();
303-
// Use reconnecting listener's sendBinary method which handles queuing
304-
reconnectingListener.sendBinary(message);
367+
if (reconnectingListener != null) {
368+
// Use reconnecting listener's sendBinary method which handles queuing
369+
reconnectingListener.sendBinary(message);
370+
} else {
371+
directWebSocket.send(message);
372+
}
305373
future.complete(null);
306374
} catch (Exception e) {
307375
future.completeExceptionally(new RuntimeException("Failed to send binary data", e));
@@ -501,7 +569,10 @@ public void close() {
501569
* @throws IllegalStateException if the socket is not connected or not open
502570
*/
503571
private void assertSocketIsOpen() {
504-
if (reconnectingListener.getWebSocket() == null) {
572+
WebSocket activeSocket = (reconnectingListener != null)
573+
? reconnectingListener.getWebSocket()
574+
: directWebSocket;
575+
if (activeSocket == null) {
505576
throw new IllegalStateException("WebSocket is not connected. Call connect() first.");
506577
}
507578
if (readyState != WebSocketReadyState.OPEN) {
@@ -514,8 +585,12 @@ private CompletableFuture<Void> sendMessage(Object body) {
514585
try {
515586
assertSocketIsOpen();
516587
String json = objectMapper.writeValueAsString(body);
517-
// Use reconnecting listener's send method which handles queuing
518-
reconnectingListener.send(json);
588+
if (reconnectingListener != null) {
589+
// Use reconnecting listener's send method which handles queuing
590+
reconnectingListener.send(json);
591+
} else {
592+
directWebSocket.send(json);
593+
}
519594
future.complete(null);
520595
} catch (IllegalStateException e) {
521596
future.completeExceptionally(e);

0 commit comments

Comments
 (0)