Commit 3a35b6d

VWang1111 and claude committed
fix(reconnect): maxRetries(0) and configurable connectionTimeoutMs; auto-wire transport factory policy
Reworks the Layer-4 patch (#45) per @lukeocodes' review. Replaces the previous ClientOptions.reconnect(boolean) flag and the four duplicated WebSocketClient listeners with the underlying bug fixes in ReconnectingWebSocketListener plus a clean policy-declaration hook on DeepgramTransportFactory.

Reverts (no edits to generated WS clients, no .fernignore freeze inflation):
- ClientOptions.reconnect(boolean) field/getter/builder method.
- if (transportFactory != null) builder.reconnect(false) auto-disable heuristic in both DeepgramClientBuilder and AsyncDeepgramClientBuilder.
- The four resources/.../websocket/*WebSocketClient.java listener duplications and their .fernignore freeze entries.

Bug fixes in ReconnectingWebSocketListener (now temporarily frozen):
- maxRetries(0) used to refuse to connect because the gate compared retryCount.get() >= maxRetries on the initial attempt (retryCount starts at 0). The initial attempt is not a retry; the gate is now > maxRetries so the count caps retries only. maxRetries(0) means "1 attempt, no retries" as the API name implies.
- The 4000 ms connectionFuture.get(...) timeout is now configurable via a new connectionTimeoutMs field on ReconnectOptions (default 4000, validated > 0). The hardcoded literal is gone.
- Adds applyOptionsOverride(ReconnectOptions) so a transport-factory wrapper can rewire policy on the listener after construction. Used by TransportWebSocketFactory to honour the new factory hook below without requiring edits to the per-resource WebSocketClient classes.

Plugin policy declaration:
- DeepgramTransportFactory (already permanently frozen) gains a default ReconnectOptions reconnectOptions() method returning null. Plugins managing their own connection lifecycle (e.g. SageMaker) override this to return ReconnectOptions.builder().maxRetries(0).build() so the SDK's wrapper-level reconnect doesn't compound their internal retries into a Throttling-on-Throttling storm under burst load.
- TransportWebSocketFactory.create() applies factory.reconnectOptions() to the wrapping ReconnectingWebSocketListener via the new override hook. Auto-wiring with no edits to generated code.

Tests:
- New ReconnectingWebSocketListenerTest covers the maxRetries(0) regression, connectionTimeoutMs default + customisation + validation, and applyOptionsOverride no-op + retry-gate behaviour.

Net diff vs the prior PR head: 4 files changed (+94/−10) instead of 8 files (+364/−24). Freeze list grows by 1 (the listener), shrinks by 4 (the WS clients).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
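
The maxRetries(0) fix described in the commit message can be illustrated with a small model of the gate. This is a minimal sketch, not the SDK class: RetryGateSketch and its methods are hypothetical names, and it assumes (as the commit's own code comment states) that retryCount is incremented before each re-entry into connect() and that every attempt fails.

```java
// Simplified, hypothetical model of the retry gate in connect(); not the SDK class.
final class RetryGateSketch {

    /** Old gate: refuses once retryCount >= maxRetries, which also blocks the initial attempt when maxRetries == 0. */
    static boolean oldGateAllows(int retryCount, int maxRetries) {
        return !(retryCount >= maxRetries);
    }

    /** New gate: refuses only once retryCount > maxRetries, so the cap applies to retries alone. */
    static boolean newGateAllows(int retryCount, int maxRetries) {
        return !(retryCount > maxRetries);
    }

    /**
     * Counts connection attempts when every attempt fails.
     * Assumption: retryCount is incremented before each re-entry into the gate.
     */
    static int attemptsWhenAlwaysFailing(int maxRetries, boolean useNewGate) {
        int attempts = 0;
        int retryCount = 0;
        while (useNewGate ? newGateAllows(retryCount, maxRetries) : oldGateAllows(retryCount, maxRetries)) {
            attempts++;
            retryCount++;
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenAlwaysFailing(0, false)); // prints 0: old gate never connects
        System.out.println(attemptsWhenAlwaysFailing(0, true));  // prints 1: one attempt, no retries
    }
}
```

Under the same assumptions, maxRetries(2) yields three attempts with the new gate (one initial attempt plus two retries) and only two with the old one, which is the off-by-one the commit removes.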
1 parent 6cfd64d commit 3a35b6d

12 files changed

Lines changed: 307 additions & 373 deletions

.fernignore

Lines changed: 4 additions & 10 deletions
@@ -19,16 +19,10 @@ src/main/java/com/deepgram/core/ClientOptions.java
 # Transport abstraction (pluggable transport for SageMaker, etc.)
 src/main/java/com/deepgram/core/transport/

-# Temporarily frozen — Layer 4 patch: ClientOptions.reconnect(false) opt-out path
-# in the per-resource WebSocket clients. When `clientOptions.reconnect()` is false
-# (auto-set by `DeepgramClientBuilder.transportFactory(...)`), connect() bypasses
-# `ReconnectingWebSocketListener` and uses a plain okhttp3.WebSocketListener.
-# Pull these out of the freeze once the change has been pushed upstream into the
-# Fern generator template.
-src/main/java/com/deepgram/resources/listen/v1/websocket/V1WebSocketClient.java
-src/main/java/com/deepgram/resources/listen/v2/websocket/V2WebSocketClient.java
-src/main/java/com/deepgram/resources/agent/v1/websocket/V1WebSocketClient.java
-src/main/java/com/deepgram/resources/speak/v1/websocket/V1WebSocketClient.java
+# Bug fixes for maxRetries(0) semantics ("connect once, don't retry") and a
+# configurable connectionTimeoutMs on ReconnectOptions (was hardcoded 4000ms).
+# Pull this back out once the fixes are upstreamed into the Fern generator.
+src/main/java/com/deepgram/core/ReconnectingWebSocketListener.java

 # Build and project configuration
 build.gradle

src/main/java/com/deepgram/AsyncDeepgramClientBuilder.java

Lines changed: 0 additions & 5 deletions
@@ -118,11 +118,6 @@ protected void setAdditional(ClientOptions.Builder builder) {
         builder.addHeader("x-deepgram-session-id", sid);
         if (transportFactory != null) {
             builder.webSocketFactory(new TransportWebSocketFactory(transportFactory));
-            // Custom transports manage their own connection lifecycle and retry semantics.
-            // Wrapping them in ReconnectingWebSocketListener creates double-retry layering
-            // that compounds failures under burst load. Auto-disable here; users can
-            // re-enable explicitly via ClientOptions if they have a reason to.
-            builder.reconnect(false);
         }
     }

src/main/java/com/deepgram/DeepgramClientBuilder.java

Lines changed: 0 additions & 5 deletions
@@ -117,11 +117,6 @@ protected void setAdditional(ClientOptions.Builder builder) {
         builder.addHeader("x-deepgram-session-id", sid);
         if (transportFactory != null) {
             builder.webSocketFactory(new TransportWebSocketFactory(transportFactory));
-            // Custom transports manage their own connection lifecycle and retry semantics.
-            // Wrapping them in ReconnectingWebSocketListener creates double-retry layering
-            // that compounds failures under burst load. Auto-disable here; users can
-            // re-enable explicitly via ClientOptions if they have a reason to.
-            builder.reconnect(false);
         }
     }

src/main/java/com/deepgram/core/ClientOptions.java

Lines changed: 2 additions & 33 deletions
@@ -27,8 +27,6 @@ public final class ClientOptions {

     private final Optional<LogConfig> logging;

-    private final boolean reconnect;
-
     private ClientOptions(
             Environment environment,
             Map<String, String> headers,
@@ -37,8 +35,7 @@ private ClientOptions(
             int timeout,
             int maxRetries,
             Optional<WebSocketFactory> webSocketFactory,
-            Optional<LogConfig> logging,
-            boolean reconnect) {
+            Optional<LogConfig> logging) {
         this.environment = environment;
         this.headers = new HashMap<>();
         this.headers.putAll(headers);
@@ -56,7 +53,6 @@ private ClientOptions(
         this.maxRetries = maxRetries;
         this.webSocketFactory = webSocketFactory;
         this.logging = logging;
-        this.reconnect = reconnect;
     }

     public Environment environment() {
@@ -110,18 +106,6 @@ public Optional<LogConfig> logging() {
         return this.logging;
     }

-    /**
-     * Whether the per-resource WebSocket clients should wrap the underlying WebSocket in a
-     * {@link ReconnectingWebSocketListener} that retries on failure. Defaults to {@code true}.
-     * When {@code false}, the WebSocket clients connect once and propagate failures directly
-     * without retry. Use {@code false} when the underlying transport (e.g. SageMaker bidi
-     * streaming) already manages its own connection lifecycle and retry semantics, in which
-     * case wrapping in another retry layer causes double-retry storms under burst load.
-     */
-    public boolean reconnect() {
-        return this.reconnect;
-    }
-
     public static Builder builder() {
         return new Builder();
     }
@@ -143,8 +127,6 @@ public static class Builder {

         private Optional<WebSocketFactory> webSocketFactory = Optional.empty();

-        private boolean reconnect = true;
-
         public Builder environment(Environment environment) {
             this.environment = environment;
             return this;
@@ -205,17 +187,6 @@ public Builder logging(LogConfig logging) {
             return this;
         }

-        /**
-         * Enable or disable the auto-reconnecting WebSocket wrapper. Defaults to {@code true}.
-         * Set to {@code false} when using a custom transport (e.g. SageMaker) that already
-         * manages its own connection lifecycle — wrapping such transports in the reconnect
-         * listener causes double-retry storms under burst load.
-         */
-        public Builder reconnect(boolean reconnect) {
-            this.reconnect = reconnect;
-            return this;
-        }
-
         public ClientOptions build() {
             OkHttpClient.Builder httpClientBuilder =
                     this.httpClient != null ? this.httpClient.newBuilder() : new OkHttpClient.Builder();
@@ -249,8 +220,7 @@ public ClientOptions build() {
                     this.timeout.get(),
                     this.maxRetries,
                     this.webSocketFactory,
-                    this.logging,
-                    this.reconnect);
+                    this.logging);
         }

         /**
@@ -265,7 +235,6 @@ public static Builder from(ClientOptions clientOptions) {
             builder.headerSuppliers.putAll(clientOptions.headerSuppliers);
             builder.maxRetries = clientOptions.maxRetries();
             builder.logging = clientOptions.logging();
-            builder.reconnect = clientOptions.reconnect();
             return builder;
         }
     }

src/main/java/com/deepgram/core/ReconnectingWebSocketListener.java

Lines changed: 65 additions & 9 deletions
@@ -25,16 +25,21 @@
  * Provides production-ready resilience for WebSocket connections.
  */
 public abstract class ReconnectingWebSocketListener extends WebSocketListener {
-    private final long minReconnectionDelayMs;
+    // Option-derived fields are volatile (not final) so {@link #applyOptionsOverride} can rewire them
+    // after construction — used by {@code TransportWebSocketFactory} to honour
+    // {@code DeepgramTransportFactory.reconnectOptions()} without editing the generated WS clients.
+    private volatile long minReconnectionDelayMs;

-    private final long maxReconnectionDelayMs;
+    private volatile long maxReconnectionDelayMs;

-    private final double reconnectionDelayGrowFactor;
+    private volatile double reconnectionDelayGrowFactor;

-    private final int maxRetries;
+    private volatile int maxRetries;

     private final int maxEnqueuedMessages;

+    private volatile long connectionTimeoutMs;
+
     private final AtomicInteger retryCount = new AtomicInteger(0);

     private final AtomicBoolean connectLock = new AtomicBoolean(false);
@@ -66,16 +71,44 @@ public ReconnectingWebSocketListener(
         this.reconnectionDelayGrowFactor = options.reconnectionDelayGrowFactor;
         this.maxRetries = options.maxRetries;
         this.maxEnqueuedMessages = options.maxEnqueuedMessages;
+        this.connectionTimeoutMs = options.connectionTimeoutMs;
         this.connectionSupplier = connectionSupplier;
     }

+    /**
+     * Replaces the option-derived parameters on this listener at runtime. Used by
+     * {@code TransportWebSocketFactory} to apply {@code DeepgramTransportFactory.reconnectOptions()}
+     * without requiring edits to the generated per-resource WebSocket clients. {@code maxEnqueuedMessages}
+     * is intentionally not overridden — the message queue is sized at construction.
+     *
+     * <p>Thread-safety: option-derived fields are volatile; reads observe the latest write. The
+     * initial connect() call may have already started before the override lands, so for the very
+     * first attempt the original options apply; the override takes effect from the next attempt
+     * onwards. For the SageMaker storm-suppression case ({@code maxRetries(0)}) this is fine
+     * because the initial attempt's gate ({@code retryCount > maxRetries} with {@code retryCount=0})
+     * always passes regardless.
+     *
+     * @param options replacement options; {@code null} is a no-op.
+     */
+    public void applyOptionsOverride(ReconnectOptions options) {
+        if (options == null) {
+            return;
+        }
+        this.minReconnectionDelayMs = options.minReconnectionDelayMs;
+        this.maxReconnectionDelayMs = options.maxReconnectionDelayMs;
+        this.reconnectionDelayGrowFactor = options.reconnectionDelayGrowFactor;
+        this.maxRetries = options.maxRetries;
+        this.connectionTimeoutMs = options.connectionTimeoutMs;
+    }
+
     /**
      * Initiates a WebSocket connection with automatic reconnection enabled.
      *
      * Connection behavior:
-     * - Times out after 4000 milliseconds
+     * - Times out after {@code ReconnectOptions.connectionTimeoutMs} (default 4000ms)
      * - Thread-safe via atomic lock (returns immediately if connection in progress)
-     * - Retry count not incremented for initial connection attempt
+     * - {@code maxRetries} counts retries only — the initial attempt always proceeds.
+     *   {@code maxRetries(0)} means "connect once, don't retry" (not "refuse to connect").
      *
      * Error handling:
      * - TimeoutException: Includes retry attempt context
@@ -86,18 +119,21 @@ public void connect() {
         if (!connectLock.compareAndSet(false, true)) {
             return;
         }
-        if (retryCount.get() >= maxRetries) {
+        // retryCount is incremented inside scheduleReconnect() before re-entering connect(),
+        // so on the initial call retryCount == 0 and we always proceed. The cap applies to
+        // retries only — maxRetries(0) blocks retries but allows the initial attempt.
+        if (retryCount.get() > maxRetries) {
             connectLock.set(false);
             return;
         }
         try {
             CompletableFuture<? extends WebSocket> connectionFuture = CompletableFuture.supplyAsync(connectionSupplier);
             try {
-                webSocket = connectionFuture.get(4000, MILLISECONDS);
+                webSocket = connectionFuture.get(connectionTimeoutMs, MILLISECONDS);
             } catch (TimeoutException e) {
                 connectionFuture.cancel(true);
                 TimeoutException timeoutError =
-                        new TimeoutException("WebSocket connection timeout after " + 4000 + " milliseconds"
+                        new TimeoutException("WebSocket connection timeout after " + connectionTimeoutMs + " milliseconds"
                                 + (retryCount.get() > 0
                                         ? " (retry attempt #" + retryCount.get()
                                         : " (initial connection attempt)"));
@@ -399,12 +435,15 @@ public static final class ReconnectOptions {

         public final int maxEnqueuedMessages;

+        public final long connectionTimeoutMs;
+
         private ReconnectOptions(Builder builder) {
             this.minReconnectionDelayMs = builder.minReconnectionDelayMs;
             this.maxReconnectionDelayMs = builder.maxReconnectionDelayMs;
             this.reconnectionDelayGrowFactor = builder.reconnectionDelayGrowFactor;
             this.maxRetries = builder.maxRetries;
             this.maxEnqueuedMessages = builder.maxEnqueuedMessages;
+            this.connectionTimeoutMs = builder.connectionTimeoutMs;
         }

         public static Builder builder() {
@@ -422,12 +461,15 @@ public static final class Builder {

             private int maxEnqueuedMessages;

+            private long connectionTimeoutMs;
+
             public Builder() {
                 this.minReconnectionDelayMs = 1000;
                 this.maxReconnectionDelayMs = 10000;
                 this.reconnectionDelayGrowFactor = 1.3;
                 this.maxRetries = 2147483647;
                 this.maxEnqueuedMessages = 1000;
+                this.connectionTimeoutMs = 4000;
             }

             public Builder minReconnectionDelayMs(long minReconnectionDelayMs) {
@@ -455,6 +497,16 @@ public Builder maxEnqueuedMessages(int maxEnqueuedMessages) {
                 return this;
             }

+            /**
+             * Sets the per-attempt connection timeout in milliseconds. Defaults to {@code 4000}.
+             * Each call to {@link ReconnectingWebSocketListener#connect()} will wait at most
+             * this long for the underlying WebSocket factory to produce a connected socket.
+             */
+            public Builder connectionTimeoutMs(long connectionTimeoutMs) {
+                this.connectionTimeoutMs = connectionTimeoutMs;
+                return this;
+            }
+
             /**
              * Builds the ReconnectOptions with validation.
              *
@@ -463,6 +515,7 @@ public Builder maxEnqueuedMessages(int maxEnqueuedMessages) {
              * - minReconnectionDelayMs <= maxReconnectionDelayMs
              * - reconnectionDelayGrowFactor >= 1.0
              * - maxRetries and maxEnqueuedMessages are non-negative
+             * - connectionTimeoutMs is positive
              *
              * @return The validated ReconnectOptions instance
              * @throws IllegalArgumentException if configuration is invalid
@@ -487,6 +540,9 @@ public ReconnectOptions build() {
                 if (maxEnqueuedMessages < 0) {
                     throw new IllegalArgumentException("maxEnqueuedMessages must be non-negative");
                 }
+                if (connectionTimeoutMs <= 0) {
+                    throw new IllegalArgumentException("connectionTimeoutMs must be positive");
+                }
                 return new ReconnectOptions(this);
             }
         }
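
To show how the new connectionTimeoutMs option and its validation behave from a caller's point of view, here is a minimal sketch. ReconnectOptionsSketch is a hypothetical stand-in that copies only two fields of ReconnectOptions, with the defaults and the connectionTimeoutMs check taken from the diff; the wording of the maxRetries error message is an assumption, and the real class lives in the SDK.

```java
// Hypothetical stand-in mirroring two fields of ReconnectOptions; not the SDK class.
final class ReconnectOptionsSketch {
    final int maxRetries;
    final long connectionTimeoutMs;

    private ReconnectOptionsSketch(Builder builder) {
        this.maxRetries = builder.maxRetries;
        this.connectionTimeoutMs = builder.connectionTimeoutMs;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private int maxRetries = Integer.MAX_VALUE; // same default as the diff (2147483647)
        private long connectionTimeoutMs = 4000;    // same default as the diff

        Builder maxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        Builder connectionTimeoutMs(long connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }

        ReconnectOptionsSketch build() {
            if (maxRetries < 0) {
                // Message wording is assumed; the diff only documents the non-negative rule.
                throw new IllegalArgumentException("maxRetries must be non-negative");
            }
            if (connectionTimeoutMs <= 0) {
                // Same check and message as the diff.
                throw new IllegalArgumentException("connectionTimeoutMs must be positive");
            }
            return new ReconnectOptionsSketch(this);
        }
    }

    public static void main(String[] args) {
        // "Connect once, don't retry", with a longer per-attempt timeout.
        ReconnectOptionsSketch once = builder().maxRetries(0).connectionTimeoutMs(10_000).build();
        System.out.println(once.maxRetries + " retries, " + once.connectionTimeoutMs + " ms timeout");
        try {
            builder().connectionTimeoutMs(0).build();
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating in build() rather than in the setter keeps every rule in one place, which matches how the existing checks in the diff are organised.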

src/main/java/com/deepgram/core/transport/DeepgramTransportFactory.java

Lines changed: 16 additions & 1 deletion
@@ -1,5 +1,6 @@
 package com.deepgram.core.transport;

+import com.deepgram.core.ReconnectingWebSocketListener;
 import java.util.Map;

 /**
@@ -19,7 +20,6 @@
  * <p>When a transport factory is set, all WebSocket clients (Listen, Speak, Agent) will use it
  * instead of the default OkHttp WebSocket connection.
  */
-@FunctionalInterface
 public interface DeepgramTransportFactory {

     /**
@@ -31,4 +31,19 @@ public interface DeepgramTransportFactory {
      * @return a connected or connecting transport instance
      */
     DeepgramTransport create(String url, Map<String, String> headers);
+
+    /**
+     * Reconnect policy the SDK should apply when wrapping connections produced by this factory.
+     * Returning {@code null} (the default) leaves the SDK's {@link ReconnectingWebSocketListener}
+     * defaults in place.
+     *
+     * <p>Plugins that own their own connection lifecycle and retry/backoff (e.g. SageMaker bidi
+     * streaming) should return {@code ReconnectOptions.builder().maxRetries(0).build()} so the
+     * SDK's wrapper-level reconnect doesn't compound their internal retries into a storm.
+     *
+     * @return reconnect options to apply, or {@code null} for SDK defaults
+     */
+    default ReconnectingWebSocketListener.ReconnectOptions reconnectOptions() {
+        return null;
+    }
 }
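
A plugin's side of this hook might look like the following minimal sketch. All types here are hypothetical stand-ins (names ending in "Sketch" are not part of the SDK): PolicySketch replaces ReconnectOptions, and TransportFactorySketch replaces DeepgramTransportFactory with the same shape of one abstract method plus one default method.

```java
import java.util.Map;

// Hypothetical stand-in for ReconnectOptions; only maxRetries is modelled.
final class PolicySketch {
    final int maxRetries;

    PolicySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }
}

// Hypothetical stand-in for DeepgramTransportFactory.
interface TransportFactorySketch {
    Object create(String url, Map<String, String> headers);

    // Mirrors the new default method: null means "keep the SDK defaults".
    default PolicySketch reconnectOptions() {
        return null;
    }
}

// A plugin that manages its own retries declares "connect once, don't retry".
final class SelfRetryingFactorySketch implements TransportFactorySketch {
    @Override
    public Object create(String url, Map<String, String> headers) {
        return "transport for " + url; // placeholder for a real transport object
    }

    @Override
    public PolicySketch reconnectOptions() {
        return new PolicySketch(0);
    }
}

final class PluginPolicyDemo {
    public static void main(String[] args) {
        // A lambda still satisfies the interface: default methods do not count
        // against the single-abstract-method rule.
        TransportFactorySketch plain = (url, headers) -> "transport for " + url;
        TransportFactorySketch plugin = new SelfRetryingFactorySketch();
        System.out.println(plain.reconnectOptions() == null); // prints true
        System.out.println(plugin.reconnectOptions().maxRetries); // prints 0
    }
}
```

Because the new method has a default body, existing factories written as lambdas or as classes keep compiling unchanged in this model; only plugins that want a different policy need to override it.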

src/main/java/com/deepgram/core/transport/TransportWebSocketFactory.java

Lines changed: 8 additions & 0 deletions
@@ -1,5 +1,6 @@
 package com.deepgram.core.transport;

+import com.deepgram.core.ReconnectingWebSocketListener;
 import com.deepgram.core.WebSocketFactory;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -31,6 +32,13 @@ public TransportWebSocketFactory(DeepgramTransportFactory transportFactory) {

     @Override
     public WebSocket create(Request request, WebSocketListener listener) {
+        // Apply the plugin-declared reconnect policy to the SDK's wrapping listener. Plugins that
+        // own their own retry/backoff (SageMaker) return maxRetries(0) here so the wrapper-level
+        // reconnect doesn't compound their internal retries into a storm.
+        if (listener instanceof ReconnectingWebSocketListener) {
+            ((ReconnectingWebSocketListener) listener).applyOptionsOverride(transportFactory.reconnectOptions());
+        }
+
         String url = request.url().toString();
         // Restore wss:// scheme — OkHttp's HttpUrl normalizes to https://
         if (url.startsWith("https://")) {
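
The wiring in create() combines an instanceof guard with a null-tolerant override. The sketch below models just that mechanism with hypothetical stand-in classes; an Integer stands in for ReconnectOptions, and none of these names exist in the SDK.

```java
// Hypothetical stand-in for a plain okhttp3.WebSocketListener.
class PlainListenerSketch {}

// Hypothetical stand-in for ReconnectingWebSocketListener; only maxRetries is modelled.
class ReconnectingListenerSketch extends PlainListenerSketch {
    // volatile (not final) so an override written after construction is visible to later reads.
    private volatile int maxRetries = Integer.MAX_VALUE;

    // null is a no-op, matching the documented contract of applyOptionsOverride.
    void applyOptionsOverride(Integer maxRetriesOverride) {
        if (maxRetriesOverride == null) {
            return;
        }
        this.maxRetries = maxRetriesOverride;
    }

    int maxRetries() {
        return maxRetries;
    }
}

final class WiringSketch {
    // Mirrors the guard in create(): only reconnecting listeners receive the policy.
    static void applyPolicy(PlainListenerSketch listener, Integer factoryPolicy) {
        if (listener instanceof ReconnectingListenerSketch) {
            ((ReconnectingListenerSketch) listener).applyOptionsOverride(factoryPolicy);
        }
    }

    public static void main(String[] args) {
        ReconnectingListenerSketch listener = new ReconnectingListenerSketch();
        applyPolicy(listener, null); // factory returned null: defaults stay in place
        System.out.println(listener.maxRetries() == Integer.MAX_VALUE); // prints true
        applyPolicy(listener, 0);    // plugin declared "no retries"
        System.out.println(listener.maxRetries()); // prints 0
        applyPolicy(new PlainListenerSketch(), 0); // other listener types are left untouched
    }
}
```

Keeping the null check inside the override method means the factory wrapper can pass the plugin's answer straight through without branching on it.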