Skip to content

Commit 9a3dd73

Browse files
committed
Add exact reverse scan with per-RG buffering and tests
When exact_reverse is enabled via with_exact_reverse(true) on ParquetSource:
- try_reverse_output returns Exact (Sort operator removed, fetch pushdown enabled)
- ReversedRowGroupStream buffers batches per row group, reverses batch order, and reverses rows within each batch. Memory: O(largest_row_group).
- Default (exact_reverse=false) returns Inexact (backward compatible)

Row reversal is done in ReversedRowGroupStream (per-RG buffer), NOT in the per-batch map closure. This ensures correct ordering across batch boundaries within a row group.

Tests added:
- test_exact_reverse_scan_per_rg_buffer: multi-RG, small batch_size, verifies [6,5,4,3,2,1]
- test_inexact_reverse_scan_preserves_row_order: verifies [4,5,6,1,2,3]
- test_reversed_row_group_stream_standalone: unit test for ReversedRowGroupStream
- test_exact_reverse_returns_exact: option returns Exact
- test_default_returns_inexact: default returns Inexact

Based on the approach from apache#18817.
1 parent 6d2c1cf commit 9a3dd73

3 files changed

Lines changed: 309 additions & 62 deletions

File tree

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ fn test_sort_pushdown_basic_phase1() {
8484
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
8585
output:
8686
Ok:
87-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
88-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
87+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
8988
"
9089
);
9190
}
@@ -113,8 +112,7 @@ fn test_sort_with_limit_phase1() {
113112
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
114113
output:
115114
Ok:
116-
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
117-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
115+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
118116
"
119117
);
120118
}
@@ -144,8 +142,7 @@ fn test_sort_multiple_columns_phase1() {
144142
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
145143
output:
146144
Ok:
147-
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
148-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
145+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet, reverse_row_groups=true
149146
"
150147
);
151148
}
@@ -179,8 +176,7 @@ fn test_prefix_match_single_column() {
179176
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
180177
output:
181178
Ok:
182-
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
183-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
179+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet, reverse_row_groups=true
184180
"
185181
);
186182
}
@@ -213,8 +209,7 @@ fn test_prefix_match_with_limit() {
213209
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet
214210
output:
215211
Ok:
216-
- SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
217-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
212+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet, reverse_row_groups=true
218213
"
219214
);
220215
}
@@ -249,10 +244,9 @@ fn test_prefix_match_through_transparent_nodes() {
249244
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet
250245
output:
251246
Ok:
252-
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
253-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
254-
- CoalesceBatchesExec: target_batch_size=1024
255-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
247+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
248+
- CoalesceBatchesExec: target_batch_size=1024
249+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet, reverse_row_groups=true
256250
"
257251
);
258252
}
@@ -344,9 +338,8 @@ fn test_sort_through_coalesce_batches() {
344338
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
345339
output:
346340
Ok:
347-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
348-
- CoalesceBatchesExec: target_batch_size=1024
349-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
341+
- CoalesceBatchesExec: target_batch_size=1024
342+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
350343
"
351344
);
352345
}
@@ -373,9 +366,8 @@ fn test_sort_through_repartition() {
373366
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
374367
output:
375368
Ok:
376-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
377-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
378-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
369+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
370+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
379371
"
380372
);
381373
}
@@ -406,8 +398,7 @@ fn test_nested_sorts() {
406398
output:
407399
Ok:
408400
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
409-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
410-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
401+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
411402
"
412403
);
413404
}
@@ -469,8 +460,8 @@ fn test_sort_through_coalesce_partitions() {
469460
Ok:
470461
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
471462
- CoalescePartitionsExec
472-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
473-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
463+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
464+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
474465
"
475466
);
476467
}
@@ -503,9 +494,9 @@ fn test_complex_plan_with_multiple_operators() {
503494
Ok:
504495
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
505496
- CoalescePartitionsExec
506-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
497+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
507498
- CoalesceBatchesExec: target_batch_size=1024
508-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
499+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
509500
"
510501
);
511502
}
@@ -538,8 +529,7 @@ fn test_multiple_sorts_different_columns() {
538529
output:
539530
Ok:
540531
- SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
541-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
542-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
532+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
543533
"
544534
);
545535
}
@@ -667,8 +657,7 @@ fn test_pushdown_through_blocking_node() {
667657
Ok:
668658
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
669659
- AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
670-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
671-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
660+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
672661
"
673662
);
674663
}
@@ -704,9 +693,8 @@ fn test_sort_pushdown_through_simple_projection() {
704693
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
705694
output:
706695
Ok:
707-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
708-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
709-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
696+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
697+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
710698
"
711699
);
712700
}
@@ -739,9 +727,8 @@ fn test_sort_pushdown_through_projection_with_alias() {
739727
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
740728
output:
741729
Ok:
742-
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
743-
- ProjectionExec: expr=[a@0 as id, b@1 as value]
744-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
730+
- ProjectionExec: expr=[a@0 as id, b@1 as value]
731+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
745732
"
746733
);
747734
}
@@ -828,9 +815,8 @@ fn test_sort_pushdown_projection_reordered_columns() {
828815
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
829816
output:
830817
Ok:
831-
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
832-
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
833-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
818+
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
819+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
834820
"
835821
);
836822
}
@@ -862,9 +848,8 @@ fn test_sort_pushdown_projection_with_limit() {
862848
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
863849
output:
864850
Ok:
865-
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
866-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
867-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
851+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
852+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
868853
"
869854
);
870855
}
@@ -899,10 +884,9 @@ fn test_sort_pushdown_through_projection_and_coalesce() {
899884
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
900885
output:
901886
Ok:
902-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
903-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
904-
- CoalesceBatchesExec: target_batch_size=1024
905-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
887+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
888+
- CoalesceBatchesExec: target_batch_size=1024
889+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
906890
"
907891
);
908892
}
@@ -935,9 +919,8 @@ fn test_sort_pushdown_projection_subset_of_columns() {
935919
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=parquet
936920
output:
937921
Ok:
938-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
939-
- ProjectionExec: expr=[a@0 as a]
940-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
922+
- ProjectionExec: expr=[a@0 as a]
923+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=parquet, reverse_row_groups=true
941924
"
942925
);
943926
}

0 commit comments

Comments
 (0)