Skip to content

Commit 2c1814e

Browse files
committed
Addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 86a20c0 commit 2c1814e

File tree

7 files changed

+171
-92
lines changed

7 files changed

+171
-92
lines changed

data-prepper-plugins/http-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkIT.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,14 @@ private HttpSinkConfiguration createConfig(String url, ThresholdOptions threshol
344344
java.lang.reflect.Field thresholdField = HttpSinkConfiguration.class.getDeclaredField("thresholdOptions");
345345
thresholdField.setAccessible(true);
346346
thresholdField.set(config, thresholdOptions);
347+
348+
java.lang.reflect.Field retryIntervalField = HttpSinkConfiguration.class.getDeclaredField("httpRetryInterval");
349+
retryIntervalField.setAccessible(true);
350+
retryIntervalField.set(config, Duration.ofSeconds(3));
351+
352+
java.lang.reflect.Field serviceNameField = HttpSinkConfiguration.class.getDeclaredField("awsSigv4ServiceName");
353+
serviceNameField.setAccessible(true);
354+
serviceNameField.set(config, "test");
347355
} catch (Exception e) {
348356
throw new RuntimeException(e);
349357
}
@@ -373,6 +381,16 @@ private HttpSinkConfiguration createConfigWithSigV4(String url, ThresholdOptions
373381
awsField.setAccessible(true);
374382
awsField.set(config, awsConfig);
375383

384+
java.lang.reflect.Field retryIntervalField = HttpSinkConfiguration.class.getDeclaredField("httpRetryInterval");
385+
retryIntervalField.setAccessible(true);
386+
retryIntervalField.set(config, Duration.ofSeconds(3));
387+
388+
java.lang.reflect.Field serviceNameField = HttpSinkConfiguration.class.getDeclaredField("awsSigv4ServiceName");
389+
serviceNameField.setAccessible(true);
390+
serviceNameField.set(config, "test");
391+
392+
393+
376394
} catch (Exception e) {
377395
throw new RuntimeException(e);
378396
}

data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/AuthenticationDecorator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010

1111
package org.opensearch.dataprepper.plugins.sink.http;
1212

13-
import software.amazon.awssdk.http.SdkHttpFullRequest;
13+
import com.linecorp.armeria.common.HttpRequest;
14+
15+
import java.util.List;
16+
import java.util.Map;
1417

1518
public interface AuthenticationDecorator {
16-
SdkHttpFullRequest authenticate(SdkHttpFullRequest request);
19+
HttpRequest buildRequest(String url, byte[] payload, Map<String, List<String>> customHeaders);
1720
}

data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/AwsAuthenticationDecorator.java

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,41 @@
1010

1111
package org.opensearch.dataprepper.plugins.sink.http;
1212

13+
import com.linecorp.armeria.common.HttpData;
14+
import com.linecorp.armeria.common.HttpMethod;
15+
import com.linecorp.armeria.common.HttpRequest;
16+
import com.linecorp.armeria.common.RequestHeaders;
17+
import com.linecorp.armeria.common.RequestHeadersBuilder;
1318
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
1419
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1520
import org.opensearch.dataprepper.aws.api.AwsConfig;
1621
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1722
import software.amazon.awssdk.auth.signer.Aws4Signer;
1823
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
24+
import software.amazon.awssdk.core.SdkBytes;
1925
import software.amazon.awssdk.http.SdkHttpFullRequest;
26+
import software.amazon.awssdk.http.SdkHttpMethod;
2027
import software.amazon.awssdk.regions.Region;
2128

2229
import javax.annotation.Nonnull;
30+
import java.net.URI;
31+
import java.util.List;
32+
import java.util.Map;
2333

2434
public class AwsAuthenticationDecorator implements AuthenticationDecorator {
25-
private static final String SERVICE_NAME = "execute-api";
2635
private static final String CONTENT_SHA256_HEADER = "x-amz-content-sha256";
2736
private static final String CONTENT_SHA256_VALUE = "required";
2837

2938
private final Aws4Signer signer = Aws4Signer.create();
3039
private final AwsCredentialsProvider credentialsProvider;
3140
private final Region region;
41+
private final String serviceName;
3242

3343
public AwsAuthenticationDecorator(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier,
34-
@Nonnull final AwsConfig awsConfig) {
44+
@Nonnull final AwsConfig awsConfig,
45+
@Nonnull final String serviceName) {
3546
this.region = awsConfig.getAwsRegion();
47+
this.serviceName = serviceName;
3648
this.credentialsProvider = awsCredentialsSupplier.getProvider(convertToCredentialOptions(awsConfig));
3749
}
3850

@@ -46,15 +58,50 @@ private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig
4658
}
4759

4860
@Override
49-
public SdkHttpFullRequest authenticate(final SdkHttpFullRequest request) {
61+
public HttpRequest buildRequest(final String url, final byte[] payload, final Map<String, List<String>> customHeaders) {
62+
final SdkHttpFullRequest sdkRequest = createSdkHttpRequest(url, payload, customHeaders);
63+
final SdkHttpFullRequest signedRequest = sign(sdkRequest);
64+
return toArmeriaRequest(signedRequest, payload);
65+
}
66+
67+
private SdkHttpFullRequest createSdkHttpRequest(final String url, final byte[] payload,
68+
final Map<String, List<String>> customHeaders) {
69+
final SdkHttpFullRequest.Builder builder = SdkHttpFullRequest.builder()
70+
.method(SdkHttpMethod.POST)
71+
.uri(URI.create(url))
72+
.contentStreamProvider(() -> SdkBytes.fromByteArray(payload).asInputStream());
73+
74+
if (customHeaders != null) {
75+
customHeaders.forEach((key, values) ->
76+
values.forEach(value -> builder.appendHeader(key, value))
77+
);
78+
}
79+
return builder.build();
80+
}
81+
82+
private SdkHttpFullRequest sign(final SdkHttpFullRequest request) {
5083
final SdkHttpFullRequest requestWithHeader = request.toBuilder()
5184
.putHeader(CONTENT_SHA256_HEADER, CONTENT_SHA256_VALUE)
5285
.build();
5386

5487
return signer.sign(requestWithHeader, Aws4SignerParams.builder()
5588
.signingRegion(region)
56-
.signingName(SERVICE_NAME)
89+
.signingName(serviceName)
5790
.awsCredentials(credentialsProvider.resolveCredentials())
5891
.build());
5992
}
93+
94+
private static HttpRequest toArmeriaRequest(final SdkHttpFullRequest sdkRequest, final byte[] payload) {
95+
final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
96+
.method(HttpMethod.POST)
97+
.scheme(sdkRequest.getUri().getScheme())
98+
.path(sdkRequest.getUri().getRawPath())
99+
.authority(sdkRequest.getUri().getAuthority());
100+
101+
sdkRequest.headers().forEach((k, vList) ->
102+
vList.forEach(v -> headersBuilder.add(k, v))
103+
);
104+
105+
return HttpRequest.of(headersBuilder.build(), HttpData.wrap(payload));
106+
}
60107
}

data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HttpSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public HttpSink(final PluginSetting pluginSetting,
6969
final SinkMetrics sinkMetrics = new DefaultSinkMetrics(pluginMetrics, "Event");
7070

7171
final AuthenticationDecorator authDecorator = httpSinkConfiguration.getAwsConfig() != null
72-
? new AwsAuthenticationDecorator(awsCredentialsSupplier, httpSinkConfiguration.getAwsConfig())
72+
? new AwsAuthenticationDecorator(awsCredentialsSupplier, httpSinkConfiguration.getAwsConfig(),
73+
httpSinkConfiguration.getAwsSigv4ServiceName())
7374
: null;
7475

7576
final HttpSinkSender httpSender = new HttpSinkSender(authDecorator, httpSinkConfiguration, sinkMetrics);

data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/HttpSinkSender.java

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
import org.opensearch.dataprepper.plugins.sink.http.configuration.HttpSinkConfiguration;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
import software.amazon.awssdk.core.SdkBytes;
26-
import software.amazon.awssdk.http.SdkHttpFullRequest;
27-
import software.amazon.awssdk.http.SdkHttpMethod;
2825

2926
import javax.annotation.Nonnull;
3027
import java.net.URI;
@@ -65,19 +62,17 @@ private static WebClient buildWebClient(final HttpSinkConfiguration config) {
6562
public HttpEndpointResponse send(final byte[] payload) {
6663
HttpEndpointResponse response = null;
6764
int attempt = 0;
68-
65+
6966
while (attempt <= maxRetries) {
7067
try {
7168
final HttpRequest request = buildHttpRequest(payload);
72-
if (request == null) {
73-
return new HttpEndpointResponse(config.getUrl(), 0, "Failed to build request");
74-
}
7569

7670
response = webClient.execute(request)
7771
.aggregate()
7872
.thenApply(resp -> {
7973
int statusCode = resp.status().code();
8074
String responseBody = resp.content().toStringUtf8();
75+
8176
return new HttpEndpointResponse(config.getUrl(), statusCode, responseBody);
8277
})
8378
.exceptionally(throwable -> {
@@ -101,7 +96,7 @@ public HttpEndpointResponse send(final byte[] payload) {
10196
}
10297

10398
if (attempt < maxRetries) {
104-
LOG.warn("Retryable error ({}), attempt {}/{}, retrying after {}ms",
99+
LOG.warn("Retryable error ({}), attempt {}/{}, retrying after {}ms",
105100
response.getStatusCode(), attempt + 1, maxRetries, retryIntervalMs);
106101
Thread.sleep(retryIntervalMs);
107102
sinkMetrics.incrementRetries(1);
@@ -126,41 +121,30 @@ public HttpEndpointResponse send(final byte[] payload) {
126121
}
127122
attempt++;
128123
}
129-
130-
return response != null ? response : new HttpEndpointResponse(config.getUrl(), 0, "Max retries exceeded");
131-
}
132-
133-
private SdkHttpFullRequest createSdkHttpRequest(final String url, @Nonnull final byte[] payload) {
134-
final SdkHttpFullRequest.Builder builder = SdkHttpFullRequest.builder()
135-
.method(SdkHttpMethod.POST)
136-
.uri(URI.create(url))
137-
.contentStreamProvider(() -> SdkBytes.fromByteArray(payload).asInputStream());
138124

139-
if (config.getCustomHeaderOptions() != null) {
140-
config.getCustomHeaderOptions().forEach((key, values) ->
141-
values.forEach(value -> builder.appendHeader(key, value))
142-
);
143-
}
144-
return builder.build();
125+
return response != null ? response : new HttpEndpointResponse(config.getUrl(), 0, "Max retries exceeded");
145126
}
146127

147128
private HttpRequest buildHttpRequest(final byte[] payload) {
148-
SdkHttpFullRequest sdkHttpRequest = createSdkHttpRequest(config.getUrl(), payload);
149-
150129
if (authenticationDecorator != null) {
151-
sdkHttpRequest = authenticationDecorator.authenticate(sdkHttpRequest);
130+
return authenticationDecorator.buildRequest(config.getUrl(), payload, config.getCustomHeaderOptions());
152131
}
132+
return buildPlainHttpRequest(payload);
133+
}
153134

135+
private HttpRequest buildPlainHttpRequest(final byte[] payload) {
136+
final URI uri = URI.create(config.getUrl());
154137
final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
155138
.method(HttpMethod.POST)
156-
.scheme(sdkHttpRequest.getUri().getScheme())
157-
.path(sdkHttpRequest.getUri().getRawPath())
158-
.authority(sdkHttpRequest.getUri().getAuthority());
139+
.scheme(uri.getScheme())
140+
.path(uri.getRawPath())
141+
.authority(uri.getAuthority());
159142

160-
sdkHttpRequest.headers().forEach((k, vList) -> {
161-
vList.forEach(v -> headersBuilder.add(k, v));
162-
}
163-
);
143+
if (config.getCustomHeaderOptions() != null) {
144+
config.getCustomHeaderOptions().forEach((key, values) ->
145+
values.forEach(value -> headersBuilder.add(key, value))
146+
);
147+
}
164148

165149
return HttpRequest.of(headersBuilder.build(), HttpData.wrap(payload));
166150
}

data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/http/configuration/HttpSinkConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class HttpSinkConfiguration {
5858
@JsonProperty("connection_timeout")
5959
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
6060

61+
@JsonProperty("aws_sigv4_service_name")
62+
private String awsSigv4ServiceName;
63+
6164
@JsonProperty("custom_headers")
6265
private Map<String, List<String>> customHeaderOptions;
6366

@@ -96,6 +99,10 @@ public Duration getRequestTimeout() {
9699
return requestTimeout;
97100
}
98101

102+
public String getAwsSigv4ServiceName() {
103+
return awsSigv4ServiceName;
104+
}
105+
99106
public Duration getConnectionTimeout() {
100107
return connectionTimeout;
101108
}

0 commit comments

Comments
 (0)