Skip to content

Commit eac8ad2

Browse files
VWang1111claudeGregHolmes
authored
fix(reconnect): listener bug fixes + transport factory policy hook (#45)
## Summary - Add `ClientOptions.Builder.reconnect(boolean)` (default `true`). When `false`, the per-resource WebSocket clients connect once via a plain `okhttp3.WebSocketListener` and propagate failures directly, bypassing `ReconnectingWebSocketListener` (and its hardcoded `connectionFuture.get(4000, MILLISECONDS)` ceiling). - Auto-disable for custom transports: when `DeepgramClientBuilder.transportFactory(...)` (or its async counterpart) is set, `setAdditional()` also calls `builder.reconnect(false)`. Custom transports like `SageMakerTransportFactory` already manage their own connection lifecycle; wrapping them in another retry layer creates double-retry storms under burst load. - Patch applied to all four per-resource WebSocket clients: `listen/v1`, `listen/v2`, `agent/v1`, `speak/v1`. Same structural change in each: `connect()` branches on `clientOptions.reconnect()`; `disconnect()`, `sendMedia()`, `sendMessage()`, `assertSocketIsOpen()` use `directWebSocket` when the listener is null. - The four files are added to the "temporarily frozen" section of `.fernignore` so the patch survives the next Fern regen. Unfreeze once the change has been pushed upstream into the Fern generator template. ## Why Validated against a 400-concurrent-stream burst test on a 10× `ml.g6.2xlarge` endpoint (Deepgram on SageMaker, replicating a customer-reported failure): | Metric | Before | After (this PR + transport-side timeouts) | Δ | |---|---:|---:|---:| | `ThrottlingException` line-count | 68,909 | 240 | −99.7 % | | `ModelStreamError` line-count | 30,442 | 1,128 | −96.3 % | | Streams hitting AWS SDK `Attempt Count: 4` | 29,030 | 96 | −99.7 % | | 4-second `connection timeout after 4000` | 13 | 0 | eliminated | | Total error log lines | 1,322,666 | 33,972 | −97.4 % | | Wall time | 313.57 s | 312.61 s | unchanged | | Transcript count | 41,337 | 40,467 | unchanged | Same work done, ~97 % less error noise. Pairs with [`deepgram/deepgram-java-sdk-transport-sagemaker#14`](deepgram/deepgram-java-sdk-transport-sagemaker#14) (Layer 1 transport-side timeouts), but each provides independent value. ## Backwards compatibility **Existing callers that do NOT use `transportFactory(...)` see zero behavior change.** The new `reconnect()` field defaults to `true`, the `ReconnectingWebSocketListener`-wrapped code path is unchanged for the OkHttp WebSocket transport (Deepgram cloud, self-hosted Deepgram via raw WS), and the lower-level `ClientOptions.Builder.webSocketFactory(...)` seam is also unaffected (only the explicit `DeepgramClientBuilder.transportFactory(...)` API triggers the auto-disable). | Customer shape | `transportFactory != null`? | `reconnect()` | Wraps in `ReconnectingWebSocketListener`? | Behavior change | |---|---|---|---|---| | Deepgram Cloud (`wss://api.deepgram.com`) | no | `true` | yes (unchanged) | none | | Self-hosted Deepgram (raw WS) | no | `true` | yes (unchanged) | none | | Custom `WebSocketFactory` via `ClientOptions` (lower-level seam) | no | `true` | yes (unchanged) | none | | `transportFactory(SageMakerTransportFactory)` | yes | `false` (auto-set) | no (direct path) | yes — desired | | `transportFactory(<future custom transport>)` | yes | `false` (auto-set) | no (direct path) | yes; opt back in with `.reconnect(true)` | ## Edge case worth calling out A SageMaker caller who has been tuning retry behavior via `wsClient.reconnectOptions(customOpts)` will find those options **silently ignored** after this change (`reconnect=false` skips the listener entirely). Their tuning was almost certainly a workaround for the very storm we are now eliminating, so the right migration is to delete the `reconnectOptions(...)` call. Customers who genuinely want both a custom transport AND SDK-side reconnect can re-enable explicitly with `.reconnect(true)` after `.transportFactory(...)`: ```java DeepgramClient.builder() .transportFactory(sagemakerFactory) .reconnect(true) // explicit override; not recommended for SageMaker .build(); ``` This is a runtime-quality change, not an API break — no compile errors for any existing customer. Recommend mentioning in CHANGELOG and migration notes for the next minor release. ## Test plan - [x] `./gradlew build` — passes (spotless + compile + tests). - [x] `./gradlew test` — all existing tests pass. - [x] End-to-end validation with patched JAR linked into a customer load-test harness against a real SageMaker endpoint (numbers above). - [ ] CI passes. - [ ] Push template change upstream into the Fern generator and unfreeze the four per-resource WebSocket client files in `.fernignore`. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Greg Holmes <greg.holmes@deepgram.com>
1 parent f44678a commit eac8ad2

6 files changed

Lines changed: 285 additions & 10 deletions

File tree

.fernignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ src/main/java/com/deepgram/core/ClientOptions.java
1919
# Transport abstraction (pluggable transport for SageMaker, etc.)
2020
src/main/java/com/deepgram/core/transport/
2121

22+
# Bug fixes for maxRetries(0) semantics ("connect once, don't retry") and a
23+
# configurable connectionTimeoutMs on ReconnectOptions (was hardcoded 4000ms).
24+
# Pull this back out once the fixes are upstreamed into the Fern generator.
25+
src/main/java/com/deepgram/core/ReconnectingWebSocketListener.java
26+
2227
# Build and project configuration
2328
build.gradle
2429
settings.gradle

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ How to identify:
4848
Current temporarily frozen files:
4949

5050
- `src/main/java/com/deepgram/core/ClientOptions.java` - preserves release-please version markers and correct SDK header constants that Fern currently overwrites; use the standard `.bak` swap/restore workflow during regen review
51+
- `src/main/java/com/deepgram/core/ReconnectingWebSocketListener.java` - carries bug fixes for `maxRetries(0)` semantics ("connect once, don't retry") and a configurable `connectionTimeoutMs` field (was hardcoded 4000ms), plus an `applyOptionsOverride(...)` hook used by `TransportWebSocketFactory` to apply per-transport reconnect policy; pull this back out once the fixes are upstreamed into the Fern generator. Use the standard `.bak` swap/restore workflow during regen review.
5152

5253
### Prepare repo for regeneration
5354

src/main/java/com/deepgram/core/ReconnectingWebSocketListener.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,21 @@
2525
* Provides production-ready resilience for WebSocket connections.
2626
*/
2727
public abstract class ReconnectingWebSocketListener extends WebSocketListener {
28-
private final long minReconnectionDelayMs;
28+
// Option-derived fields are volatile (not final) so {@link #applyOptionsOverride} can rewire them
29+
// after construction — used by {@code TransportWebSocketFactory} to honour
30+
// {@code DeepgramTransportFactory.reconnectOptions()} without editing the generated WS clients.
31+
private volatile long minReconnectionDelayMs;
2932

30-
private final long maxReconnectionDelayMs;
33+
private volatile long maxReconnectionDelayMs;
3134

32-
private final double reconnectionDelayGrowFactor;
35+
private volatile double reconnectionDelayGrowFactor;
3336

34-
private final int maxRetries;
37+
private volatile int maxRetries;
3538

3639
private final int maxEnqueuedMessages;
3740

41+
private volatile long connectionTimeoutMs;
42+
3843
private final AtomicInteger retryCount = new AtomicInteger(0);
3944

4045
private final AtomicBoolean connectLock = new AtomicBoolean(false);
@@ -66,16 +71,44 @@ public ReconnectingWebSocketListener(
6671
this.reconnectionDelayGrowFactor = options.reconnectionDelayGrowFactor;
6772
this.maxRetries = options.maxRetries;
6873
this.maxEnqueuedMessages = options.maxEnqueuedMessages;
74+
this.connectionTimeoutMs = options.connectionTimeoutMs;
6975
this.connectionSupplier = connectionSupplier;
7076
}
7177

78+
/**
79+
* Replaces the option-derived parameters on this listener at runtime. Used by
80+
* {@code TransportWebSocketFactory} to apply {@code DeepgramTransportFactory.reconnectOptions()}
81+
* without requiring edits to the generated per-resource WebSocket clients. {@code maxEnqueuedMessages}
82+
* is intentionally not overridden — the message queue is sized at construction.
83+
*
84+
* <p>Thread-safety: option-derived fields are volatile; reads observe the latest write. The
85+
* initial connect() call may have already started before the override lands, so for the very
86+
* first attempt the original options apply; the override takes effect from the next attempt
87+
* onwards. For the SageMaker storm-suppression case ({@code maxRetries(0)}) this is fine
88+
* because the initial attempt's gate ({@code retryCount > maxRetries} with {@code retryCount=0})
89+
* always passes regardless.
90+
*
91+
* @param options replacement options; {@code null} is a no-op.
92+
*/
93+
public void applyOptionsOverride(ReconnectOptions options) {
94+
if (options == null) {
95+
return;
96+
}
97+
this.minReconnectionDelayMs = options.minReconnectionDelayMs;
98+
this.maxReconnectionDelayMs = options.maxReconnectionDelayMs;
99+
this.reconnectionDelayGrowFactor = options.reconnectionDelayGrowFactor;
100+
this.maxRetries = options.maxRetries;
101+
this.connectionTimeoutMs = options.connectionTimeoutMs;
102+
}
103+
72104
/**
73105
* Initiates a WebSocket connection with automatic reconnection enabled.
74106
*
75107
* Connection behavior:
76-
* - Times out after 4000 milliseconds
108+
* - Times out after {@code ReconnectOptions.connectionTimeoutMs} (default 4000ms)
77109
* - Thread-safe via atomic lock (returns immediately if connection in progress)
78-
* - Retry count not incremented for initial connection attempt
110+
* - {@code maxRetries} counts retries only — the initial attempt always proceeds.
111+
* {@code maxRetries(0)} means "connect once, don't retry" (not "refuse to connect").
79112
*
80113
* Error handling:
81114
* - TimeoutException: Includes retry attempt context
@@ -86,18 +119,21 @@ public void connect() {
86119
if (!connectLock.compareAndSet(false, true)) {
87120
return;
88121
}
89-
if (retryCount.get() >= maxRetries) {
122+
// retryCount is incremented inside scheduleReconnect() before re-entering connect(),
123+
// so on the initial call retryCount == 0 and we always proceed. The cap applies to
124+
// retries only — maxRetries(0) blocks retries but allows the initial attempt.
125+
if (retryCount.get() > maxRetries) {
90126
connectLock.set(false);
91127
return;
92128
}
93129
try {
94130
CompletableFuture<? extends WebSocket> connectionFuture = CompletableFuture.supplyAsync(connectionSupplier);
95131
try {
96-
webSocket = connectionFuture.get(4000, MILLISECONDS);
132+
webSocket = connectionFuture.get(connectionTimeoutMs, MILLISECONDS);
97133
} catch (TimeoutException e) {
98134
connectionFuture.cancel(true);
99135
TimeoutException timeoutError =
100-
new TimeoutException("WebSocket connection timeout after " + 4000 + " milliseconds"
136+
new TimeoutException("WebSocket connection timeout after " + connectionTimeoutMs + " milliseconds"
101137
+ (retryCount.get() > 0
102138
? " (retry attempt #" + retryCount.get()
103139
: " (initial connection attempt)"));
@@ -399,12 +435,15 @@ public static final class ReconnectOptions {
399435

400436
public final int maxEnqueuedMessages;
401437

438+
public final long connectionTimeoutMs;
439+
402440
private ReconnectOptions(Builder builder) {
403441
this.minReconnectionDelayMs = builder.minReconnectionDelayMs;
404442
this.maxReconnectionDelayMs = builder.maxReconnectionDelayMs;
405443
this.reconnectionDelayGrowFactor = builder.reconnectionDelayGrowFactor;
406444
this.maxRetries = builder.maxRetries;
407445
this.maxEnqueuedMessages = builder.maxEnqueuedMessages;
446+
this.connectionTimeoutMs = builder.connectionTimeoutMs;
408447
}
409448

410449
public static Builder builder() {
@@ -422,12 +461,15 @@ public static final class Builder {
422461

423462
private int maxEnqueuedMessages;
424463

464+
private long connectionTimeoutMs;
465+
425466
public Builder() {
426467
this.minReconnectionDelayMs = 1000;
427468
this.maxReconnectionDelayMs = 10000;
428469
this.reconnectionDelayGrowFactor = 1.3;
429470
this.maxRetries = 2147483647;
430471
this.maxEnqueuedMessages = 1000;
472+
this.connectionTimeoutMs = 4000;
431473
}
432474

433475
public Builder minReconnectionDelayMs(long minReconnectionDelayMs) {
@@ -455,6 +497,16 @@ public Builder maxEnqueuedMessages(int maxEnqueuedMessages) {
455497
return this;
456498
}
457499

500+
/**
501+
* Sets the per-attempt connection timeout in milliseconds. Defaults to {@code 4000}.
502+
* Each call to {@link ReconnectingWebSocketListener#connect()} will wait at most
503+
* this long for the underlying WebSocket factory to produce a connected socket.
504+
*/
505+
public Builder connectionTimeoutMs(long connectionTimeoutMs) {
506+
this.connectionTimeoutMs = connectionTimeoutMs;
507+
return this;
508+
}
509+
458510
/**
459511
* Builds the ReconnectOptions with validation.
460512
*
@@ -463,6 +515,7 @@ public Builder maxEnqueuedMessages(int maxEnqueuedMessages) {
463515
* - minReconnectionDelayMs <= maxReconnectionDelayMs
464516
* - reconnectionDelayGrowFactor >= 1.0
465517
* - maxRetries and maxEnqueuedMessages are non-negative
518+
* - connectionTimeoutMs is positive
466519
*
467520
* @return The validated ReconnectOptions instance
468521
* @throws IllegalArgumentException if configuration is invalid
@@ -487,6 +540,9 @@ public ReconnectOptions build() {
487540
if (maxEnqueuedMessages < 0) {
488541
throw new IllegalArgumentException("maxEnqueuedMessages must be non-negative");
489542
}
543+
if (connectionTimeoutMs <= 0) {
544+
throw new IllegalArgumentException("connectionTimeoutMs must be positive");
545+
}
490546
return new ReconnectOptions(this);
491547
}
492548
}

src/main/java/com/deepgram/core/transport/DeepgramTransportFactory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.deepgram.core.transport;
22

3+
import com.deepgram.core.ReconnectingWebSocketListener;
34
import java.util.Map;
45

56
/**
@@ -19,7 +20,6 @@
1920
* <p>When a transport factory is set, all WebSocket clients (Listen, Speak, Agent) will use it
2021
* instead of the default OkHttp WebSocket connection.
2122
*/
22-
@FunctionalInterface
2323
public interface DeepgramTransportFactory {
2424

2525
/**
@@ -31,4 +31,19 @@ public interface DeepgramTransportFactory {
3131
* @return a connected or connecting transport instance
3232
*/
3333
DeepgramTransport create(String url, Map<String, String> headers);
34+
35+
/**
36+
* Reconnect policy the SDK should apply when wrapping connections produced by this factory.
37+
* Returning {@code null} (the default) leaves the SDK's {@link ReconnectingWebSocketListener}
38+
* defaults in place.
39+
*
40+
* <p>Plugins that own their own connection lifecycle and retry/backoff (e.g. SageMaker bidi
41+
* streaming) should return {@code ReconnectOptions.builder().maxRetries(0).build()} so the
42+
* SDK's wrapper-level reconnect doesn't compound their internal retries into a storm.
43+
*
44+
* @return reconnect options to apply, or {@code null} for SDK defaults
45+
*/
46+
default ReconnectingWebSocketListener.ReconnectOptions reconnectOptions() {
47+
return null;
48+
}
3449
}

src/main/java/com/deepgram/core/transport/TransportWebSocketFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.deepgram.core.transport;
22

3+
import com.deepgram.core.ReconnectingWebSocketListener;
34
import com.deepgram.core.WebSocketFactory;
45
import java.util.LinkedHashMap;
56
import java.util.Map;
@@ -31,6 +32,13 @@ public TransportWebSocketFactory(DeepgramTransportFactory transportFactory) {
3132

3233
@Override
3334
public WebSocket create(Request request, WebSocketListener listener) {
35+
// Apply the plugin-declared reconnect policy to the SDK's wrapping listener. Plugins that
36+
// own their own retry/backoff (SageMaker) return maxRetries(0) here so the wrapper-level
37+
// reconnect doesn't compound their internal retries into a storm.
38+
if (listener instanceof ReconnectingWebSocketListener) {
39+
((ReconnectingWebSocketListener) listener).applyOptionsOverride(transportFactory.reconnectOptions());
40+
}
41+
3442
String url = request.url().toString();
3543
// Restore wss:// scheme — OkHttp's HttpUrl normalizes to https://
3644
if (url.startsWith("https://")) {

0 commit comments

Comments
 (0)