Support shared catalog config across tables in iceberg-source #6727

Open
lawofcycles wants to merge 2 commits into opensearch-project:main from lawofcycles:feature/iceberg-source-shared-catalog

@lawofcycles
Contributor

Description

Adds support for a top-level catalog configuration in iceberg-source that applies to all tables by default. When a table specifies its own catalog, that catalog fully replaces the top-level definition.

This reduces configuration duplication when multiple tables share the same catalog.

iceberg:
  catalog:
    type: rest
    uri: "http://iceberg-rest-catalog:8181"
    io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
  tables:
    - table_name: "db.table_a"
      identifier_columns: ["id"]
    - table_name: "db.table_b"
      identifier_columns: ["id"]
      catalog:
        type: glue
        warehouse: "s3://other-bucket/warehouse"
        io-impl: "org.apache.iceberg.aws.s3.S3FileIO"

Issues Resolved

Partially addresses #6726 (catalog config sharing). Per-table shuffle overrides will be addressed after #6682 is merged.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Member

@dlvenable dlvenable left a comment

Thanks @lawofcycles for this contribution! I have a few small comments.


- final Map<String, String> catalogProps = new HashMap<>(tableConfig.getCatalog());
+ final Map<String, String> catalogProps = new HashMap<>(
+         tableConfig.getCatalog().isEmpty() ? sourceConfig.getCatalog() : tableConfig.getCatalog());
Member

Should we use != null instead? An empty catalog could then be an explicit way to say there is no catalog at all.

Contributor Author

Good point. Changed to != null and updated the TableConfig.catalog default from Collections.emptyMap() to null. This way, an unset catalog falls back to the shared config, while an explicitly empty catalog: {} is passed through as is (which will fail at catalog initialization).
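
For readers following along, the fallback rule described in this reply can be sketched in isolation. This is a minimal illustration, not the code in the PR: the class `CatalogResolution` and the method `resolve` are hypothetical names, and the sketch assumes the shared catalog map is never null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the null-based fallback; not part of the PR.
final class CatalogResolution {
    private CatalogResolution() {
    }

    // Returns a mutable copy of the catalog properties to use for one table.
    // Assumes sharedCatalog is non-null.
    static Map<String, String> resolve(final Map<String, String> tableCatalog,
                                       final Map<String, String> sharedCatalog) {
        // null means the table did not set a catalog, so the shared one applies.
        // Any non-null table map, even an empty one, replaces the shared map
        // entirely; keys are never merged.
        return new HashMap<>(tableCatalog != null ? tableCatalog : sharedCatalog);
    }

    public static void main(final String[] args) {
        final Map<String, String> shared = Map.of(
                "type", "rest",
                "uri", "http://iceberg-rest-catalog:8181");
        final Map<String, String> override = Map.of(
                "type", "glue",
                "warehouse", "s3://other-bucket/warehouse");

        System.out.println(resolve(null, shared).get("type"));            // rest
        System.out.println(resolve(override, shared).get("type"));        // glue
        System.out.println(resolve(override, shared).containsKey("uri")); // false
        System.out.println(resolve(Map.of(), shared).isEmpty());          // true
    }
}
```

The last call shows why the null check matters: with the earlier isEmpty() check, an explicit catalog: {} would have silently picked up the shared catalog instead of being passed through.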

when(configB.getIdentifierColumns()).thenReturn(List.of("id"));
when(configB.isDisableExport()).thenReturn(false);

when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
Member

Should we use an alternative catalog to be sure that it is correctly selected?

Contributor Author

Since the integration test environment has a single REST catalog server, using a truly distinct catalog to verify selection is not straightforward. I added IcebergServiceTest with unit tests that use mockStatic(CatalogUtil.class) to capture and assert the exact catalog properties passed to buildIcebergCatalog for each table. The integration test export_with_mixed_catalog_config is kept as a smoke test to confirm the mixed configuration works end to end.
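
The idea behind those unit tests, capturing the properties handed to the catalog builder for each table and asserting on them, can be shown without any test framework. The sketch below is an illustration only: it swaps Mockito's mockStatic for a plain recording callback so that it runs on the JDK alone, and `CatalogSelectionSketch`, `buildCatalogs`, and `capture` are hypothetical names rather than anything in IcebergService or IcebergServiceTest.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the per-table catalog selection; not the PR's code.
final class CatalogSelectionSketch {
    private CatalogSelectionSketch() {
    }

    // For each table, picks the table catalog when set (non-null), otherwise the
    // shared catalog, and passes a copy to the supplied builder callback.
    static void buildCatalogs(final Map<String, Map<String, String>> tableCatalogs,
                              final Map<String, String> sharedCatalog,
                              final BiConsumer<String, Map<String, String>> catalogBuilder) {
        tableCatalogs.forEach((tableName, tableCatalog) -> catalogBuilder.accept(
                tableName, new HashMap<>(tableCatalog != null ? tableCatalog : sharedCatalog)));
    }

    // Records the properties each table would be built with, keyed by table name.
    static Map<String, Map<String, String>> capture(final Map<String, Map<String, String>> tableCatalogs,
                                                    final Map<String, String> sharedCatalog) {
        final Map<String, Map<String, String>> captured = new LinkedHashMap<>();
        buildCatalogs(tableCatalogs, sharedCatalog, captured::put);
        return captured;
    }

    public static void main(final String[] args) {
        final Map<String, String> shared = Map.of("type", "rest");
        final Map<String, String> glue = Map.of("type", "glue");

        // Mixed configuration: table_a has no catalog of its own, table_b overrides.
        final Map<String, Map<String, String>> tables = new LinkedHashMap<>();
        tables.put("db.table_a", null);
        tables.put("db.table_b", glue);

        final Map<String, Map<String, String>> captured = capture(tables, shared);
        System.out.println(captured.get("db.table_a").get("type")); // rest
        System.out.println(captured.get("db.table_b").get("type")); // glue
    }
}
```

Asserting on the captured maps distinguishes the shared and overriding catalogs by content, which is what a single REST catalog server in the integration environment cannot do.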

final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
coordinator.createPartition(new LeaderPartition());
final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics,
acknowledgementSetManager, org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
Member

Use an import on the class or static import. Avoid fully qualified names.

Contributor Author

Fixed.

Change catalog fallback check from isEmpty() to null check so that
an explicitly empty catalog is not silently treated as unset.
TableConfig.catalog default is now null instead of emptyMap.

Add IcebergServiceTest to verify catalog selection logic:
shared only, table override, and mixed configurations.

Replace fully qualified TestEventFactory references with imports
in IcebergSourceIT.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles lawofcycles requested a review from dlvenable April 10, 2026 01:46
Member

@dlvenable dlvenable left a comment

Thank you @lawofcycles!
