Skip to content

Commit 78b6338

Browse files
VWang1111claudeGregHolmes
authored
feat: configurable timeouts and concurrency with lenient defaults for high-burst workloads (#14)
## Summary - Expose `connectionTimeout`, `connectionAcquireTimeout`, `subscriptionTimeout`, and `maxConcurrency` on `SageMakerConfig.Builder`. Each is validated `> 0` / non-null at build time. Public `DEFAULT_*` constants on `SageMakerConfig` document the chosen defaults. - Wire them through `SageMakerTransportFactory` (Netty `connectionTimeout` / `connectionAcquisitionTimeout` / `maxConcurrency`) and `SageMakerTransport` (input-publisher subscription wait at `ensureConnected()`). - Move defaults to values tuned for high-burst workloads instead of inheriting the AWS Netty defaults, which fail-fast in a way that's actively wrong for SageMaker bidi streaming under bursts. | Knob | Old default | New default | Why | |---|---|---|---| | `connectionTimeout` | AWS Netty default = 2 s | **30 s** | Cold-start endpoints under a 200–500-stream burst can't accept TLS handshakes in 2 s | | `connectionAcquireTimeout` | AWS Netty default = 10 s | **60 s** | A 400-stream burst drains the acquire pool past 10 s | | `subscriptionTimeout` | (was hardcoded 30 s) | **60 s** | Slow first-stream subscribe under burst | | `maxConcurrency` | (was hardcoded 500) | **500** (now tunable) | No change — just made it tunable | ## Why this matters Empirically validated against a 400-concurrent-stream burst test on a 10× `ml.g6.2xlarge` endpoint, replicating a customer report: | Error category | Before (Maven Central 0.1.2 + AWS defaults) | After (this PR) | |---|---:|---:| | `Connection pool exhausted (Acquire took longer)` | 3,230 | ~0 | | `TCP connect timed out` | 706 | ~0 | | `TCP Connection reset` | 3,528 | sharply down | Out of the box, callers no longer have to know that AWS's general-purpose Netty defaults bite hard on the SageMaker streaming path; the transport ships sane-for-its-use-case defaults. Anyone who wants fail-fast behavior can tighten the knobs explicitly: ```java SageMakerConfig config = SageMakerConfig.builder() .endpointName("my-deepgram-endpoint") .region("us-east-2") .connectionTimeout(Duration.ofSeconds(5)) .connectionAcquireTimeout(Duration.ofSeconds(15)) .build(); ``` ## Scope and what this does NOT fix This PR addresses the transport layer only. The SDK-level retry storm in `com.deepgram:deepgram-java-sdk` (the hardcoded `connectionFuture.get(4000, MILLISECONDS)` in `ReconnectingWebSocketListener.connect()` plus the listener's default `maxRetries = Integer.MAX_VALUE`) is not touched here and was empirically observed to remain even with this PR's transport fixes — the bottleneck simply moves inward to the SDK ceiling. The full burst-handling fix requires pairing this PR with the corresponding change in `deepgram-java-sdk` that auto-disables `ReconnectingWebSocketListener` when a custom transport is configured (`ClientOptions.reconnect(false)` + auto-set in `DeepgramClientBuilder.transportFactory(...)`). With both layers in place, end-to-end error counts drop ~97 % at 400 concurrent streams. ## Backwards compatibility Adds new builder methods only; no existing API is removed or modified. Existing callers using `SageMakerConfig.builder().endpointName(...).region(...).build()` get the new defaults automatically — same calling convention, more lenient runtime behavior. No code changes required for existing applications. ## Test plan - [x] `./gradlew :sagemaker-transport:test` — all tests pass (18/18; 4 new tests for defaults, custom values, and validation of non-positive arguments). - [x] Local build of the patched JAR consumed by a load-test harness against a real SageMaker endpoint. - [x] Validation run vs. baseline measured a `Connection pool exhausted` / `TCP connect timed out` reduction to ~0. - [ ] CI passes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Greg Holmes <greg.holmes@deepgram.com>
1 parent a25bf11 commit 78b6338

10 files changed

Lines changed: 1792 additions & 86 deletions

File tree

README.md

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ SageMaker transport for the [Deepgram Java SDK](https://github.com/deepgram/deep
1111

1212
```groovy
1313
dependencies {
14-
implementation 'com.deepgram:deepgram-java-sdk:0.2.1'
14+
implementation 'com.deepgram:deepgram-java-sdk:0.4.0'
1515
implementation 'com.deepgram:deepgram-sagemaker:0.1.2' // x-release-please-version
1616
}
1717
```
@@ -29,7 +29,7 @@ dependencies {
2929
## Requirements
3030

3131
- Java 11+
32-
- [Deepgram Java SDK](https://github.com/deepgram/deepgram-java-sdk) v0.2.1+
32+
- [Deepgram Java SDK](https://github.com/deepgram/deepgram-java-sdk) v0.4.0+ (the `default ReconnectOptions reconnectOptions()` hook on `DeepgramTransportFactory` is required for storm absorption)
3333
- AWS credentials configured (environment variables, shared credentials file, or IAM role)
3434
- A Deepgram model deployed to an AWS SageMaker endpoint
3535

@@ -95,6 +95,15 @@ The transport is transparent — the SDK API is identical whether using Deepgram
9595
|-----------|----------|---------|-------------|
9696
| `endpointName` | Yes || SageMaker endpoint name |
9797
| `region` | No | `us-west-2` | AWS region |
98+
| `connectionTimeout` | No | `30s` | Max time for the underlying TCP/TLS connect (AWS Netty default is 2&nbsp;s — bumped here so cold-start endpoints under burst load have time to accept TLS handshakes). |
99+
| `connectionAcquireTimeout` | No | `60s` | Max time to acquire a connection from the Netty pool (AWS Netty default is 10&nbsp;s — bumped so a 200&ndash;500-stream burst doesn't drain the acquire pool). |
100+
| `subscriptionTimeout` | No | `60s` | Max time the transport waits for the AWS SDK to subscribe to the bidi-stream input publisher before failing. A timeout here is treated as a transient connect failure and counts against `maxRetries` / `retryBudget`. |
101+
| `maxConcurrency` | No | `500` | Max simultaneous in-flight HTTP/2 streams across the shared Netty pool. With `maxStreams=1` this is the cap on simultaneous bidirectional streams. |
102+
| `maxRetries` | No | `5` | Max retries on transient AWS errors (throttling, pool-exhausted, transient connect/timeout). Set to `0` to disable internal retry. Terminal errors (auth, validation) bypass this. |
103+
| `initialBackoff` | No | `100ms` | First backoff delay applied after the initial failure. |
104+
| `maxBackoff` | No | `5s` | Cap on the per-attempt backoff delay regardless of multiplier. |
105+
| `backoffMultiplier` | No | `2.0` | Exponential growth factor between retry attempts. Must be `>= 1.0`. |
106+
| `retryBudget` | No | `30s` | Total wall-clock cap across all retry attempts before giving up and surfacing the error to listeners. |
98107

99108
```java
100109
SageMakerConfig config = SageMakerConfig.builder()
@@ -103,6 +112,88 @@ SageMakerConfig config = SageMakerConfig.builder()
103112
.build();
104113
```
105114

115+
#### High-concurrency notes
116+
117+
The transport's defaults are tuned for high-burst workloads (large numbers of
118+
streams opened in a tight loop against an endpoint that may need to scale up).
119+
If you're opening 200&ndash;500 streams simultaneously against a cold endpoint,
120+
the AWS Netty defaults (2&nbsp;s connect / 10&nbsp;s acquire) will fire before
121+
the load balancer has accepted all of the inbound TLS handshakes &mdash; you'll
122+
see a wave of `connection acquire` and `connect timed out` errors that look
123+
like server-side problems but are really client-side fail-fast tripping early.
124+
125+
This transport ships with more lenient defaults (30&nbsp;s / 60&nbsp;s) so the
126+
common high-concurrency path works out of the box. Tighten them if you need
127+
fail-fast behavior in low-latency pipelines:
128+
129+
```java
130+
SageMakerConfig config = SageMakerConfig.builder()
131+
.endpointName("my-deepgram-endpoint")
132+
.region("us-east-2")
133+
.connectionTimeout(Duration.ofSeconds(5))
134+
.connectionAcquireTimeout(Duration.ofSeconds(15))
135+
.build();
136+
```
137+
138+
#### Retry & storm absorption
139+
140+
Transient AWS-side failures (`ThrottlingException`, connection-pool exhaustion, transient
141+
connect/timeout failures) are absorbed by the transport itself: classified as retryable, retried
142+
with exponential backoff up to `maxRetries` and `retryBudget`, with messages enqueued during the
143+
reset window persisted across the reconnect so audio isn't dropped. Only **terminal** errors (auth,
144+
validation) and budget-exhausted retryable errors propagate to `transport.onError(...)` and reach
145+
the application's error handler.
146+
147+
This means the SDK's wrapper-level reconnect (`ReconnectingWebSocketListener`) would compound the
148+
plugin's internal retries into a Throttling-on-Throttling storm under burst load, so the plugin
149+
declares `ReconnectOptions.builder().maxRetries(0).build()` via the
150+
`DeepgramTransportFactory.reconnectOptions()` hook. The SDK applies it automatically when it sees
151+
a `transportFactory` in use; no user wiring required.
152+
153+
To tune retry behavior:
154+
155+
```java
156+
SageMakerConfig config = SageMakerConfig.builder()
157+
.endpointName("my-deepgram-endpoint")
158+
.maxRetries(10)
159+
.initialBackoff(Duration.ofMillis(200))
160+
.maxBackoff(Duration.ofSeconds(10))
161+
.retryBudget(Duration.ofMinutes(1))
162+
.build();
163+
```
164+
165+
Set `maxRetries(0)` to disable internal retry entirely (every transient AWS error then surfaces
166+
immediately to the application).
167+
168+
#### Connection-pool sharing
169+
170+
The default `new SageMakerTransportFactory(config)` constructor backs every factory instance with
171+
a **process-wide shared** `SageMakerRuntimeHttp2AsyncClient`, keyed by the parts of
172+
`SageMakerConfig` that affect the underlying Netty HTTP/2 client (region, max concurrency,
173+
connect/acquire timeouts). Multiple factories built with the same config fingerprint reuse one
174+
Netty event loop group and one connection pool — so naive code that constructs a fresh factory
175+
per stream still gets a single, well-behaved client underneath.
176+
177+
Without sharing, every factory instantiates its own Netty pool, and a burst of N factories
178+
triggers N simultaneous TLS handshakes from N distinct Netty clients against the same SageMaker
179+
endpoint. Under high concurrency (100+ streams) the SageMaker HTTP/2 frontline silently drops a
180+
large fraction of those streams before they ever reach the model container — verified
181+
end-to-end with CloudWatch logs from a 400-stream burst test against a 1× ml.g6.2xlarge endpoint:
182+
without sharing, ~65% of streams never appeared in the Deepgram container's listen log; with
183+
sharing, the burst behaves the same as the canonical Python load-test harness.
184+
185+
Lifecycle:
186+
187+
| Constructor | Client backing | `factory.shutdown()` |
188+
|---|---|---|
189+
| `SageMakerTransportFactory(config)` | shared (lazy-init, keyed by config fingerprint) | no-op — call `SageMakerTransportFactory.shutdownAllSharedClients()` once at app shutdown to release Netty resources |
190+
| `SageMakerTransportFactory(config, smClient)` | caller-provided (BYO, used for testing or custom credential providers) | no-op — caller owns the client lifecycle |
191+
192+
```java
193+
// At app shutdown — releases all shared Netty pools the plugin lazily created.
194+
Runtime.getRuntime().addShutdownHook(new Thread(SageMakerTransportFactory::shutdownAllSharedClients));
195+
```
196+
106197
### Custom AWS Client
107198

108199
For custom credential providers, proxy configuration, or testing:

examples/src/main/java/com/deepgram/examples/FluxSageMakerExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.deepgram.resources.listen.v2.websocket.V2WebSocketClient;
99
import com.deepgram.sagemaker.SageMakerConfig;
1010
import com.deepgram.sagemaker.SageMakerTransportFactory;
11+
import com.deepgram.types.ListenV2Model;
1112

1213
import java.io.RandomAccessFile;
1314
import java.nio.ByteBuffer;
@@ -85,10 +86,10 @@ public static void main(String[] args) throws Exception {
8586
done.countDown();
8687
});
8788

88-
// Connect — V2 uses model name as string via additionalProperty
89+
// Connect using the typed Flux model constant from the SDK.
8990
CompletableFuture<Void> connectFuture = wsClient.connect(
9091
V2ConnectOptions.builder()
91-
.model("flux-general-en")
92+
.model(ListenV2Model.FLUX_GENERAL_EN)
9293
.build());
9394
connectFuture.get(30, TimeUnit.SECONDS);
9495
System.out.println("Connected. Streaming audio...\n");

examples/src/main/java/com/deepgram/examples/LiveMicFluxSageMakerExample.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.deepgram.sagemaker.SageMakerConfig;
1010
import com.deepgram.sagemaker.SageMakerTransportFactory;
1111
import com.deepgram.types.ListenV2Encoding;
12+
import com.deepgram.types.ListenV2Model;
1213
import com.deepgram.types.ListenV2SampleRate;
1314

1415
import javax.sound.sampled.AudioFormat;
@@ -100,7 +101,7 @@ public static void main(String[] args) throws Exception {
100101

101102
CompletableFuture<Void> connectFuture = wsClient.connect(
102103
V2ConnectOptions.builder()
103-
.model("flux-general-en")
104+
.model(ListenV2Model.FLUX_GENERAL_EN)
104105
.encoding(ListenV2Encoding.LINEAR16)
105106
.sampleRate(ListenV2SampleRate.of(16000))
106107
.build());

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<dependency>
7070
<groupId>com.deepgram</groupId>
7171
<artifactId>deepgram-java-sdk</artifactId>
72-
<version>0.2.1</version>
72+
<version>0.4.0</version>
7373
</dependency>
7474
<dependency>
7575
<groupId>software.amazon.awssdk</groupId>

sagemaker-transport/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
dependencies {
22
// Deepgram Java SDK — provides DeepgramTransport / DeepgramTransportFactory interfaces
3-
api 'com.deepgram:deepgram-java-sdk:0.2.1'
3+
api 'com.deepgram:deepgram-java-sdk:0.4.0'
44

55
// AWS SDK v2 — SageMaker Runtime HTTP/2 bidirectional streaming
66
api platform('software.amazon.awssdk:bom:2.42.0')

0 commit comments

Comments
 (0)