Skip to content

Commit 27d7b57

Browse files
committed
Add exact reverse scan with per-RG buffering and tests
When exact_reverse is enabled via with_exact_reverse(true) on ParquetSource:
- try_reverse_output returns Exact (Sort operator removed, fetch pushdown enabled)
- ReversedRowGroupStream buffers batches per row group, reverses batch order, and reverses rows within each batch. Memory: O(largest_row_group).
- Default (exact_reverse=false) returns Inexact (backward compatible)

Row reversal is done in ReversedRowGroupStream (per-RG buffer), NOT in the per-batch map closure. This ensures correct ordering across batch boundaries within a row group.

Tests added:
- test_exact_reverse_scan_per_rg_buffer: multi-RG, small batch_size, verifies [6,5,4,3,2,1]
- test_inexact_reverse_scan_preserves_row_order: verifies [4,5,6,1,2,3]
- test_reversed_row_group_stream_standalone: unit test for ReversedRowGroupStream
- test_exact_reverse_returns_exact: option returns Exact
- test_default_returns_inexact: default returns Inexact

Based on the approach from apache#18817.
1 parent 6d2c1cf commit 27d7b57

3 files changed

Lines changed: 285 additions & 15 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
2929
use datafusion_physical_expr::projection::ProjectionExprs;
3030
use datafusion_physical_expr::utils::reassign_expr_columns;
3131
use datafusion_physical_expr_adapter::replace_columns_with_literals;
32-
use std::collections::HashMap;
32+
use std::collections::{HashMap, VecDeque};
3333
use std::pin::Pin;
3434
use std::sync::Arc;
3535
use std::task::{Context, Poll};
@@ -118,6 +118,8 @@ pub(super) struct ParquetOpener {
118118
pub max_predicate_cache_size: Option<usize>,
119119
/// Whether to read row groups in reverse order
120120
pub reverse_row_groups: bool,
121+
/// Whether to reverse rows within each batch (for Exact reverse scan)
122+
pub reverse_rows: bool,
121123
}
122124

123125
/// Represents a prepared access plan with optional row selection
@@ -274,6 +276,7 @@ impl FileOpener for ParquetOpener {
274276
let max_predicate_cache_size = self.max_predicate_cache_size;
275277

276278
let reverse_row_groups = self.reverse_row_groups;
279+
let reverse_rows = self.reverse_rows;
277280
Ok(Box::pin(async move {
278281
#[cfg(feature = "parquet_encryption")]
279282
let file_decryption_properties = encryption_context
@@ -555,6 +558,18 @@ impl FileOpener for ParquetOpener {
555558
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
556559
}
557560

561+
// Collect per-RG row counts for exact reverse buffering
562+
let rg_row_counts: Vec<usize> = if reverse_rows {
563+
let rg_metadata = file_metadata.row_groups();
564+
prepared_plan
565+
.row_group_indexes
566+
.iter()
567+
.map(|&idx| rg_metadata[idx].num_rows() as usize)
568+
.collect()
569+
} else {
570+
vec![]
571+
};
572+
558573
// Apply the prepared plan to the builder
559574
builder = prepared_plan.apply_to_builder(builder);
560575

@@ -601,6 +616,9 @@ impl FileOpener for ParquetOpener {
601616
&predicate_cache_inner_records,
602617
&predicate_cache_records,
603618
);
619+
// Note: per-batch row reversal is handled by ReversedRowGroupStream
620+
// (wraps the stream below), NOT here. Reversing per-batch here would
621+
// double-reverse when combined with the RG-level buffer+reverse.
604622
b = projector.project_batch(&b)?;
605623
if replace_schema {
606624
// Ensure the output batch has the expected schema.
@@ -677,6 +695,15 @@ impl FileOpener for ParquetOpener {
677695
})
678696
});
679697

698+
// When exact reverse is enabled, wrap the stream to buffer
699+
// and reverse rows per row group. Memory cost: O(largest_RG).
700+
let stream: futures::stream::BoxStream<'static, Result<RecordBatch>> =
701+
if reverse_rows {
702+
ReversedRowGroupStream::new(stream, rg_row_counts).boxed()
703+
} else {
704+
stream.boxed()
705+
};
706+
680707
if let Some(file_pruner) = file_pruner {
681708
Ok(EarlyStoppingStream::new(
682709
stream,
@@ -691,6 +718,127 @@ impl FileOpener for ParquetOpener {
691718
}
692719
}
693720

721+
/// Buffers batches per row group, then emits them in reversed order with
/// reversed rows within each batch. Memory: O(largest row group).
///
/// The input stream has row groups already in reversed order (via
/// `PreparedAccessPlan::reverse`). This stream reverses the row order
/// *within* each row group so the final output is in exact descending order.
struct ReversedRowGroupStream<S> {
    /// The underlying batch stream (row groups already emitted in reversed order)
    inner: S,
    /// Number of rows in each row group (in read order, already reversed).
    /// Row-group boundaries are detected by counting rows consumed, since
    /// batches themselves carry no row-group marker.
    rg_row_counts: Vec<usize>,
    /// Index of the current row group being buffered
    current_rg: usize,
    /// Rows remaining in the current row group
    rows_remaining_in_rg: usize,
    /// Buffered batches for the current row group (held until the boundary
    /// is reached, then reversed and drained into `output_buffer`)
    buffer: Vec<RecordBatch>,
    /// Reversed batches ready to emit
    output_buffer: VecDeque<RecordBatch>,
    /// Whether the inner stream is exhausted
    done: bool,
}
742+
743+
impl<S> ReversedRowGroupStream<S> {
744+
fn new(inner: S, rg_row_counts: Vec<usize>) -> Self {
745+
let rows_remaining = rg_row_counts.first().copied().unwrap_or(0);
746+
Self {
747+
inner,
748+
rg_row_counts,
749+
current_rg: 0,
750+
rows_remaining_in_rg: rows_remaining,
751+
buffer: Vec::new(),
752+
output_buffer: VecDeque::new(),
753+
done: false,
754+
}
755+
}
756+
757+
/// Reverse the buffered batches: reverse batch order, reverse rows
758+
/// within each batch, and move them to output_buffer.
759+
fn flush_buffer(&mut self) -> Result<()> {
760+
let batches = std::mem::take(&mut self.buffer);
761+
for batch in batches.into_iter().rev() {
762+
if batch.num_rows() <= 1 {
763+
self.output_buffer.push_back(batch);
764+
continue;
765+
}
766+
let indices = arrow::array::UInt32Array::from_iter_values(
767+
(0..batch.num_rows() as u32).rev(),
768+
);
769+
let reversed = arrow::compute::take_record_batch(&batch, &indices)?;
770+
self.output_buffer.push_back(reversed);
771+
}
772+
// Advance to next row group
773+
self.current_rg += 1;
774+
self.rows_remaining_in_rg = self
775+
.rg_row_counts
776+
.get(self.current_rg)
777+
.copied()
778+
.unwrap_or(0);
779+
Ok(())
780+
}
781+
}
782+
783+
impl<S> Stream for ReversedRowGroupStream<S>
784+
where
785+
S: Stream<Item = Result<RecordBatch>> + Unpin,
786+
{
787+
type Item = Result<RecordBatch>;
788+
789+
fn poll_next(
790+
mut self: Pin<&mut Self>,
791+
cx: &mut Context<'_>,
792+
) -> Poll<Option<Self::Item>> {
793+
use Poll;
794+
795+
// First, emit any already-reversed batches
796+
if let Some(batch) = self.output_buffer.pop_front() {
797+
return Poll::Ready(Some(Ok(batch)));
798+
}
799+
800+
if self.done {
801+
return Poll::Ready(None);
802+
}
803+
804+
// Pull batches from the inner stream until we complete a row group
805+
loop {
806+
match ready!(self.inner.poll_next_unpin(cx)) {
807+
Some(Ok(batch)) => {
808+
let num_rows = batch.num_rows();
809+
self.buffer.push(batch);
810+
self.rows_remaining_in_rg =
811+
self.rows_remaining_in_rg.saturating_sub(num_rows);
812+
813+
if self.rows_remaining_in_rg == 0 {
814+
// Row group complete — flush buffer
815+
if let Err(e) = self.flush_buffer() {
816+
return Poll::Ready(Some(Err(e)));
817+
}
818+
if let Some(batch) = self.output_buffer.pop_front() {
819+
return Poll::Ready(Some(Ok(batch)));
820+
}
821+
}
822+
}
823+
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
824+
None => {
825+
self.done = true;
826+
// Flush any remaining buffered batches
827+
if !self.buffer.is_empty()
828+
&& let Err(e) = self.flush_buffer()
829+
{
830+
return Poll::Ready(Some(Err(e)));
831+
}
832+
if let Some(batch) = self.output_buffer.pop_front() {
833+
return Poll::Ready(Some(Ok(batch)));
834+
}
835+
return Poll::Ready(None);
836+
}
837+
}
838+
}
839+
}
840+
}
841+
694842
/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
695843
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
696844
fn copy_arrow_reader_metrics(
@@ -1027,6 +1175,7 @@ fn should_enable_page_index(
10271175

10281176
#[cfg(test)]
10291177
mod test {
1178+
use std::pin::Pin;
10301179
use std::sync::Arc;
10311180

10321181
use super::{ConstantColumns, constant_columns_from_stats};
@@ -1076,6 +1225,7 @@ mod test {
10761225
coerce_int96: Option<arrow::datatypes::TimeUnit>,
10771226
max_predicate_cache_size: Option<usize>,
10781227
reverse_row_groups: bool,
1228+
reverse_rows: bool,
10791229
}
10801230

10811231
impl ParquetOpenerBuilder {
@@ -1101,6 +1251,7 @@ mod test {
11011251
coerce_int96: None,
11021252
max_predicate_cache_size: None,
11031253
reverse_row_groups: false,
1254+
reverse_rows: false,
11041255
}
11051256
}
11061257

@@ -1208,6 +1359,7 @@ mod test {
12081359
encryption_factory: None,
12091360
max_predicate_cache_size: self.max_predicate_cache_size,
12101361
reverse_row_groups: self.reverse_row_groups,
1362+
reverse_rows: self.reverse_rows,
12111363
}
12121364
}
12131365
}
@@ -1296,7 +1448,7 @@ mod test {
12961448
}
12971449

12981450
async fn count_batches_and_rows(
1299-
mut stream: std::pin::Pin<
1451+
mut stream: Pin<
13001452
Box<
13011453
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
13021454
+ Send,
@@ -1314,7 +1466,7 @@ mod test {
13141466

13151467
/// Helper to collect all int32 values from the first column of batches
13161468
async fn collect_int32_values(
1317-
mut stream: std::pin::Pin<
1469+
mut stream: Pin<
13181470
Box<
13191471
dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
13201472
+ Send,

0 commit comments

Comments
 (0)