Skip to content

Commit b714bcb

Browse files
andygrove and claude committed
perf: Use zero-copy slice instead of take kernel in sort merge join when indices are contiguous
In the sort merge join, replace O(n) Arrow `take` kernel calls with O(1) `RecordBatch::slice` when the join indices form a contiguous range. This is common for both the streamed side (which advances sequentially) and the buffered side (which is scanned sequentially within each key group). Benchmarks show ~30% improvement for 1:1 joins (Q1) and ~7% for high-cardinality 1:100 joins (Q3). Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent ace9cd4 commit b714bcb

1 file changed

Lines changed: 57 additions & 13 deletions

File tree

  • datafusion/physical-plan/src/joins/sort_merge_join

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 57 additions & 13 deletions
Original file line number · Diff line number · Diff line change
@@ -1490,13 +1490,24 @@ impl SortMergeJoinStream {
14901490
continue;
14911491
}
14921492

1493-
let mut left_columns = self
1494-
.streamed_batch
1495-
.batch
1496-
.columns()
1497-
.iter()
1498-
.map(|column| take(column, &left_indices, None))
1499-
.collect::<Result<Vec<_>, ArrowError>>()?;
1493+
let mut left_columns =
1494+
if let Some(range) = is_contiguous_range(&left_indices) {
1495+
// When indices form a contiguous range (common for the streamed
1496+
// side which advances sequentially), use zero-copy slice instead
1497+
// of the O(n) take kernel.
1498+
self.streamed_batch
1499+
.batch
1500+
.slice(range.start, range.len())
1501+
.columns()
1502+
.to_vec()
1503+
} else {
1504+
self.streamed_batch
1505+
.batch
1506+
.columns()
1507+
.iter()
1508+
.map(|column| take(column, &left_indices, None))
1509+
.collect::<Result<Vec<_>, ArrowError>>()?
1510+
};
15001511

15011512
// The row indices of joined buffered batch
15021513
let right_indices: UInt64Array = chunk.buffered_indices.finish();
@@ -1972,6 +1983,30 @@ fn produce_buffered_null_batch(
19721983
)?))
19731984
}
19741985

1986+
/// Checks if a `UInt64Array` contains a contiguous ascending range (e.g. [3,4,5,6]).
1987+
/// Returns `Some(start..start+len)` if so, `None` otherwise.
1988+
/// This allows replacing an O(n) `take` with an O(1) `slice`.
1989+
#[inline]
1990+
fn is_contiguous_range(indices: &UInt64Array) -> Option<Range<usize>> {
1991+
if indices.is_empty() || indices.null_count() > 0 {
1992+
return None;
1993+
}
1994+
let start = indices.value(0);
1995+
let len = indices.len() as u64;
1996+
// Quick rejection: if last element doesn't match expected, not contiguous
1997+
if indices.value(indices.len() - 1) != start + len - 1 {
1998+
return None;
1999+
}
2000+
// Verify every element is sequential (handles duplicates and gaps)
2001+
let values = indices.values();
2002+
for i in 1..values.len() {
2003+
if values[i] != start + i as u64 {
2004+
return None;
2005+
}
2006+
}
2007+
Some(start as usize..(start + len) as usize)
2008+
}
2009+
19752010
/// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]` by specific column indices
19762011
#[inline(always)]
19772012
fn fetch_right_columns_by_idxs(
@@ -1992,12 +2027,21 @@ fn fetch_right_columns_from_batch_by_idxs(
19922027
) -> Result<Vec<ArrayRef>> {
19932028
match &buffered_batch.batch {
19942029
// In memory batch
1995-
BufferedBatchState::InMemory(batch) => Ok(batch
1996-
.columns()
1997-
.iter()
1998-
.map(|column| take(column, &buffered_indices, None))
1999-
.collect::<Result<Vec<_>, ArrowError>>()
2000-
.map_err(Into::<DataFusionError>::into)?),
2030+
// In memory batch
2031+
BufferedBatchState::InMemory(batch) => {
2032+
// When indices form a contiguous range (common in SMJ since the
2033+
// buffered side is scanned sequentially), use zero-copy slice.
2034+
if let Some(range) = is_contiguous_range(buffered_indices) {
2035+
Ok(batch.slice(range.start, range.len()).columns().to_vec())
2036+
} else {
2037+
Ok(batch
2038+
.columns()
2039+
.iter()
2040+
.map(|column| take(column, buffered_indices, None))
2041+
.collect::<Result<Vec<_>, ArrowError>>()
2042+
.map_err(Into::<DataFusionError>::into)?)
2043+
}
2044+
}
20012045
// If the batch was spilled to disk, less likely
20022046
BufferedBatchState::Spilled(spill_file) => {
20032047
let mut buffered_cols: Vec<ArrayRef> =

0 commit comments

Comments
 (0)