13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
@@ -557,6 +557,19 @@ config_namespace! {
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
///
/// When PushdownSort eliminates a SortExec under SortPreservingMergeExec,
/// a BufferExec is inserted to replace SortExec's buffering role. This
/// prevents I/O stalls by allowing the scan to run ahead of the merge.
///
/// This uses strictly less memory than the SortExec it replaces (which
/// buffers the entire partition). The buffer respects the global memory
/// pool limit. Setting this to a large value is safe — actual memory
/// usage is bounded by partition size and global memory limits.
pub sort_pushdown_buffer_capacity: usize, default = 1024 * 1024 * 1024
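For anyone wanting to tune the new option, it can be overridden like any other execution setting. A minimal sketch, assuming the key follows the usual `datafusion.execution.*` naming for `config_namespace!` fields and that `SessionConfig::set_usize` is available as in recent DataFusion releases:

```rust
use datafusion::prelude::SessionConfig;

// Lower the pushdown buffer to 128 MiB for a memory-constrained deployment.
// Actual usage stays bounded by partition size and the global memory pool.
let config = SessionConfig::new().set_usize(
    "datafusion.execution.sort_pushdown_buffer_capacity",
    128 * 1024 * 1024,
);
```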
Contributor

This PR increases the buffer size because:

64 MB was too small for wide-row scans (16-column TPC-H SELECT * queries showed I/O stalls).

To be clear to anyone reading this, what will be on main is still better than 53.0.0 because prior to #21182 DataFusion would have sorted the entire thing (rather than just buffering it)

A fixed size like this is likely to buffer more than required for narrow-row cases.

I suspect a better solution than a fixed size buffer would be some calculation based on the actual size of the data (e.g. the number of rows to buffer). However, that is tricky to compute / constrain memory when large strings are involved.

We probably would need to have both a row limit and a memory cap and pick the smaller of the two.

We can perhaps do this as a follow on issue/PR

Contributor Author

Good point! A dynamic approach with both a row limit and a memory cap (whichever is reached first) would adapt better to different row widths. Created #21440 to track this.
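The dual-limit idea discussed above can be sketched in a few lines. This is a hypothetical illustration, not the DataFusion implementation: admit batches into the buffer until either a row limit or a byte cap is reached, whichever trips first, so narrow rows are bounded by row count and large strings by bytes. The `BufferLimits` name and `admits` helper are invented for this sketch.

```rust
/// Hypothetical dual limit: a row count and a byte cap, whichever is hit first.
struct BufferLimits {
    max_rows: usize,
    max_bytes: usize,
}

impl BufferLimits {
    /// Returns true if a batch of `rows` rows and `bytes` bytes still fits
    /// on top of what has already been buffered.
    fn admits(
        &self,
        buffered_rows: usize,
        buffered_bytes: usize,
        rows: usize,
        bytes: usize,
    ) -> bool {
        buffered_rows + rows <= self.max_rows
            && buffered_bytes + bytes <= self.max_bytes
    }
}

fn main() {
    let limits = BufferLimits {
        max_rows: 1_000_000,
        max_bytes: 64 * 1024 * 1024,
    };
    // Narrow batch on an empty buffer: both limits comfortably satisfied.
    assert!(limits.admits(0, 0, 8192, 64 * 1024));
    // Nearly full buffer plus a wide batch of large strings: rejected
    // (both the row limit and the byte cap would be exceeded here).
    assert!(!limits.admits(999_000, 60 * 1024 * 1024, 8192, 8 * 1024 * 1024));
}
```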


/// Maximum size in bytes for individual spill files before rotating to a new file.
///
/// When operators spill data to disk (e.g., RepartitionExec), they write
22 changes: 4 additions & 18 deletions datafusion/physical-optimizer/src/pushdown_sort.rs
@@ -65,20 +65,6 @@ use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use std::sync::Arc;

/// Per-partition buffer capacity (in bytes) inserted between SPM and
/// DataSourceExec when sort elimination removes the buffering SortExec.
///
/// SortExec buffers all input data in memory (potentially GB per partition)
/// before outputting sorted results. When we eliminate SortExec, SPM reads
/// directly from I/O-bound sources. BufferExec compensates with bounded
/// buffering, allowing I/O to pipeline with merge computation.
///
/// This is strictly less memory than the SortExec it replaces, and only
/// inserted when PushdownSort eliminates a SortExec — no impact on other
/// query plans. BufferExec also integrates with MemoryPool, so it respects
/// the global memory limit and won't cause OOM.
const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 * 1024 * 1024; // 64 MB

/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
///
/// See module-level documentation for details.
@@ -102,6 +88,8 @@ impl PhysicalOptimizerRule for PushdownSort {
return Ok(plan);
}

let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;

// Use transform_down to find and optimize all SortExec nodes (including nested ones)
// Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
@@ -124,10 +112,8 @@
// Insert BufferExec to replace SortExec's buffering role.
// SortExec buffered all data in memory; BufferExec provides
// bounded buffering so SPM doesn't stall on I/O.
let buffered: Arc<dyn ExecutionPlan> = Arc::new(BufferExec::new(
inner,
BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
));
let buffered: Arc<dyn ExecutionPlan> =
Arc::new(BufferExec::new(inner, buffer_capacity));
let new_spm =
SortPreservingMergeExec::new(spm.expr().clone(), buffered)
.with_fetch(spm.fetch());
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -2221,7 +2221,7 @@ logical_plan
02)--TableScan: tg_buffer projection=[id, value]
physical_plan
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
02)--BufferExec: capacity=67108864
02)--BufferExec: capacity=1073741824
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet

# Verify correctness
@@ -2248,7 +2248,7 @@ logical_plan
02)--TableScan: tg_buffer projection=[id, value]
physical_plan
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
02)--BufferExec: capacity=67108864
02)--BufferExec: capacity=1073741824
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet

query II