
Add Iceberg sink plugin #6734

Open
lawofcycles wants to merge 11 commits into opensearch-project:main from lawofcycles:feature/iceberg-sink

Conversation

Contributor

@lawofcycles lawofcycles commented Apr 6, 2026

Description

Adds a new iceberg sink plugin that writes Data Prepper events into Apache Iceberg tables. It supports append-only writes and row-level deletes (equality deletes) for handling INSERT, UPDATE, and DELETE operations from CDC sources. Commit coordination across multiple nodes uses the existing EnhancedSourceCoordinator infrastructure. The plugin is marked @Experimental.

See #6664 for the full design.

Framework change: adds a UsesEnhancedSinkCoordination interface in data-prepper-api and 8 lines to Pipeline.execute() to inject a coordination store into Sink plugins.
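The injection hook might look roughly like the sketch below. This is a hedged illustration only: CoordinationStore is a stand-in for the actual store type in data-prepper-api, and the method name setCoordinationStore is assumed rather than taken from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the real coordination store type in data-prepper-api;
// the actual interface and its methods may differ.
interface CoordinationStore {
    void put(String key, String value);
    String get(String key);
}

// Hypothetical shape of the marker interface: Pipeline.execute() checks
// for it and injects the shared store into any sink that opts in.
interface UsesEnhancedSinkCoordination {
    void setCoordinationStore(CoordinationStore store);
}

class IcebergSinkStub implements UsesEnhancedSinkCoordination {
    private CoordinationStore store;
    @Override
    public void setCoordinationStore(CoordinationStore store) { this.store = store; }
    CoordinationStore store() { return store; }
}

public class Main {
    // Returns a value read back through the injected store, showing that the
    // sink and the pipeline share a single coordination store instance.
    static String demo() {
        Map<String, String> backing = new ConcurrentHashMap<>();
        CoordinationStore store = new CoordinationStore() {
            public void put(String k, String v) { backing.put(k, v); }
            public String get(String k) { return backing.get(k); }
        };
        Object sink = new IcebergSinkStub();
        // The handful of lines added to Pipeline.execute() amount to this check:
        if (sink instanceof UsesEnhancedSinkCoordination) {
            ((UsesEnhancedSinkCoordination) sink).setCoordinationStore(store);
        }
        ((IcebergSinkStub) sink).store().put("partition-1", "COMPLETED");
        return store.get("partition-1");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints COMPLETED
    }
}
```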

Example: Append-only (REST Catalog)

append-pipeline:
  source:
    http:
      path: "/logs"
  sink:
    - iceberg:
        catalog:
          type: "rest"
          uri: "http://localhost:8181"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
          s3.endpoint: "http://localhost:8333"
          s3.access-key-id: "<access_key>"
          s3.secret-access-key: "<secret_key>"
          client.region: "us-east-1"
        table_identifier: "my_db.my_table"

Example: Row-level deletes (REST Catalog)

cdc-pipeline:
  source:
    http:
      path: "/cdc"
  sink:
    - iceberg:
        catalog:
          type: "rest"
          uri: "http://localhost:8181"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
          s3.endpoint: "http://localhost:8333"
          s3.access-key-id: "<access_key>"
          s3.secret-access-key: "<secret_key>"
          client.region: "us-east-1"
        table_identifier: "my_db.my_table"
        operation: "${/op}"
        identifier_columns: ["id"]

Example: S3 Tables

s3tables-pipeline:
  source:
    http:
      path: "/logs"
  sink:
    - iceberg:
        catalog:
          type: "rest"
          uri: "https://s3tables.<region>.amazonaws.com/iceberg"
          warehouse: "arn:aws:s3tables:<region>:<account_id>:bucket/<bucket_name>"
          rest.sigv4-enabled: "true"
          rest.signing-name: "s3tables"
          rest.signing-region: "<region>"
          io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
          client.region: "<region>"
        table_identifier: "my_namespace.my_table"

Testing

In addition to the unit and integration tests included in this PR, the following E2E verification was performed.

E2E tests (Data Prepper process with DynamoDB coordination store, SeaweedFS + REST Catalog): 26 scenarios, all passing.

  • Append-only: basic, multiple batches, large batch (100 records), null optional fields, extra fields ignored
  • Type coverage: all 9 primitive types (boolean, int, long, float, double, string, date, timestamp, timestamptz), struct, list, map, type coercion (string to int), special characters (empty string, newline, Japanese, quotes)
  • CDC: INSERT+DELETE, UPDATE (equality delete + insert), Debezium-style operations (c/u/d/r), in-batch dedup (positional delete), consecutive UPDATEs, bulk DELETE (10 of 20)
  • Auto-create: schema inference, explicit schema with partition spec
  • Schema evolution: new column addition
  • Dynamic routing: ${/table} expression routing events to different tables in one batch
  • Graceful shutdown: data in the writer buffer (flush_interval=60s) committed via a final commit cycle on shutdown
  • EventHandle acknowledgement: release(true) after commit confirmed via debug log, ack_poll_interval=2s custom value, release(false) on shutdown
  • Validation: operation without identifier_columns rejected at startup
  • Error handling: unrecognized operation routed to the DLQ

E2E verified on Amazon S3 Tables (us-east-1): append-only and CDC both confirmed working.

Issues Resolved

Resolves #6664 ([RFC] Iceberg Sink Plugin)

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles
Contributor Author

Opening this as a draft. The implementation is functional and tested, but I want to carefully review the design decisions, processing flow, and edge cases before requesting reviews. Will mark as ready once that is complete.

EventHandles are collected during output() and associated with
WriteResultPartition IDs on flush. A polling thread checks the
coordination store for completed partitions and releases the
corresponding EventHandles, ensuring events are acknowledged
only after the Iceberg commit succeeds.

Also moves the coordinator null check to the top of the
IcebergSinkService constructor.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
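The acknowledgement flow described above can be sketched as follows. This is an illustrative model only: the names (AckPoller, pollOnce, markCommitted) are placeholders and not the actual Data Prepper or plugin API; the Set stands in for the coordination store's record of completed partitions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class AckPoller {
    static class EventHandle {
        volatile Boolean released = null;      // null = not yet released
        void release(boolean success) { released = success; }
    }

    // Handles collected during output(), keyed by WriteResultPartition ID
    // at flush time.
    private final Map<String, List<EventHandle>> pending = new ConcurrentHashMap<>();
    // Stand-in for the coordination store's view of committed partitions.
    private final Set<String> completed = ConcurrentHashMap.newKeySet();

    void onFlush(String partitionId, List<EventHandle> handles) {
        pending.put(partitionId, new ArrayList<>(handles));
    }

    void markCommitted(String partitionId) { completed.add(partitionId); }

    // One poll cycle: release handles only for partitions whose Iceberg
    // commit has been recorded.
    void pollOnce() {
        Iterator<Map.Entry<String, List<EventHandle>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<EventHandle>> e = it.next();
            if (completed.contains(e.getKey())) {
                e.getValue().forEach(h -> h.release(true));
                it.remove();
            }
        }
    }
}

public class Main {
    static String demo() {
        AckPoller poller = new AckPoller();
        AckPoller.EventHandle handle = new AckPoller.EventHandle();
        poller.onFlush("p1", List.of(handle));
        poller.pollOnce();                      // commit not recorded yet
        String before = String.valueOf(handle.released);
        poller.markCommitted("p1");
        poller.pollOnce();                      // now the handle is released
        return before + "," + handle.released;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints null,true
    }
}
```

The key property is that release(true) is only ever reached after the partition shows up as committed, which is what ties event acknowledgement to Iceberg commit success.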
Check at startup for static routing, and at TaskWriter creation
for dynamic routing. Throws IllegalArgumentException with the
unknown column name instead of NullPointerException.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Make TableContext immutable and replace it atomically in the
ConcurrentHashMap when schema changes. Each thread detects
stale writers by comparing schema IDs and recreates them.

Also replace fully qualified class names with imports in
IcebergSinkIT.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
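The immutable-context pattern in that commit can be sketched like this. The field names here are assumed for illustration, not taken from the PR: the point is that schema evolution publishes a whole new context atomically instead of mutating the existing one, and each thread detects staleness by comparing schema IDs.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative immutable context; a record cannot be mutated in place.
record TableContext(String tableName, int schemaId) {}

public class Main {
    static final ConcurrentHashMap<String, TableContext> contexts = new ConcurrentHashMap<>();

    // A writer thread compares the schema ID its writer was built against
    // with the current context; a mismatch means the writer is stale.
    static boolean writerIsStale(int cachedSchemaId, String table) {
        return contexts.get(table).schemaId() != cachedSchemaId;
    }

    static boolean demo() {
        contexts.put("my_db.my_table", new TableContext("my_db.my_table", 1));
        // Schema evolution: atomically replace the context with a new
        // immutable instance carrying the new schema ID.
        contexts.put("my_db.my_table", new TableContext("my_db.my_table", 2));
        return writerIsStale(1, "my_db.my_table");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true -> this thread recreates its writer
    }
}
```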
When operation is configured (CDC mode) but neither
identifier_columns nor the table's identifier-field-ids are
set, fail at startup with a clear error message instead of
silently producing broken equality deletes.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
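The fail-fast check described above might look roughly like the following. Method and parameter names are assumed for illustration; only the rule itself (CDC mode requires either identifier_columns or the table's identifier-field-ids) comes from the commit message.

```java
import java.util.List;
import java.util.Set;

public class Main {
    static void validateCdcConfig(String operation,
                                  List<String> identifierColumns,
                                  Set<Integer> tableIdentifierFieldIds) {
        boolean cdcMode = operation != null;
        boolean noColumns = identifierColumns == null || identifierColumns.isEmpty();
        if (cdcMode && noColumns && tableIdentifierFieldIds.isEmpty()) {
            // Fail at startup rather than silently producing broken
            // equality deletes later.
            throw new IllegalArgumentException(
                "operation is configured but neither identifier_columns nor the "
                + "table's identifier-field-ids are set; equality deletes need "
                + "identifier columns to match rows");
        }
    }

    static boolean demo() {
        validateCdcConfig("${/op}", List.of("id"), Set.of()); // valid: columns given
        try {
            validateCdcConfig("${/op}", List.of(), Set.of()); // invalid: nothing set
            return false;
        } catch (IllegalArgumentException expected) {
            return true;                                      // rejected at startup
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```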
Schema evolution bug: after evolveSchema(), the local writerManager and
ctx variables in output() were stale. Data written to the old writer was
never flushed, and the old RecordConverter produced records with the
wrong schema (ArrayIndexOutOfBoundsException). Re-read both variables
from their respective maps after evolveSchema().

Graceful shutdown: previously, shutdown() flushed all writers and
registered WriteResultPartitions but immediately killed the
CommitScheduler, so the final data was never committed to Iceberg.
Now CommitScheduler supports a shutdownRequested flag. On shutdown,
the executor sends an interrupt to wake the scheduler from its sleep,
and the scheduler executes one final commit cycle before exiting.
This ensures data buffered at shutdown time is committed rather than
relying on source re-delivery.

Also update the schemaEvolution_concurrentThreads integration test to
verify data via shutdown() (which now guarantees flush and commit)
instead of polling with Awaitility.

Add debug-level logging for EventHandle release to aid E2E verification.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
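The shutdownRequested pattern described above can be reduced to the sketch below. The real CommitScheduler is more involved; this only models the mechanism: shutdown sets a flag and interrupts the scheduler thread to cut its sleep short, and the loop still runs one final commit cycle before exiting.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    static final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    static final AtomicInteger commitCycles = new AtomicInteger();

    static int demo() throws InterruptedException {
        shutdownRequested.set(false);
        commitCycles.set(0);

        Thread scheduler = new Thread(() -> {
            do {
                try {
                    Thread.sleep(60_000);            // normal commit interval
                } catch (InterruptedException e) {
                    // interrupt wakes the scheduler early; fall through and
                    // run the commit cycle before re-checking the flag
                }
                commitCycles.incrementAndGet();      // commit cycle (final one included)
            } while (!shutdownRequested.get());
        });
        scheduler.start();

        shutdownRequested.set(true);                 // request shutdown...
        scheduler.interrupt();                       // ...and wake the scheduler
        scheduler.join();
        return commitCycles.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 1: the final commit cycle ran
    }
}
```

Setting the flag before interrupting matters: when the interrupted sleep returns, the flag is guaranteed visible, so the loop commits exactly once more and then exits instead of relying on source re-delivery.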
@lawofcycles
Contributor Author

This PR is ready for review. It adds an iceberg sink plugin that writes Data Prepper events into Apache Iceberg tables, supporting both append-only writes and row-level deletes (equality deletes) for CDC operations.

The full design is documented in #6664. Reviews would be greatly appreciated.
