
Commit e06cdb6

zhuqi-lucas authored and Dandandan committed
feat: make sort pushdown BufferExec capacity configurable, default 1GB (apache#21426)
## Which issue does this PR close?

Closes apache#21417

## Rationale for this change

apache#21182 introduced `BufferExec` between `SortPreservingMergeExec` and `DataSourceExec` when sort elimination removes a `SortExec`. The buffer capacity was hardcoded to 64MB, which can cause I/O stalls for wide-row full scans.

## What changes are included in this PR?

- Add `datafusion.execution.sort_pushdown_buffer_capacity` config option (default 1GB)
- Replace the hardcoded `BUFFER_CAPACITY_AFTER_SORT_ELIMINATION` constant with the config value
- Update SLT test expectations for the new default capacity

## How are these changes justified?

**Why 1GB default:**

- This is a maximum, not pre-allocated — actual usage is bounded by partition data size
- Strictly less memory than the `SortExec` it replaces (which buffers the entire partition)
- `BufferExec` integrates with `MemoryPool`, so global memory limits are respected
- 64MB was too small for wide-row scans (16-column TPC-H `SELECT *` queries showed I/O stalls)

**Why configurable:**

- Different workloads have different optimal buffer sizes
- Users with memory-constrained environments can reduce it
- Users with wide tables or large row groups can increase it

## Are these changes tested?

- Existing SLT Test G verifies `BufferExec` appears in the plan with the correct capacity
- Config integration is tested via the existing config framework

## Are there any user-facing changes?

New config option: `datafusion.execution.sort_pushdown_buffer_capacity` (default: 1GB)
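The 64MB and 1GB figures above correspond to the exact byte counts that appear in the SLT plan expectations. A standalone sketch (plain Rust, not DataFusion code) of that arithmetic:

```rust
// Standalone sketch (not DataFusion code): the removed hardcoded capacity
// versus the new configurable default, as raw byte counts.
const OLD_CAPACITY: usize = 64 * 1024 * 1024; // 64 MB, the removed constant
const NEW_DEFAULT: usize = 1024 * 1024 * 1024; // 1 GB, the new config default

fn main() {
    // These match the literal values printed in the BufferExec plan nodes.
    assert_eq!(OLD_CAPACITY, 67_108_864);
    assert_eq!(NEW_DEFAULT, 1_073_741_824);
}
```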
1 parent 078bab2 commit e06cdb6

5 files changed: 22 additions & 20 deletions


datafusion/common/src/config.rs

Lines changed: 13 additions & 0 deletions
```diff
@@ -557,6 +557,19 @@ config_namespace! {
         /// batches and merged.
         pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
 
+        /// Maximum buffer capacity (in bytes) per partition for BufferExec
+        /// inserted during sort pushdown optimization.
+        ///
+        /// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
+        /// a BufferExec is inserted to replace SortExec's buffering role. This
+        /// prevents I/O stalls by allowing the scan to run ahead of the merge.
+        ///
+        /// This uses strictly less memory than the SortExec it replaces (which
+        /// buffers the entire partition). The buffer respects the global memory
+        /// pool limit. Setting this to a large value is safe — actual memory
+        /// usage is bounded by partition size and global memory limits.
+        pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024
+
         /// Maximum size in bytes for individual spill files before rotating to a new file.
         ///
         /// When operators spill data to disk (e.g., RepartitionExec), they write
```

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 4 additions & 18 deletions
```diff
@@ -65,20 +65,6 @@ use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use std::sync::Arc;
 
-/// Per-partition buffer capacity (in bytes) inserted between SPM and
-/// DataSourceExec when sort elimination removes the buffering SortExec.
-///
-/// SortExec buffers all input data in memory (potentially GB per partition)
-/// before outputting sorted results. When we eliminate SortExec, SPM reads
-/// directly from I/O-bound sources. BufferExec compensates with bounded
-/// buffering, allowing I/O to pipeline with merge computation.
-///
-/// This is strictly less memory than the SortExec it replaces, and only
-/// inserted when PushdownSort eliminates a SortExec — no impact on other
-/// query plans. BufferExec also integrates with MemoryPool, so it respects
-/// the global memory limit and won't cause OOM.
-const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB
-
 /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
 ///
 /// See module-level documentation for details.
@@ -102,6 +88,8 @@ impl PhysicalOptimizerRule for PushdownSort {
             return Ok(plan);
         }
 
+        let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;
+
         // Use transform_down to find and optimize all SortExec nodes (including nested ones)
         // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
         plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
@@ -124,10 +112,8 @@ impl PhysicalOptimizerRule for PushdownSort {
                 // Insert BufferExec to replace SortExec's buffering role.
                 // SortExec buffered all data in memory; BufferExec provides
                 // bounded buffering so SPM doesn't stall on I/O.
-                let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(
-                    inner,
-                    BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
-                ));
+                let buffered: Arc<dyn ExecutionPlan> =
+                    Arc::new(BufferExec::new(inner, buffer_capacity));
                 let new_spm =
                     SortPreservingMergeExec::new(spm.expr().clone(), buffered)
                         .with_fetch(spm.fetch());
```
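The comments in the hunk above say `BufferExec` provides bounded buffering so the merge does not stall on I/O. As an analogy only (plain std Rust, not DataFusion's `BufferExec`), a bounded `sync_channel` shows the same behavior: the producer (the scan) runs ahead of the consumer (the merge) by at most the configured capacity, then blocks, so memory stays bounded while I/O overlaps with merge work:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Analogous to sort_pushdown_buffer_capacity, except counted in
    // batches here rather than bytes.
    let capacity = 4;
    let (tx, rx) = sync_channel::<u64>(capacity);

    let producer = thread::spawn(move || {
        for batch in 0..16u64 {
            // Blocks once `capacity` unconsumed batches are buffered,
            // which is what bounds the memory footprint.
            tx.send(batch).unwrap();
        }
        // Dropping `tx` here ends the consumer's iterator.
    });

    // The "merge" side drains the buffer as batches arrive.
    let total: u64 = rx.iter().sum();
    producer.join().unwrap();
    assert_eq!(total, (0..16u64).sum());
}
```

The capacity only caps how far the producer can run ahead; if the consumer keeps up, actual buffered data stays well below the cap, matching the PR's point that the config value is a maximum, not a pre-allocation.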

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
```diff
@@ -270,6 +270,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
 datafusion.execution.skip_physical_aggregate_schema_check false
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
+datafusion.execution.sort_pushdown_buffer_capacity 1073741824
 datafusion.execution.sort_spill_reservation_bytes 10485760
 datafusion.execution.spill_compression uncompressed
 datafusion.execution.split_file_groups_by_statistics false
@@ -413,6 +414,7 @@ datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number
 datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
+datafusion.execution.sort_pushdown_buffer_capacity 1073741824 Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
 datafusion.execution.spill_compression uncompressed Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed.
 datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
```

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 2 additions & 2 deletions
```diff
@@ -2221,7 +2221,7 @@ logical_plan
 02)--TableScan: tg_buffer projection=[id, value]
 physical_plan
 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
-02)--BufferExec: capacity=67108864
+02)--BufferExec: capacity=1073741824
 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
 
 # Verify correctness
@@ -2248,7 +2248,7 @@ logical_plan
 02)--TableScan: tg_buffer projection=[id, value]
 physical_plan
 01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
-02)--BufferExec: capacity=67108864
+02)--BufferExec: capacity=1073741824
 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
 
 query II
```

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -118,6 +118,7 @@ The following configuration settings are available:
 | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
 | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.execution.sort_pushdown_buffer_capacity | 1073741824 | Maximum buffer capacity (in bytes) per partition for BufferExec inserted during sort pushdown optimization. When PushdownSort eliminates a SortExec under SortPreservingMergeExec, a BufferExec is inserted to replace SortExec's buffering role. This prevents I/O stalls by allowing the scan to run ahead of the merge. This uses strictly less memory than the SortExec it replaces (which buffers the entire partition). The buffer respects the global memory pool limit. Setting this to a large value is safe — actual memory usage is bounded by partition size and global memory limits. |
 | datafusion.execution.max_spill_file_size_bytes | 134217728 | Maximum size in bytes for individual spill files before rotating to a new file. When operators spill data to disk (e.g., RepartitionExec), they write multiple batches to the same file until this size limit is reached, then rotate to a new file. This reduces syscall overhead compared to one-file-per-batch while preventing files from growing too large. A larger value reduces file creation overhead but may hold more disk space. A smaller value creates more files but allows finer-grained space reclamation as files can be deleted once fully consumed. Now only `RepartitionExec` supports this spill file rotation feature, other spilling operators may create spill files larger than the limit. Default: 128 MB |
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
 | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
```
