Is your feature request related to a problem? Please describe.
The current iceberg-source creates one SHUFFLE_WRITE task per data file. When a large UPDATE/DELETE operation affects many data files (e.g. 500K rows scattered across thousands of files in a Copy on Write table), the task count can reach thousands. Source coordination stores all SHUFFLE_WRITE tasks under the same DynamoDB partition key, which has a per-partition write throughput limit of 1,000 WCU/s regardless of provisioned capacity. This causes write throttling that degrades performance or stalls processing entirely.
Observed during performance testing with NYC Yellow Taxi (41 million rows, partitioned by day):
| SHUFFLE_WRITE task count | Result |
| --- | --- |
| ~200 | Completed normally |
| ~2,000 | Completed with throttling (3 to 7 minutes) |
| ~5,000 | Stalled |
Additionally, when individual data files are small (e.g. median 20KB from Spark parallel writes), the coordination overhead per task dominates the actual file processing time, making the per-file task granularity inefficient regardless of the coordination backend.
Describe the solution you'd like
Group multiple data files into a single SHUFFLE_WRITE task based on total file size, so that each task represents a meaningful amount of work. `fileSizeInBytes` is a required field in Iceberg file metadata, so it is available at planning time without reading the actual files.
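A minimal sketch of the grouping idea, assuming a greedy size-based bin-fill over the planned files. The `DataFile` record, `groupBySize` method, and `targetTaskSizeBytes` parameter are illustrative names, not the actual iceberg-source API:

```java
import java.util.ArrayList;
import java.util.List;

public class FileGrouper {
    // Illustrative stand-in for Iceberg's file metadata; only the size field matters here.
    record DataFile(String path, long fileSizeInBytes) {}

    // Greedily packs files into groups whose total size stays near targetTaskSizeBytes.
    // Each resulting group would back one SHUFFLE_WRITE task instead of one task per file.
    static List<List<DataFile>> groupBySize(List<DataFile> files, long targetTaskSizeBytes) {
        List<List<DataFile>> groups = new ArrayList<>();
        List<DataFile> current = new ArrayList<>();
        long currentSize = 0;
        for (DataFile file : files) {
            // Close the current group once this file would push it past the target,
            // unless the group is empty (a single oversized file gets its own task).
            if (!current.isEmpty() && currentSize + file.fileSizeInBytes() > targetTaskSizeBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(file);
            currentSize += file.fileSizeInBytes();
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

With a target of, say, 128MB per task, thousands of 20KB files collapse into a handful of tasks, which keeps each task's work meaningful and keeps the DynamoDB write rate for SHUFFLE_WRITE items well under the partition limit.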
Additional context
Performance test results: #6682 (comment)
Related PR: #6682 (source-layer shuffle implementation)