@@ -25,7 +25,7 @@ use std::task::{Context, Poll};
2525use futures:: stream:: { Fuse , Stream } ;
2626use futures:: StreamExt ;
2727
28- use arrow:: array:: { ArrayRef , BooleanArray } ;
28+ use arrow:: array:: { build_compare , ArrayRef , BooleanArray , DynComparator } ;
2929pub use arrow:: compute:: SortOptions ;
3030use arrow:: compute:: {
3131 filter_record_batch, lexsort_to_indices, take, SortColumn , TakeOptions ,
@@ -40,13 +40,10 @@ use crate::physical_plan::{ExecutionPlan, OptimizerHints, Partitioning};
4040
4141use crate :: cube_ext:: util:: { cmp_array_row_same_types, lexcmp_array_rows} ;
4242use crate :: physical_plan:: expressions:: Column ;
43- use crate :: physical_plan:: hash_aggregate:: create_key;
4443use crate :: physical_plan:: memory:: MemoryStream ;
4544use arrow:: array:: { make_array, MutableArrayData } ;
4645use async_trait:: async_trait;
47- use core:: mem;
4846use futures:: future:: join_all;
49- use smallvec:: SmallVec ;
5047use std:: cmp:: { Ordering , Reverse } ;
5148use std:: collections:: BinaryHeap ;
5249
@@ -657,6 +654,15 @@ struct LastRowByUniqueKeyExecStream {
657654}
658655
659656impl LastRowByUniqueKeyExecStream {
657+ fn row_equals ( comparators : & Vec < DynComparator > , a : usize , b : usize ) -> bool {
658+ for comparator in comparators. iter ( ) . rev ( ) {
659+ if comparator ( a, b) != Ordering :: Equal {
660+ return false ;
661+ }
662+ }
663+ true
664+ }
665+
660666 fn keep_only_last_rows_by_key (
661667 & mut self ,
662668 next_batch : Option < RecordBatch > ,
@@ -669,36 +675,33 @@ impl LastRowByUniqueKeyExecStream {
669675 . iter ( )
670676 . map ( |k| batch. column ( k. index ( ) ) . clone ( ) )
671677 . collect :: < Vec < ArrayRef > > ( ) ;
672- let mut next_key = SmallVec :: new ( ) ;
673- let mut current_row_key = None ;
674678 let mut requires_filtering = false ;
675- for i in 0 ..num_rows + 1 {
676- if i == num_rows && next_batch. is_none ( ) {
677- builder. append_value ( true ) ?;
678- continue ;
679- } else if i == num_rows {
679+ let self_column_comparators = key_columns
680+ . iter ( )
681+ . map ( |c| build_compare ( c. as_ref ( ) , c. as_ref ( ) ) )
682+ . collect :: < ArrowResult < Vec < _ > > > ( ) ?;
683+ for i in 0 ..num_rows {
684+ let filter_value = if i == num_rows - 1 && next_batch. is_none ( ) {
685+ true
686+ } else if i == num_rows - 1 {
680687 let next_key_columns = self
681688 . unique_key
682689 . iter ( )
683690 . map ( |k| next_batch. as_ref ( ) . unwrap ( ) . column ( k. index ( ) ) . clone ( ) )
684691 . collect :: < Vec < ArrayRef > > ( ) ;
685- // TODO replace create_key with special compare with next kernel
686- create_key ( next_key_columns. as_slice ( ) , 0 , & mut next_key)
687- . map_err ( DataFusionError :: into_arrow_external_error) ?;
692+ let next_column_comparators = key_columns
693+ . iter ( )
694+ . zip ( next_key_columns. iter ( ) )
695+ . map ( |( c, n) | build_compare ( c. as_ref ( ) , n. as_ref ( ) ) )
696+ . collect :: < ArrowResult < Vec < _ > > > ( ) ?;
697+ !Self :: row_equals ( & next_column_comparators, i, 0 )
688698 } else {
689- create_key ( key_columns. as_slice ( ) , i, & mut next_key)
690- . map_err ( DataFusionError :: into_arrow_external_error) ?;
691- }
692- if current_row_key. is_some ( ) {
693- let filter_value = current_row_key. as_ref ( ) . unwrap ( ) != & next_key;
694- if !filter_value {
695- requires_filtering = true ;
696- }
697- builder. append_value ( filter_value) ?;
699+ !Self :: row_equals ( & self_column_comparators, i, i + 1 )
700+ } ;
701+ if !filter_value {
702+ requires_filtering = true ;
698703 }
699- let mut to_update = SmallVec :: new ( ) ;
700- mem:: swap ( & mut to_update, & mut next_key) ;
701- current_row_key = Some ( to_update) ;
704+ builder. append_value ( filter_value) ?;
702705 }
703706 self . current_record_batch = next_batch;
704707 if requires_filtering {
0 commit comments