Skip to content

Commit d4d43cd

Browse files
committed
Use comparators instead of create_key for LastRowByUniqueKeyExecStream to speed it up
1 parent e8137bb commit d4d43cd

1 file changed

Lines changed: 22 additions & 26 deletions

File tree

datafusion/src/physical_plan/merge_sort.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::task::{Context, Poll};
2525
use futures::stream::{Fuse, Stream};
2626
use futures::StreamExt;
2727

28-
use arrow::array::{ArrayRef, BooleanArray};
28+
use arrow::array::{ArrayRef, BooleanArray, build_compare, DynComparator};
2929
pub use arrow::compute::SortOptions;
3030
use arrow::compute::{
3131
filter_record_batch, lexsort_to_indices, take, SortColumn, TakeOptions,
@@ -40,13 +40,10 @@ use crate::physical_plan::{ExecutionPlan, OptimizerHints, Partitioning};
4040

4141
use crate::cube_ext::util::{cmp_array_row_same_types, lexcmp_array_rows};
4242
use crate::physical_plan::expressions::Column;
43-
use crate::physical_plan::hash_aggregate::create_key;
4443
use crate::physical_plan::memory::MemoryStream;
4544
use arrow::array::{make_array, MutableArrayData};
4645
use async_trait::async_trait;
47-
use core::mem;
4846
use futures::future::join_all;
49-
use smallvec::SmallVec;
5047
use std::cmp::{Ordering, Reverse};
5148
use std::collections::BinaryHeap;
5249

@@ -657,6 +654,15 @@ struct LastRowByUniqueKeyExecStream {
657654
}
658655

659656
impl LastRowByUniqueKeyExecStream {
657+
fn row_equals(comparators: &Vec<DynComparator>, a: usize, b: usize) -> bool {
658+
for comparator in comparators.iter().rev() {
659+
if comparator(a, b) != Ordering::Equal {
660+
return false;
661+
}
662+
}
663+
true
664+
}
665+
660666
fn keep_only_last_rows_by_key(
661667
&mut self,
662668
next_batch: Option<RecordBatch>,
@@ -669,36 +675,26 @@ impl LastRowByUniqueKeyExecStream {
669675
.iter()
670676
.map(|k| batch.column(k.index()).clone())
671677
.collect::<Vec<ArrayRef>>();
672-
let mut next_key = SmallVec::new();
673-
let mut current_row_key = None;
674678
let mut requires_filtering = false;
675-
for i in 0..num_rows + 1 {
676-
if i == num_rows && next_batch.is_none() {
677-
builder.append_value(true)?;
678-
continue;
679-
} else if i == num_rows {
679+
let self_column_comparators = key_columns.iter().map(|c| build_compare(c.as_ref(), c.as_ref())).collect::<ArrowResult<Vec<_>>>()?;
680+
for i in 0..num_rows {
681+
let filter_value = if i == num_rows - 1 && next_batch.is_none() {
682+
true
683+
} else if i == num_rows - 1 {
680684
let next_key_columns = self
681685
.unique_key
682686
.iter()
683687
.map(|k| next_batch.as_ref().unwrap().column(k.index()).clone())
684688
.collect::<Vec<ArrayRef>>();
685-
// TODO replace create_key with special compare with next kernel
686-
create_key(next_key_columns.as_slice(), 0, &mut next_key)
687-
.map_err(DataFusionError::into_arrow_external_error)?;
689+
let next_column_comparators = key_columns.iter().zip(next_key_columns.iter()).map(|(c, n)| build_compare(c.as_ref(), n.as_ref())).collect::<ArrowResult<Vec<_>>>()?;
690+
!Self::row_equals(&next_column_comparators, i, 0)
688691
} else {
689-
create_key(key_columns.as_slice(), i, &mut next_key)
690-
.map_err(DataFusionError::into_arrow_external_error)?;
691-
}
692-
if current_row_key.is_some() {
693-
let filter_value = current_row_key.as_ref().unwrap() != &next_key;
694-
if !filter_value {
695-
requires_filtering = true;
696-
}
697-
builder.append_value(filter_value)?;
692+
!Self::row_equals(&self_column_comparators, i, i + 1)
693+
};
694+
if !filter_value {
695+
requires_filtering = true;
698696
}
699-
let mut to_update = SmallVec::new();
700-
mem::swap(&mut to_update, &mut next_key);
701-
current_row_key = Some(to_update);
697+
builder.append_value(filter_value)?;
702698
}
703699
self.current_record_batch = next_batch;
704700
if requires_filtering {

0 commit comments

Comments
 (0)