Skip to content

Commit 65d3c5b

Browse files
authored
Last row by unique key exec implementation (#7)
* Last row by unique key exec implementation
* Last row by unique key exec implementation: TODO
* Allow using MemoryExec inside of the CubeTable plan
* Use comparators instead of create_key for LastRowByUniqueKeyExecStream to speed it up
1 parent c51bba2 commit 65d3c5b

3 files changed

Lines changed: 250 additions & 8 deletions

File tree

datafusion/src/physical_plan/memory.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use async_trait::async_trait;
3535
use futures::Stream;
3636

3737
/// Execution plan for reading in-memory batches of data
38+
#[derive(Clone)]
3839
pub struct MemoryExec {
3940
/// The partitions to query
4041
partitions: Vec<Vec<RecordBatch>>,
@@ -76,12 +77,16 @@ impl ExecutionPlan for MemoryExec {
7677

7778
fn with_new_children(
7879
&self,
79-
_: Vec<Arc<dyn ExecutionPlan>>,
80+
children: Vec<Arc<dyn ExecutionPlan>>,
8081
) -> Result<Arc<dyn ExecutionPlan>> {
81-
Err(DataFusionError::Internal(format!(
82-
"Children cannot be replaced in {:?}",
83-
self
84-
)))
82+
if children.is_empty() {
83+
Ok(Arc::new(self.clone()))
84+
} else {
85+
Err(DataFusionError::Internal(format!(
86+
"Children cannot be replaced in {:?}",
87+
self
88+
)))
89+
}
8590
}
8691

8792
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {

datafusion/src/physical_plan/merge_sort.rs

Lines changed: 233 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ use std::task::{Context, Poll};
2525
use futures::stream::{Fuse, Stream};
2626
use futures::StreamExt;
2727

28-
use arrow::array::ArrayRef;
28+
use arrow::array::{build_compare, ArrayRef, BooleanArray, DynComparator};
2929
pub use arrow::compute::SortOptions;
30-
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
30+
use arrow::compute::{
31+
filter_record_batch, lexsort_to_indices, take, SortColumn, TakeOptions,
32+
};
3133
use arrow::datatypes::SchemaRef;
3234
use arrow::error::Result as ArrowResult;
3335
use arrow::record_batch::RecordBatch;
@@ -551,6 +553,209 @@ impl RecordBatchStream for MergeSortStream {
551553
}
552554
}
553555

556+
/// Filter out all but last row by unique key execution plan
#[derive(Debug)]
pub struct LastRowByUniqueKeyExec {
    /// Input plan; must produce exactly one partition (enforced in `execute`).
    input: Arc<dyn ExecutionPlan>,
    /// Key columns identifying a logical row: only the last row of each run of
    /// key-equal rows is emitted. The stream compares adjacent rows only, so
    /// the input is assumed to be sorted (or at least grouped) by these
    /// columns — TODO confirm callers guarantee this.
    pub unique_key: Vec<Column>,
}
563+
564+
impl LastRowByUniqueKeyExec {
565+
/// Create a new execution plan
566+
pub fn try_new(
567+
input: Arc<dyn ExecutionPlan>,
568+
unique_key: Vec<Column>,
569+
) -> Result<Self> {
570+
if unique_key.is_empty() {
571+
return Err(DataFusionError::Internal(
572+
"Empty unique_key passed for LastRowByUniqueKeyExec".to_string(),
573+
));
574+
}
575+
Ok(Self { input, unique_key })
576+
}
577+
578+
/// Input execution plan
579+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
580+
&self.input
581+
}
582+
}
583+
584+
#[async_trait]
585+
impl ExecutionPlan for LastRowByUniqueKeyExec {
586+
fn as_any(&self) -> &dyn Any {
587+
self
588+
}
589+
590+
fn schema(&self) -> SchemaRef {
591+
self.input.schema()
592+
}
593+
594+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
595+
vec![self.input.clone()]
596+
}
597+
598+
fn output_partitioning(&self) -> Partitioning {
599+
Partitioning::UnknownPartitioning(1)
600+
}
601+
602+
fn with_new_children(
603+
&self,
604+
children: Vec<Arc<dyn ExecutionPlan>>,
605+
) -> Result<Arc<dyn ExecutionPlan>> {
606+
Ok(Arc::new(LastRowByUniqueKeyExec::try_new(
607+
children[0].clone(),
608+
self.unique_key.clone(),
609+
)?))
610+
}
611+
612+
fn output_hints(&self) -> OptimizerHints {
613+
OptimizerHints {
614+
single_value_columns: self.input.output_hints().single_value_columns,
615+
sort_order: self.input.output_hints().sort_order,
616+
}
617+
}
618+
619+
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
620+
if 0 != partition {
621+
return Err(DataFusionError::Internal(format!(
622+
"LastRowByUniqueKeyExec invalid partition {}",
623+
partition
624+
)));
625+
}
626+
627+
if self.input.output_partitioning().partition_count() != 1 {
628+
return Err(DataFusionError::Internal(format!(
629+
"LastRowByUniqueKeyExec expects only one partition but got {}",
630+
self.input.output_partitioning().partition_count()
631+
)));
632+
}
633+
let input_stream = self.input.execute(0).await?;
634+
635+
Ok(Box::pin(LastRowByUniqueKeyExecStream {
636+
schema: self.input.schema(),
637+
input: input_stream,
638+
unique_key: self.unique_key.clone(),
639+
current_record_batch: None,
640+
}))
641+
}
642+
}
643+
644+
/// Filter out all but last row by unique key stream
struct LastRowByUniqueKeyExecStream {
    /// Output schema, which is the same as the input schema for this operator
    schema: SchemaRef,
    /// The input stream to filter.
    input: SendableRecordBatchStream,
    /// Key columns; only the last row of each run of key-equal rows is emitted.
    unique_key: Vec<Column>,
    /// One-batch lookahead buffer: the most recent input batch is held back so
    /// its final row can be compared with the first row of the next batch.
    current_record_batch: Option<RecordBatch>,
}
655+
656+
impl LastRowByUniqueKeyExecStream {
657+
fn row_equals(comparators: &Vec<DynComparator>, a: usize, b: usize) -> bool {
658+
for comparator in comparators.iter().rev() {
659+
if comparator(a, b) != Ordering::Equal {
660+
return false;
661+
}
662+
}
663+
true
664+
}
665+
666+
fn keep_only_last_rows_by_key(
667+
&mut self,
668+
next_batch: Option<RecordBatch>,
669+
) -> ArrowResult<RecordBatch> {
670+
let batch = self.current_record_batch.take().unwrap();
671+
let num_rows = batch.num_rows();
672+
let mut builder = BooleanArray::builder(num_rows);
673+
let key_columns = self
674+
.unique_key
675+
.iter()
676+
.map(|k| batch.column(k.index()).clone())
677+
.collect::<Vec<ArrayRef>>();
678+
let mut requires_filtering = false;
679+
let self_column_comparators = key_columns
680+
.iter()
681+
.map(|c| build_compare(c.as_ref(), c.as_ref()))
682+
.collect::<ArrowResult<Vec<_>>>()?;
683+
for i in 0..num_rows {
684+
let filter_value = if i == num_rows - 1 && next_batch.is_none() {
685+
true
686+
} else if i == num_rows - 1 {
687+
let next_key_columns = self
688+
.unique_key
689+
.iter()
690+
.map(|k| next_batch.as_ref().unwrap().column(k.index()).clone())
691+
.collect::<Vec<ArrayRef>>();
692+
let next_column_comparators = key_columns
693+
.iter()
694+
.zip(next_key_columns.iter())
695+
.map(|(c, n)| build_compare(c.as_ref(), n.as_ref()))
696+
.collect::<ArrowResult<Vec<_>>>()?;
697+
!Self::row_equals(&next_column_comparators, i, 0)
698+
} else {
699+
!Self::row_equals(&self_column_comparators, i, i + 1)
700+
};
701+
if !filter_value {
702+
requires_filtering = true;
703+
}
704+
builder.append_value(filter_value)?;
705+
}
706+
self.current_record_batch = next_batch;
707+
if requires_filtering {
708+
let filter_array = builder.finish();
709+
filter_record_batch(&batch, &filter_array)
710+
} else {
711+
Ok(batch)
712+
}
713+
}
714+
}
715+
716+
impl Stream for LastRowByUniqueKeyExecStream {
    type Item = ArrowResult<RecordBatch>;

    // The stream maintains a one-batch lookahead: each input batch is buffered
    // so its final row can be compared with the first row of the batch that
    // follows it before being emitted.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.input.poll_next_unpin(cx).map(|x| {
            match x {
                Some(Ok(batch)) => {
                    if self.current_record_batch.is_none() {
                        // First batch: buffer it and emit a zero-row
                        // placeholder so the poll still yields an item.
                        let schema = batch.schema();
                        self.current_record_batch = Some(batch);
                        // TODO get rid of empty batch. Returning Poll::Pending here results in stuck stream.
                        Some(Ok(RecordBatch::new_empty(schema)))
                    } else {
                        // Deduplicate the buffered batch, using the newly
                        // arrived batch as lookahead (it becomes the buffer).
                        Some(self.keep_only_last_rows_by_key(Some(batch)))
                    }
                }
                None => {
                    if self.current_record_batch.is_some() {
                        // Input exhausted: flush the last buffered batch.
                        Some(self.keep_only_last_rows_by_key(None))
                    } else {
                        None
                    }
                }
                // Pass input errors (and Pending, via `map`) through unchanged.
                other => other,
            }
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // At most one extra item relative to the input: the empty bootstrap
        // batch emitted for the first input batch.
        let (lower, upper) = self.input.size_hint();
        (lower, upper.map(|u| u + 1))
    }
}
752+
753+
impl RecordBatchStream for LastRowByUniqueKeyExecStream {
    /// All emitted batches (including the empty bootstrap batch) share the
    /// input schema captured at construction.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
758+
554759
#[cfg(test)]
555760
mod tests {
556761
use super::*;
@@ -1059,6 +1264,32 @@ mod tests {
10591264
)
10601265
}
10611266

1267+
#[tokio::test]
async fn last_row_by_unique_key_exec() {
    // One partition of three batches; duplicate key values (1, 5, 9) appear
    // in adjacent rows, with 9,9 duplicated inside the second batch.
    let p1 = vec![
        ints(vec![1, 1, 2, 3, 4, 5, 5, 6, 7]),
        ints(vec![8, 9, 9, 10]),
        ints(vec![11, 12, 13]),
    ];

    let schema = ints_schema();
    let inp = Arc::new(MemoryExec::try_new(&vec![p1], schema.clone(), None).unwrap());
    let r = collect(Arc::new(
        LastRowByUniqueKeyExec::try_new(inp, vec![col("a", &schema)]).unwrap(),
    ))
    .await
    .unwrap();
    // The leading empty vec is the zero-row bootstrap batch the stream emits
    // while buffering its first input batch; each later batch keeps only the
    // last row per key value.
    assert_eq!(
        to_ints(r),
        vec![
            vec![],
            vec![1, 2, 3, 4, 5, 6, 7],
            vec![8, 9, 10],
            vec![11, 12, 13]
        ]
    );
}
1292+
10621293
fn test_merge(arrays: Vec<&ArrayRef>) -> ArrayRef {
10631294
let schema = Arc::new(Schema::new(vec![Field::new(
10641295
"a",

datafusion/src/physical_plan/planner.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ use crate::physical_plan::hash_join::HashJoinExec;
4141
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
4242
use crate::physical_plan::merge::MergeExec;
4343
use crate::physical_plan::merge_join::MergeJoinExec;
44-
use crate::physical_plan::merge_sort::{MergeReSortExec, MergeSortExec};
44+
use crate::physical_plan::merge_sort::{
45+
LastRowByUniqueKeyExec, MergeReSortExec, MergeSortExec,
46+
};
4547
use crate::physical_plan::projection::ProjectionExec;
4648
use crate::physical_plan::repartition::RepartitionExec;
4749
use crate::physical_plan::skip::SkipExec;
@@ -947,6 +949,10 @@ impl DefaultPhysicalPlanner {
947949
Some(node.clone())
948950
} else if let Some(aliased) = node.as_any().downcast_ref::<FilterExec>() {
949951
self.merge_sort_node(aliased.children()[0].clone())
952+
} else if let Some(aliased) =
953+
node.as_any().downcast_ref::<LastRowByUniqueKeyExec>()
954+
{
955+
self.merge_sort_node(aliased.children()[0].clone())
950956
} else if let Some(aliased) = node.as_any().downcast_ref::<ProjectionExec>() {
951957
// TODO
952958
self.merge_sort_node(aliased.children()[0].clone())

0 commit comments

Comments
 (0)