2525 * Provides production-ready resilience for WebSocket connections.
2626 */
2727public abstract class ReconnectingWebSocketListener extends WebSocketListener {
28- // Option-derived fields are volatile (not final) so {@link #applyOptionsOverride} can rewire them
29- // after construction — used by {@code TransportWebSocketFactory} to honour
30- // {@code DeepgramTransportFactory.reconnectOptions()} without editing the generated WS clients.
31- private volatile long minReconnectionDelayMs ;
28+ private final long minReconnectionDelayMs ;
3229
33- private volatile long maxReconnectionDelayMs ;
30+ private final long maxReconnectionDelayMs ;
3431
35- private volatile double reconnectionDelayGrowFactor ;
32+ private final double reconnectionDelayGrowFactor ;
3633
37- private volatile int maxRetries ;
34+ private final int maxRetries ;
3835
3936 private final int maxEnqueuedMessages ;
4037
41- private volatile long connectionTimeoutMs ;
42-
4338 private final AtomicInteger retryCount = new AtomicInteger (0 );
4439
4540 private final AtomicBoolean connectLock = new AtomicBoolean (false );
@@ -71,44 +66,16 @@ public ReconnectingWebSocketListener(
7166 this .reconnectionDelayGrowFactor = options .reconnectionDelayGrowFactor ;
7267 this .maxRetries = options .maxRetries ;
7368 this .maxEnqueuedMessages = options .maxEnqueuedMessages ;
74- this .connectionTimeoutMs = options .connectionTimeoutMs ;
7569 this .connectionSupplier = connectionSupplier ;
7670 }
7771
78- /**
79- * Replaces the option-derived parameters on this listener at runtime. Used by
80- * {@code TransportWebSocketFactory} to apply {@code DeepgramTransportFactory.reconnectOptions()}
81- * without requiring edits to the generated per-resource WebSocket clients. {@code maxEnqueuedMessages}
82- * is intentionally not overridden — the message queue is sized at construction.
83- *
84- * <p>Thread-safety: option-derived fields are volatile; reads observe the latest write. The
85- * initial connect() call may have already started before the override lands, so for the very
86- * first attempt the original options apply; the override takes effect from the next attempt
87- * onwards. For the SageMaker storm-suppression case ({@code maxRetries(0)}) this is fine
88- * because the initial attempt's gate ({@code retryCount > maxRetries} with {@code retryCount=0})
89- * always passes regardless.
90- *
91- * @param options replacement options; {@code null} is a no-op.
92- */
93- public void applyOptionsOverride (ReconnectOptions options ) {
94- if (options == null ) {
95- return ;
96- }
97- this .minReconnectionDelayMs = options .minReconnectionDelayMs ;
98- this .maxReconnectionDelayMs = options .maxReconnectionDelayMs ;
99- this .reconnectionDelayGrowFactor = options .reconnectionDelayGrowFactor ;
100- this .maxRetries = options .maxRetries ;
101- this .connectionTimeoutMs = options .connectionTimeoutMs ;
102- }
103-
10472 /**
10573 * Initiates a WebSocket connection with automatic reconnection enabled.
10674 *
10775 * Connection behavior:
108- * - Times out after {@code ReconnectOptions.connectionTimeoutMs} (default 4000ms)
76+ * - Times out after 4000 milliseconds
10977 * - Thread-safe via atomic lock (returns immediately if connection in progress)
110- * - {@code maxRetries} counts retries only — the initial attempt always proceeds.
111- * {@code maxRetries(0)} means "connect once, don't retry" (not "refuse to connect").
78+ * - Retry count not incremented for initial connection attempt
11279 *
11380 * Error handling:
11481 * - TimeoutException: Includes retry attempt context
@@ -119,21 +86,18 @@ public void connect() {
11986 if (!connectLock .compareAndSet (false , true )) {
12087 return ;
12188 }
122- // retryCount is incremented inside scheduleReconnect() before re-entering connect(),
123- // so on the initial call retryCount == 0 and we always proceed. The cap applies to
124- // retries only — maxRetries(0) blocks retries but allows the initial attempt.
125- if (retryCount .get () > maxRetries ) {
89+ if (retryCount .get () >= maxRetries ) {
12690 connectLock .set (false );
12791 return ;
12892 }
12993 try {
13094 CompletableFuture <? extends WebSocket > connectionFuture = CompletableFuture .supplyAsync (connectionSupplier );
13195 try {
132- webSocket = connectionFuture .get (connectionTimeoutMs , MILLISECONDS );
96+ webSocket = connectionFuture .get (4000 , MILLISECONDS );
13397 } catch (TimeoutException e ) {
13498 connectionFuture .cancel (true );
13599 TimeoutException timeoutError =
136- new TimeoutException ("WebSocket connection timeout after " + connectionTimeoutMs + " milliseconds"
100+ new TimeoutException ("WebSocket connection timeout after " + 4000 + " milliseconds"
137101 + (retryCount .get () > 0
138102 ? " (retry attempt #" + retryCount .get ()
139103 : " (initial connection attempt)" ));
@@ -435,15 +399,12 @@ public static final class ReconnectOptions {
435399
436400 public final int maxEnqueuedMessages ;
437401
438- public final long connectionTimeoutMs ;
439-
440402 private ReconnectOptions (Builder builder ) {
441403 this .minReconnectionDelayMs = builder .minReconnectionDelayMs ;
442404 this .maxReconnectionDelayMs = builder .maxReconnectionDelayMs ;
443405 this .reconnectionDelayGrowFactor = builder .reconnectionDelayGrowFactor ;
444406 this .maxRetries = builder .maxRetries ;
445407 this .maxEnqueuedMessages = builder .maxEnqueuedMessages ;
446- this .connectionTimeoutMs = builder .connectionTimeoutMs ;
447408 }
448409
449410 public static Builder builder () {
@@ -461,15 +422,12 @@ public static final class Builder {
461422
462423 private int maxEnqueuedMessages ;
463424
464- private long connectionTimeoutMs ;
465-
466425 public Builder () {
467426 this .minReconnectionDelayMs = 1000 ;
468427 this .maxReconnectionDelayMs = 10000 ;
469428 this .reconnectionDelayGrowFactor = 1.3 ;
470429 this .maxRetries = 2147483647 ;
471430 this .maxEnqueuedMessages = 1000 ;
472- this .connectionTimeoutMs = 4000 ;
473431 }
474432
475433 public Builder minReconnectionDelayMs (long minReconnectionDelayMs ) {
@@ -497,16 +455,6 @@ public Builder maxEnqueuedMessages(int maxEnqueuedMessages) {
497455 return this ;
498456 }
499457
500- /**
501- * Sets the per-attempt connection timeout in milliseconds. Defaults to {@code 4000}.
502- * Each call to {@link ReconnectingWebSocketListener#connect()} will wait at most
503- * this long for the underlying WebSocket factory to produce a connected socket.
504- */
505- public Builder connectionTimeoutMs (long connectionTimeoutMs ) {
506- this .connectionTimeoutMs = connectionTimeoutMs ;
507- return this ;
508- }
509-
510458 /**
511459 * Builds the ReconnectOptions with validation.
512460 *
@@ -515,7 +463,6 @@ public Builder connectionTimeoutMs(long connectionTimeoutMs) {
515463 * - minReconnectionDelayMs <= maxReconnectionDelayMs
516464 * - reconnectionDelayGrowFactor >= 1.0
517465 * - maxRetries and maxEnqueuedMessages are non-negative
518- * - connectionTimeoutMs is positive
519466 *
520467 * @return The validated ReconnectOptions instance
521468 * @throws IllegalArgumentException if configuration is invalid
@@ -540,9 +487,6 @@ public ReconnectOptions build() {
540487 if (maxEnqueuedMessages < 0 ) {
541488 throw new IllegalArgumentException ("maxEnqueuedMessages must be non-negative" );
542489 }
543- if (connectionTimeoutMs <= 0 ) {
544- throw new IllegalArgumentException ("connectionTimeoutMs must be positive" );
545- }
546490 return new ReconnectOptions (this );
547491 }
548492 }
0 commit comments