Skip to content

Commit 89eb9a2

Browse files
committed
fixup! Add experimental SOCKS5 support for S3
Isolate the proxy installation and related reflection operations
1 parent 87b89b1 commit 89eb9a2

File tree

3 files changed

+149
-117
lines changed

3 files changed

+149
-117
lines changed

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/SdkHttpClientBuilder.java

Lines changed: 3 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,13 @@
1616

1717
package io.aiven.kafka.tieredstorage.storage.s3;
1818

19-
import javax.net.ssl.HostnameVerifier;
20-
import javax.net.ssl.SSLContext;
21-
22-
import java.io.IOException;
23-
import java.net.InetSocketAddress;
24-
import java.net.Proxy;
25-
import java.net.Socket;
26-
2719
import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig;
28-
import io.aiven.kafka.tieredstorage.storage.proxy.Socks5ProxyAuthenticator;
20+
import io.aiven.kafka.tieredstorage.storage.s3.proxy.ProxyInstaller;
2921

30-
import org.apache.http.config.Registry;
31-
import org.apache.http.config.RegistryBuilder;
32-
import org.apache.http.conn.socket.ConnectionSocketFactory;
33-
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
34-
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
35-
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
36-
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
37-
import org.apache.http.protocol.HttpContext;
3822
import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder;
3923
import software.amazon.awssdk.http.SdkHttpClient;
4024
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
4125
import software.amazon.awssdk.http.apache.ApacheHttpClient;
42-
import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory;
43-
import software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient;
4426
import software.amazon.awssdk.utils.AttributeMap;
4527

4628
class SdkHttpClientBuilder implements SdkHttpClient.Builder<SdkHttpClientBuilder> {
@@ -65,109 +47,14 @@ public SdkHttpClient buildWithDefaults(final AttributeMap serviceDefaults) {
6547
final ApacheHttpClient client =
6648
(ApacheHttpClient) defaultSdkHttpClientBuilder.buildWithDefaults(actualServiceDefaults);
6749

68-
if (this.proxyConfig != null) {
50+
if (proxyConfig != null) {
6951
try {
70-
installProxy(client);
52+
ProxyInstaller.install(client, proxyConfig);
7153
} catch (final ReflectiveOperationException e) {
7254
throw new RuntimeException(e);
7355
}
7456
}
7557

7658
return client;
7759
}
78-
79-
private void installProxy(final ApacheHttpClient client) throws ReflectiveOperationException {
80-
final var defaultHttpClientConnectionOperator = extractClientConnectionOperator(client);
81-
82-
final InetSocketAddress proxyAddr = new InetSocketAddress(proxyConfig.host(), proxyConfig.port());
83-
final Proxy proxy = new Proxy(Proxy.Type.SOCKS, proxyAddr);
84-
85-
final var socketFactoryRegistryField = PrivateField.of(
86-
DefaultHttpClientConnectionOperator.class, defaultHttpClientConnectionOperator,
87-
Registry.class, "socketFactoryRegistry");
88-
89-
@SuppressWarnings("unchecked")
90-
final var originalConnectionSocketFactoryRegistry =
91-
(Registry<ConnectionSocketFactory>) socketFactoryRegistryField.getValue();
92-
93-
final var proxiedConnectionSocketFactoryRegistry =
94-
createProxiedConnectionSocketFactoryRegistry(originalConnectionSocketFactoryRegistry, proxy);
95-
socketFactoryRegistryField.setValue(proxiedConnectionSocketFactoryRegistry);
96-
97-
if (proxyConfig.username() != null) {
98-
Socks5ProxyAuthenticator.register(
99-
proxyConfig.host(), proxyConfig.port(), proxyConfig.username(), proxyConfig.password());
100-
}
101-
}
102-
103-
private static DefaultHttpClientConnectionOperator extractClientConnectionOperator(
104-
final ApacheHttpClient apacheHttpClient
105-
) throws ReflectiveOperationException {
106-
final var apacheSdkHttpClient = PrivateField.of(
107-
ApacheHttpClient.class, apacheHttpClient, ApacheSdkHttpClient.class, "httpClient").getValue();
108-
final var poolingHttpClientConnectionManager = PrivateField.of(
109-
ApacheSdkHttpClient.class, apacheSdkHttpClient, PoolingHttpClientConnectionManager.class, "cm")
110-
.getValue();
111-
return PrivateField.of(
112-
PoolingHttpClientConnectionManager.class, poolingHttpClientConnectionManager,
113-
DefaultHttpClientConnectionOperator.class, "connectionOperator")
114-
.getValue();
115-
}
116-
117-
private Registry<ConnectionSocketFactory> createProxiedConnectionSocketFactoryRegistry(
118-
final Registry<ConnectionSocketFactory> originalConnectionSocketFactoryRegistry,
119-
final Proxy proxy
120-
) throws ReflectiveOperationException {
121-
if (originalConnectionSocketFactoryRegistry.lookup("http") == null) {
122-
throw new RuntimeException("Connection factory for HTTP doesn't exist");
123-
}
124-
125-
final SdkTlsSocketFactory httpsConnectionSocketFactory =
126-
(SdkTlsSocketFactory) originalConnectionSocketFactoryRegistry.lookup("https");
127-
if (httpsConnectionSocketFactory == null) {
128-
throw new RuntimeException("Connection factory for HTTPS doesn't exist");
129-
}
130-
131-
final var sslContext = PrivateField.of(
132-
SdkTlsSocketFactory.class, httpsConnectionSocketFactory, SSLContext.class, "sslContext").getValue();
133-
final var hostnameVerifier = PrivateField.of(
134-
SSLConnectionSocketFactory.class, httpsConnectionSocketFactory,
135-
HostnameVerifier.class, "hostnameVerifier")
136-
.getValue();
137-
return RegistryBuilder.<ConnectionSocketFactory>create()
138-
.register("http", new ProxiedPlainConnectionSocketFactory(proxy))
139-
.register("https", new ProxiedSdkTlsSocketFactory(sslContext, hostnameVerifier, proxy))
140-
.build();
141-
}
142-
143-
private static class ProxiedPlainConnectionSocketFactory extends PlainConnectionSocketFactory {
144-
private final Proxy proxy;
145-
146-
private ProxiedPlainConnectionSocketFactory(final Proxy proxy) {
147-
this.proxy = proxy;
148-
}
149-
150-
@Override
151-
public Socket createSocket(final HttpContext context) throws IOException {
152-
return new Socket(proxy);
153-
}
154-
155-
}
156-
157-
private static class ProxiedSdkTlsSocketFactory extends SdkTlsSocketFactory {
158-
private final Proxy proxy;
159-
160-
private ProxiedSdkTlsSocketFactory(
161-
final SSLContext sslContext, final HostnameVerifier hostnameVerifier, final Proxy proxy
162-
) {
163-
super(sslContext, hostnameVerifier);
164-
this.proxy = proxy;
165-
}
166-
167-
@Override
168-
public Socket createSocket(final HttpContext context) throws IOException {
169-
return new Socket(proxy);
170-
}
171-
172-
}
17360
}

storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/PrivateField.java renamed to storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/PrivateField.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.aiven.kafka.tieredstorage.storage.s3;
17+
package io.aiven.kafka.tieredstorage.storage.s3.proxy;
1818

1919
import java.lang.reflect.Field;
2020

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.storage.s3.proxy;
18+
19+
import javax.net.ssl.HostnameVerifier;
20+
import javax.net.ssl.SSLContext;
21+
22+
import java.io.IOException;
23+
import java.net.InetSocketAddress;
24+
import java.net.Proxy;
25+
import java.net.Socket;
26+
27+
import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig;
28+
import io.aiven.kafka.tieredstorage.storage.proxy.Socks5ProxyAuthenticator;
29+
30+
import org.apache.http.config.Registry;
31+
import org.apache.http.config.RegistryBuilder;
32+
import org.apache.http.conn.socket.ConnectionSocketFactory;
33+
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
34+
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
35+
import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
36+
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
37+
import org.apache.http.protocol.HttpContext;
38+
import software.amazon.awssdk.http.apache.ApacheHttpClient;
39+
import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory;
40+
import software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient;
41+
42+
/**
43+
* Makes the {@link ApacheHttpClient} connect via a SOCKS5 proxy.
44+
*
45+
* <p>This operation heavily depends on reflection.
46+
* However, this is the only way to do this in the present state of the AWS SDK configurability.
47+
*/
48+
public class ProxyInstaller {
49+
public static void install(
50+
final ApacheHttpClient client, final ProxyConfig proxyConfig
51+
) throws ReflectiveOperationException {
52+
final var defaultHttpClientConnectionOperator = extractClientConnectionOperator(client);
53+
54+
final InetSocketAddress proxyAddr = new InetSocketAddress(proxyConfig.host(), proxyConfig.port());
55+
final Proxy proxy = new Proxy(Proxy.Type.SOCKS, proxyAddr);
56+
57+
final var socketFactoryRegistryField = PrivateField.of(
58+
DefaultHttpClientConnectionOperator.class, defaultHttpClientConnectionOperator,
59+
Registry.class, "socketFactoryRegistry");
60+
61+
@SuppressWarnings("unchecked")
62+
final var originalConnectionSocketFactoryRegistry =
63+
(Registry<ConnectionSocketFactory>) socketFactoryRegistryField.getValue();
64+
65+
final var proxiedConnectionSocketFactoryRegistry =
66+
createProxiedConnectionSocketFactoryRegistry(originalConnectionSocketFactoryRegistry, proxy);
67+
socketFactoryRegistryField.setValue(proxiedConnectionSocketFactoryRegistry);
68+
69+
if (proxyConfig.username() != null) {
70+
Socks5ProxyAuthenticator.register(
71+
proxyConfig.host(), proxyConfig.port(), proxyConfig.username(), proxyConfig.password());
72+
}
73+
}
74+
75+
private static DefaultHttpClientConnectionOperator extractClientConnectionOperator(
76+
final ApacheHttpClient apacheHttpClient
77+
) throws ReflectiveOperationException {
78+
final var apacheSdkHttpClient = PrivateField.of(
79+
ApacheHttpClient.class, apacheHttpClient, ApacheSdkHttpClient.class, "httpClient").getValue();
80+
final var poolingHttpClientConnectionManager = PrivateField.of(
81+
ApacheSdkHttpClient.class, apacheSdkHttpClient, PoolingHttpClientConnectionManager.class, "cm")
82+
.getValue();
83+
return PrivateField.of(
84+
PoolingHttpClientConnectionManager.class, poolingHttpClientConnectionManager,
85+
DefaultHttpClientConnectionOperator.class, "connectionOperator")
86+
.getValue();
87+
}
88+
89+
private static Registry<ConnectionSocketFactory> createProxiedConnectionSocketFactoryRegistry(
90+
final Registry<ConnectionSocketFactory> originalConnectionSocketFactoryRegistry,
91+
final Proxy proxy
92+
) throws ReflectiveOperationException {
93+
if (originalConnectionSocketFactoryRegistry.lookup("http") == null) {
94+
throw new RuntimeException("Connection factory for HTTP doesn't exist");
95+
}
96+
97+
final SdkTlsSocketFactory httpsConnectionSocketFactory =
98+
(SdkTlsSocketFactory) originalConnectionSocketFactoryRegistry.lookup("https");
99+
if (httpsConnectionSocketFactory == null) {
100+
throw new RuntimeException("Connection factory for HTTPS doesn't exist");
101+
}
102+
103+
final var sslContext = PrivateField.of(
104+
SdkTlsSocketFactory.class, httpsConnectionSocketFactory, SSLContext.class, "sslContext").getValue();
105+
final var hostnameVerifier = PrivateField.of(
106+
SSLConnectionSocketFactory.class, httpsConnectionSocketFactory,
107+
HostnameVerifier.class, "hostnameVerifier")
108+
.getValue();
109+
return RegistryBuilder.<ConnectionSocketFactory>create()
110+
.register("http", new ProxiedPlainConnectionSocketFactory(proxy))
111+
.register("https", new ProxiedSdkTlsSocketFactory(sslContext, hostnameVerifier, proxy))
112+
.build();
113+
}
114+
115+
private static class ProxiedPlainConnectionSocketFactory extends PlainConnectionSocketFactory {
116+
private final Proxy proxy;
117+
118+
private ProxiedPlainConnectionSocketFactory(final Proxy proxy) {
119+
this.proxy = proxy;
120+
}
121+
122+
@Override
123+
public Socket createSocket(final HttpContext context) throws IOException {
124+
return new Socket(proxy);
125+
}
126+
127+
}
128+
129+
private static class ProxiedSdkTlsSocketFactory extends SdkTlsSocketFactory {
130+
private final Proxy proxy;
131+
132+
private ProxiedSdkTlsSocketFactory(
133+
final SSLContext sslContext, final HostnameVerifier hostnameVerifier, final Proxy proxy
134+
) {
135+
super(sslContext, hostnameVerifier);
136+
this.proxy = proxy;
137+
}
138+
139+
@Override
140+
public Socket createSocket(final HttpContext context) throws IOException {
141+
return new Socket(proxy);
142+
}
143+
144+
}
145+
}

0 commit comments

Comments
 (0)