Skip to content

Commit b8dd0aa

Browse files
authored
Fix SdkAsyncHttpClient resource leak in connector executors (#4716)
* fix: Close SdkAsyncHttpClient in connector executors to prevent resource leak SdkAsyncHttpClient instances were created lazily in AwsConnectorExecutor and HttpJsonConnectorExecutor but never closed. This leaked Netty event loops and connection pools, causing connection pool exhaustion in CI where 23+ test classes run in the same JVM. - Make RemoteConnectorExecutor extend AutoCloseable - Add close() to AwsConnectorExecutor and HttpJsonConnectorExecutor - Fix RemoteModel.close() to close executor before nulling reference - Close short-lived executors in ExecuteConnectorTransportAction, GetTaskTransportAction, CancelBatchJobTransportAction, and RemoteAgenticConversationMemory - Add client_config with generous timeouts to integration test connectors as additional safety margin Signed-off-by: Dhrubo Saha <dhrubo@amazon.com> * refactor: Move close() and httpClientRef to AbstractConnectorExecutor Per review feedback, consolidate the duplicated close() method and httpClientRef field from AwsConnectorExecutor and HttpJsonConnectorExecutor into AbstractConnectorExecutor to avoid duplication. Signed-off-by: Dhrubo Saha <dhrubo@amazon.com> --------- Signed-off-by: Dhrubo Saha <dhrubo@amazon.com>
1 parent a9f254d commit b8dd0aa

File tree

12 files changed

+86
-10
lines changed

12 files changed

+86
-10
lines changed

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/AbstractConnectorExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.ml.engine.algorithms.remote;
77

88
import java.net.http.HttpRequest;
9+
import java.util.concurrent.atomic.AtomicReference;
910

1011
import org.apache.logging.log4j.Logger;
1112
import org.opensearch.action.support.ThreadedActionListener;
@@ -20,13 +21,16 @@
2021
import lombok.Getter;
2122
import lombok.Setter;
2223
import lombok.extern.log4j.Log4j2;
24+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
2325

2426
@Log4j2
2527
@Setter
2628
@Getter
2729
public abstract class AbstractConnectorExecutor implements RemoteConnectorExecutor {
2830
private ConnectorClientConfig connectorClientConfig;
2931

32+
protected final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
33+
3034
public void initialize(Connector connector) {
3135
if (connector.getConnectorClientConfig() != null) {
3236
connectorClientConfig = connector.getConnectorClientConfig();
@@ -78,4 +82,16 @@ protected ThreadedActionListener<Tuple<Integer, ModelTensors>> createThreadedLis
7882
) {
7983
return new ThreadedActionListener<>(logger, getClient().threadPool(), "opensearch_ml_predict_remote", actionListener, false);
8084
}
85+
86+
/**
87+
* Closes the underlying HTTP client. Safe to call concurrently — NettyNioAsyncHttpClient.close()
88+
* gracefully drains in-flight requests before shutting down the event loop group.
89+
*/
90+
@Override
91+
public void close() {
92+
SdkAsyncHttpClient client = httpClientRef.getAndSet(null);
93+
if (client != null) {
94+
client.close();
95+
}
96+
}
8197
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/AwsConnectorExecutor.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.Locale;
2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.atomic.AtomicReference;
2423

2524
import org.apache.commons.text.StringEscapeUtils;
2625
import org.apache.logging.log4j.Logger;
@@ -77,15 +76,13 @@ public class AwsConnectorExecutor extends AbstractConnectorExecutor {
7776
@Getter
7877
private MLGuard mlGuard;
7978

80-
private final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
79+
@Setter
80+
private volatile boolean connectorPrivateIpEnabled;
8181

8282
@Setter
8383
@Getter
8484
private StreamTransportService streamTransportService;
8585

86-
@Setter
87-
private volatile boolean connectorPrivateIpEnabled;
88-
8986
public AwsConnectorExecutor(Connector connector) {
9087
super.initialize(connector);
9188
this.connector = (AwsConnector) connector;

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/HttpJsonConnectorExecutor.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.Locale;
2020
import java.util.Map;
2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.atomic.AtomicReference;
2322

2423
import org.apache.commons.text.StringEscapeUtils;
2524
import org.apache.logging.log4j.Logger;
@@ -79,8 +78,6 @@ public class HttpJsonConnectorExecutor extends AbstractConnectorExecutor {
7978
@Setter
8079
private volatile boolean connectorPrivateIpEnabled;
8180

82-
private final AtomicReference<SdkAsyncHttpClient> httpClientRef = new AtomicReference<>();
83-
8481
@Setter
8582
@Getter
8683
private StreamTransportService streamTransportService;

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/RemoteConnectorExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@
6262

6363
import lombok.Builder;
6464

65-
public interface RemoteConnectorExecutor {
65+
public interface RemoteConnectorExecutor extends AutoCloseable {
66+
67+
@Override
68+
default void close() {}
6669

6770
public String RETRY_EXECUTOR = "opensearch_ml_predict_remote";
6871
String SKIP_SSL_VERIFICATION = "skip_ssl_verification";

ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/remote/RemoteModel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ public void asyncPredict(MLInput mlInput, ActionListener<MLTaskResponse> actionL
9595

9696
@Override
9797
public void close() {
98+
if (this.connectorExecutor != null) {
99+
try {
100+
this.connectorExecutor.close();
101+
} catch (Exception e) {
102+
log.error("Failed to close connector executor", e);
103+
}
104+
}
98105
this.connectorExecutor = null;
99106
}
100107

ml-algorithms/src/main/java/org/opensearch/ml/engine/memory/RemoteAgenticConversationMemory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,8 +1300,11 @@ private void createSessionInRemoteContainer(
13001300
}
13011301
} catch (Exception e) {
13021302
listener.onFailure(e);
1303+
} finally {
1304+
executor.close();
13031305
}
13041306
}, e -> {
1307+
executor.close();
13051308
log.error("Failed to create session via remote connector", e);
13061309
listener.onFailure(e);
13071310
}));

ml-algorithms/src/test/java/org/opensearch/ml/engine/algorithms/remote/AbstractConnectorExecutorTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,24 @@ public void testGetMcpRequestHeaders_VerifyHeaderOrder() {
188188
assertEquals(MCP_HEADER_AWS_ACCESS_KEY_ID, headerNameCaptor.getAllValues().get(0));
189189
assertEquals(MCP_HEADER_OPENSEARCH_URL, headerNameCaptor.getAllValues().get(1));
190190
}
191+
192+
@Test
193+
public void testClose_closesHttpClient() {
194+
// Inject a mock SdkAsyncHttpClient into httpClientRef
195+
software.amazon.awssdk.http.async.SdkAsyncHttpClient mockHttpClient = mock(
196+
software.amazon.awssdk.http.async.SdkAsyncHttpClient.class
197+
);
198+
executor.httpClientRef.set(mockHttpClient);
199+
200+
executor.close();
201+
202+
verify(mockHttpClient).close();
203+
assertNull(executor.httpClientRef.get());
204+
}
205+
206+
@Test
207+
public void testClose_whenNoClient_doesNotThrow() {
208+
assertNull(executor.httpClientRef.get());
209+
executor.close(); // should be a no-op
210+
}
191211
}

plugin/src/main/java/org/opensearch/ml/action/connector/ExecuteConnectorTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,11 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLTask
106106
connectorExecutor.setXContentRegistry(xContentRegistry);
107107
connectorExecutor
108108
.executeAction(finalConnectorAction, executeConnectorRequest.getMlInput(), ActionListener.wrap(taskResponse -> {
109+
connectorExecutor.close();
109110
connector.removeCredential();
110111
actionListener.onResponse(taskResponse);
111112
}, e -> {
113+
connectorExecutor.close();
112114
connector.removeCredential();
113115
actionListener.onFailure(e);
114116
}));

plugin/src/main/java/org/opensearch/ml/action/tasks/CancelBatchJobTransportAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,12 @@ private void executeConnector(Connector connector, MLInput mlInput, ActionListen
266266
connectorExecutor.setClient(client);
267267
connectorExecutor.setXContentRegistry(xContentRegistry);
268268
connectorExecutor.executeAction(CANCEL_BATCH_PREDICT.name(), mlInput, ActionListener.wrap(taskResponse -> {
269+
connectorExecutor.close();
269270
processTaskResponse(taskResponse, actionListener);
270-
}, actionListener::onFailure));
271+
}, e -> {
272+
connectorExecutor.close();
273+
actionListener.onFailure(e);
274+
}));
271275
}, e -> {
272276
log.error("Failed to decrypt credentials in connector", e);
273277
actionListener.onFailure(e);

plugin/src/main/java/org/opensearch/ml/action/tasks/GetTaskTransportAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,8 +474,10 @@ private void executeConnector(
474474
connectorExecutor.setClient(client);
475475
connectorExecutor.setXContentRegistry(xContentRegistry);
476476
connectorExecutor.executeAction(BATCH_PREDICT_STATUS.name(), mlInput, ActionListener.wrap(taskResponse -> {
477+
connectorExecutor.close();
477478
processTaskResponse(mlTask, taskId, isUserInitiatedGetTaskRequest, taskResponse, remoteJob, r, actionListener);
478479
}, e -> {
480+
connectorExecutor.close();
479481
// When the request to remote service fails, we will retry the request for next 10 minutes (10 runs).
480482
// If it fails even then, we mark it as unreachable in task index and send message to DLQ
481483
if (!isUserInitiatedGetTaskRequest) {

0 commit comments

Comments
 (0)