Add top-level filters option for S3 source #6735
lawofcycles wants to merge 1 commit into opensearch-project:main
✅ License Header Check Passed. All newly added files have proper license headers. Great work! 🎉
Add a top-level filters configuration to the S3 source that applies include_prefix and exclude_suffix filtering for both SQS and scan modes. Previously, key path filters were only available under scan bucket options, making it impossible to filter S3 objects when using SQS notifications. The new filters option uses the same bucket name keyed Map pattern as bucket_owners. Top-level filters and scan bucket-level filters cannot be used together, as the top-level filters are intended to eventually replace the scan bucket-level filters. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
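A minimal sketch of the filtering semantics described above. It assumes that an object passes when its key starts with at least one include_prefix entry and ends with no exclude_suffix entry, and that a bucket without an entry in the map is left unfiltered. The class, field, and constructor names here are hypothetical stand-ins, not the code in this PR.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the PR's filtering helper; names are illustrative only. */
public class KeyFilterSketch {

    /** Per-bucket filter settings; a null list means "not configured". */
    public static final class BucketFilter {
        final List<String> includePrefix;
        final List<String> excludeSuffix;

        public BucketFilter(List<String> includePrefix, List<String> excludeSuffix) {
            this.includePrefix = includePrefix;
            this.excludeSuffix = excludeSuffix;
        }
    }

    private final Map<String, BucketFilter> filtersByBucket;

    public KeyFilterSketch(Map<String, BucketFilter> filtersByBucket) {
        this.filtersByBucket = filtersByBucket;
    }

    public boolean isKeyMatchingFilters(String bucket, String key) {
        BucketFilter filter = filtersByBucket.get(bucket);
        if (filter == null) {
            return true; // assumption: buckets without an entry are not filtered
        }
        // Any matching suffix excludes the object.
        if (filter.excludeSuffix != null && filter.excludeSuffix.stream().anyMatch(key::endsWith)) {
            return false;
        }
        // No include_prefix configured: everything not excluded passes.
        if (filter.includePrefix == null || filter.includePrefix.isEmpty()) {
            return true;
        }
        return filter.includePrefix.stream().anyMatch(key::startsWith);
    }

    public static void main(String[] args) {
        KeyFilterSketch sketch = new KeyFilterSketch(Map.of(
                "my-bucket", new BucketFilter(List.of("assets/"), List.of(".jpg"))));
        for (String key : List.of("assets/data1.json", "assets/photo.jpg", "logs/app.log", "root-file.json")) {
            System.out.println(key + " -> " + sketch.isKeyMatchingFilters("my-bucket", key));
        }
    }
}
```

Keying the map by bucket name lets the same lookup serve both scan mode and SQS mode, since both know the bucket and key of each object before reading it.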
6318b7e to 5e5fc0c
dlvenable left a comment
Thank you @lawofcycles for this contribution! This will be a great help to users of SQS/EventBridge.
/**
 * Helper class for applying top-level S3 object key filters (include_prefix and exclude_suffix).
 */
public class S3ObjectFilteringHelper {
Thank you for splitting this logic into a class. Maybe rename it to S3ObjectKeyFilter or S3ObjectFilter.
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null);
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false));
This could be combined with the test above using @ParameterizedTest and @CsvSource
@CsvSource({
    "assets/image.png, true",
    "logs/app.log, false"
})
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml"));
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));
Similar comment about combining tests.
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg"));
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));
This is a good scenario for @CsvSource as well.
        return true;
    }

    @AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")
Thank you for this assertion. It is the right approach in my opinion.
May I ask why we don't want to keep filters at both levels? The case below uses different filters on two bucket-level entries. With only a top-level filter, it could not be supported when SQS is the notification source.
buckets:
  - bucket:
      name: offlinebatch
      data_selection: metadata_only
      filter:
        include_prefix:
          - bedrock-multisource/my_batch
        exclude_suffix:
          - .out
  - bucket:
      name: offlinebatch
      data_selection: data_only
      filter:
        include_prefix:
          - bedrock-multisource/output/
        exclude_suffix:
          - manifest.json.out
@Zhangxunmt, the top-level filters can still differentiate buckets even with SQS, because SQS includes the bucket name in the message.
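To illustrate the point about SQS: each record in an S3 event notification carries both the bucket name and the object key, so a bucket-keyed map can select the right filter per record. The following is a rough sketch only. The bucket names, the simplified S3Record type, and the applyIncludePrefix method are hypothetical, and real SQS message parsing is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SqsBucketLookupSketch {

    /** Simplified stand-in for one record of an S3 event notification. */
    public static final class S3Record {
        public final String bucket;
        public final String key;

        public S3Record(String bucket, String key) {
            this.bucket = bucket;
            this.key = key;
        }
    }

    /** Keeps records whose key starts with one of the prefixes configured for their bucket. */
    public static List<S3Record> applyIncludePrefix(
            List<S3Record> records, Map<String, List<String>> includePrefixByBucket) {
        List<S3Record> kept = new ArrayList<>();
        for (S3Record rec : records) {
            List<String> prefixes = includePrefixByBucket.get(rec.bucket);
            // Assumption: a bucket with no configured prefixes is not filtered.
            if (prefixes == null || prefixes.isEmpty()
                    || prefixes.stream().anyMatch(rec.key::startsWith)) {
                kept.add(rec);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, List<String>> filters = Map.of(
                "bucket-a", List.of("input/"),
                "bucket-b", List.of("output/"));
        List<S3Record> records = List.of(
                new S3Record("bucket-a", "input/batch-1.json"),
                new S3Record("bucket-a", "output/batch-1.json"),
                new S3Record("bucket-b", "output/batch-1.json"));
        for (S3Record kept : applyIncludePrefix(records, filters)) {
            System.out.println(kept.bucket + "/" + kept.key);
        }
    }
}
```

Note that a map keyed by bucket name holds one filter per bucket, so two entries for the same bucket name would need to be merged into a single filter.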
    }

    @AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")
    boolean isFiltersNotUsedWithScanBucketFilter() {
nit: Use early returns (guard clauses) to reduce nesting and improve readability.
boolean isFiltersNotUsedWithScanBucketFilter() {
    if (filters == null || filters.isEmpty()) return true; // no top-level filters configured
    if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) return true; // no scan buckets configured
    return s3ScanScanOptions.getBuckets().stream()
            .map(bucket -> bucket.getS3ScanBucketOption())
            .noneMatch(option -> option != null && option.getS3ScanFilter() != null); // valid only if no bucket has its own filter
}
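For trying the guard-clause version in isolation, here is a self-contained sketch of the same check. The ScanBucket type and the method parameters are hypothetical simplifications standing in for the real configuration classes, which are not shown in this conversation.

```java
import java.util.List;
import java.util.Map;

public class FilterValidationSketch {

    /** Hypothetical simplified scan bucket: only tracks whether a bucket-level filter is set. */
    public static final class ScanBucket {
        final Object filter;

        public ScanBucket(Object filter) {
            this.filter = filter;
        }
    }

    /** Returns false only when top-level filters and at least one bucket-level filter are both set. */
    public static boolean isFiltersNotUsedWithScanBucketFilter(
            Map<String, ?> topLevelFilters, List<ScanBucket> scanBuckets) {
        if (topLevelFilters == null || topLevelFilters.isEmpty()) {
            return true; // no top-level filters, nothing can conflict
        }
        if (scanBuckets == null) {
            return true; // no scan buckets, nothing can conflict
        }
        return scanBuckets.stream()
                .noneMatch(bucket -> bucket != null && bucket.filter != null);
    }

    public static void main(String[] args) {
        List<ScanBucket> withFilter = List.of(new ScanBucket("bucket-level filter"));
        System.out.println(isFiltersNotUsedWithScanBucketFilter(Map.of("my-bucket", "filter"), withFilter));
        System.out.println(isFiltersNotUsedWithScanBucketFilter(null, withFilter));
    }
}
```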
Add a top-level filters configuration to the S3 source that applies include_prefix and exclude_suffix filtering for both SQS and scan modes.
Previously, key path filters were only available under scan bucket options, making it impossible to filter S3 objects when using SQS notifications. The new filters option uses the same bucket-name-keyed Map pattern as bucket_owners.
Top-level filters and scan bucket-level filter cannot be used together. A validation error is raised if both are configured.
Testing
In addition to unit tests, I verified this with a real S3 bucket and SQS queue (S3 ObjectCreated notifications forwarded to SQS). With include_prefix: ["assets/"] and exclude_suffix: [".jpg"] configured, the following results were observed.
assets/data1.json
assets/photo.jpg
logs/app.log
root-file.json
Issues Resolved
Resolves #6386
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.