Skip to content

Commit 2f24256

Browse files
fvaleye authored and rtyler committed
perf(core): wire kernel skip_stats for no-predicate file listing
Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
1 parent fa0010e commit 2f24256

4 files changed

Lines changed: 163 additions & 13 deletions

File tree

crates/core/src/kernel/snapshot/iterators/scan_row.rs

Lines changed: 28 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -33,21 +33,27 @@ pin_project! {
3333
stats_schema: KernelSchemaRef,
3434
partitions_schema: Option<KernelSchemaRef>,
3535
column_mapping_mode: ColumnMappingMode,
36+
skip_stats: bool,
3637

3738
#[pin]
3839
stream: S,
3940
}
4041
}
4142

4243
impl<S> ScanRowOutStream<S> {
43-
pub fn try_new(snapshot: Arc<KernelSnapshot>, stream: S) -> DeltaResult<Self> {
44+
pub fn try_new(
45+
snapshot: Arc<KernelSnapshot>,
46+
stream: S,
47+
skip_stats: bool,
48+
) -> DeltaResult<Self> {
4449
let stats_schema = snapshot.stats_schema()?;
4550
let partitions_schema = snapshot.partitions_schema()?;
4651
let column_mapping_mode = snapshot.table_configuration().column_mapping_mode();
4752
Ok(Self {
4853
stats_schema,
4954
partitions_schema,
5055
column_mapping_mode,
56+
skip_stats,
5157
stream,
5258
})
5359
}
@@ -68,6 +74,7 @@ where
6874
this.stats_schema.clone(),
6975
this.partitions_schema.as_ref(),
7076
*this.column_mapping_mode,
77+
*this.skip_stats,
7178
);
7279
Poll::Ready(Some(result))
7380
}
@@ -121,6 +128,7 @@ pub(crate) fn parse_stats_column_with_schema(
121128
stats_schema,
122129
partitions_schema.as_ref(),
123130
column_mapping_mode,
131+
false,
124132
)
125133
}
126134

@@ -129,6 +137,7 @@ fn parse_stats_column_impl(
129137
stats_schema: KernelSchemaRef,
130138
partitions_schema: Option<&KernelSchemaRef>,
131139
column_mapping_mode: ColumnMappingMode,
140+
skip_stats: bool,
132141
) -> DeltaResult<RecordBatch> {
133142
let Some((stats_idx, _)) = batch.schema_ref().column_with_name("stats") else {
134143
return Err(DeltaTableError::SchemaMismatch {
@@ -139,13 +148,24 @@ fn parse_stats_column_impl(
139148
let mut columns = batch.columns().to_vec();
140149
let mut fields = batch.schema().fields().to_vec();
141150

142-
let stats_batch = batch.project(&[stats_idx])?;
143-
let stats_data = Box::new(ArrowEngineData::new(stats_batch));
144-
145-
let parsed = parse_json(stats_data, stats_schema)?;
146-
let parsed: RecordBatch = ArrowEngineData::try_from_engine_data(parsed)?.into();
147-
148-
let stats_array: Arc<StructArray> = Arc::new(parsed.into());
151+
let stats_array: Arc<StructArray> = if skip_stats {
152+
// `parse_json` on a null `stats` column still runs the full JSON
153+
// machinery and produces `{}` structs rather than nulls, which would
154+
// cancel the skip_stats win. Build the fully-null `StructArray` directly instead.
155+
let arrow_struct: arrow_schema::Schema = stats_schema.as_ref().try_into_arrow()?;
156+
Arc::new(StructArray::new_null(
157+
arrow_struct.fields().clone(),
158+
batch.num_rows(),
159+
))
160+
} else {
161+
let stats_batch = batch.project(&[stats_idx])?;
162+
let stats_data = Box::new(ArrowEngineData::new(stats_batch));
163+
164+
let parsed = parse_json(stats_data, stats_schema)?;
165+
let parsed: RecordBatch = ArrowEngineData::try_from_engine_data(parsed)?.into();
166+
167+
Arc::new(parsed.into())
168+
};
149169
fields[stats_idx] = Arc::new(Field::new(
150170
"stats_parsed",
151171
stats_array.data_type().to_owned(),

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 106 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -394,7 +394,14 @@ impl Snapshot {
394394
engine: Arc<dyn Engine>,
395395
predicate: Option<PredicateRef>,
396396
) -> SendableRBStream {
397-
let scan = match self.scan_builder().with_predicate(predicate).build() {
397+
self.warn_if_skip_stats_with_predicate(&predicate);
398+
let skip_stats = predicate.is_none() && self.config.skip_stats;
399+
let scan = match self
400+
.scan_builder()
401+
.with_predicate(predicate)
402+
.with_skip_stats(skip_stats)
403+
.build()
404+
{
398405
Ok(scan) => scan,
399406
Err(err) => return Box::pin(once(ready(Err(err)))),
400407
};
@@ -403,12 +410,23 @@ impl Snapshot {
403410
.scan_metadata(engine)
404411
.map(|d| Ok(rb_from_scan_meta(d?)?));
405412

406-
match ScanRowOutStream::try_new(self.inner.clone(), stream) {
413+
match ScanRowOutStream::try_new(self.inner.clone(), stream, skip_stats) {
407414
Ok(s) => s.boxed(),
408415
Err(err) => Box::pin(once(ready(Err(err)))),
409416
}
410417
}
411418

419+
fn warn_if_skip_stats_with_predicate(&self, predicate: &Option<PredicateRef>) {
420+
if self.config.skip_stats && predicate.is_some() {
421+
tracing::warn!(
422+
"`DeltaTable` was opened with `skip_stats=true`, but this query has \
423+
a predicate. Every file in the table will be scanned. To avoid \
424+
this, open a separate `DeltaTable` without `skip_stats=true` for \
425+
query workloads."
426+
);
427+
}
428+
}
429+
412430
pub(crate) fn files_from<T: Iterator<Item = RecordBatch> + Send + 'static>(
413431
&self,
414432
engine: Arc<dyn Engine>,
@@ -417,7 +435,14 @@ impl Snapshot {
417435
existing_data: Box<T>,
418436
existing_predicate: Option<PredicateRef>,
419437
) -> SendableRBStream {
420-
let scan = match self.scan_builder().with_predicate(predicate).build() {
438+
self.warn_if_skip_stats_with_predicate(&predicate);
439+
let skip_stats = predicate.is_none() && self.config.skip_stats;
440+
let scan = match self
441+
.scan_builder()
442+
.with_predicate(predicate)
443+
.with_skip_stats(skip_stats)
444+
.build()
445+
{
421446
Ok(scan) => scan,
422447
Err(err) => return Box::pin(once(ready(Err(err)))),
423448
};
@@ -426,7 +451,7 @@ impl Snapshot {
426451
.scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
427452
.map(|d| Ok(rb_from_scan_meta(d?)?));
428453

429-
match ScanRowOutStream::try_new(self.inner.clone(), stream) {
454+
match ScanRowOutStream::try_new(self.inner.clone(), stream, skip_stats) {
430455
Ok(s) => s.boxed(),
431456
Err(err) => Box::pin(once(ready(Err(err)))),
432457
}
@@ -1046,7 +1071,7 @@ mod tests {
10461071
// use super::replay::tests::test_log_replay;
10471072
use super::*;
10481073
use crate::{
1049-
DeltaTable, checkpoints,
1074+
DeltaTable, DeltaTableConfig, checkpoints,
10501075
kernel::transaction::CommitData,
10511076
kernel::transaction::{CommitBuilder, TableReference},
10521077
kernel::{Action, DataType, PrimitiveType, StructField, StructType},
@@ -1432,6 +1457,82 @@ mod tests {
14321457
Ok(())
14331458
}
14341459

1460+
#[tokio::test]
1461+
async fn test_file_views_skip_stats_same_paths() -> TestResult {
1462+
let base = TestTables::Checkpoints.table_builder()?.build_storage()?;
1463+
let mut skip_cfg = DeltaTableConfig::default();
1464+
skip_cfg.skip_stats = true;
1465+
let with_skip = EagerSnapshot::try_new(base.as_ref(), skip_cfg, Some(12)).await?;
1466+
let full = EagerSnapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1467+
let mut paths_skip: Vec<String> = with_skip
1468+
.file_views(base.as_ref(), None)
1469+
.map_ok(|v| v.path().to_string())
1470+
.try_collect()
1471+
.await?;
1472+
let mut paths_full: Vec<String> = full
1473+
.file_views(base.as_ref(), None)
1474+
.map_ok(|v| v.path().to_string())
1475+
.try_collect()
1476+
.await?;
1477+
paths_skip.sort();
1478+
paths_full.sort();
1479+
assert_eq!(paths_skip, paths_full);
1480+
Ok(())
1481+
}
1482+
1483+
#[tokio::test]
1484+
async fn test_skip_stats_leaves_stats_parsed_null() -> TestResult {
1485+
let base = TestTables::Checkpoints.table_builder()?.build_storage()?;
1486+
1487+
let default_eager =
1488+
EagerSnapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1489+
let default_stats: Vec<bool> = default_eager
1490+
.file_views(base.as_ref(), None)
1491+
.map_ok(|view| view.stats().is_some())
1492+
.try_collect()
1493+
.await?;
1494+
assert!(!default_stats.is_empty());
1495+
assert!(default_stats.iter().any(|b| *b));
1496+
1497+
let mut skip_cfg = DeltaTableConfig::default();
1498+
skip_cfg.skip_stats = true;
1499+
let skip_eager = EagerSnapshot::try_new(base.as_ref(), skip_cfg, Some(12)).await?;
1500+
let skip_stats: Vec<Option<String>> = skip_eager
1501+
.file_views(base.as_ref(), None)
1502+
.map_ok(|view| view.stats())
1503+
.try_collect()
1504+
.await?;
1505+
assert!(!skip_stats.is_empty());
1506+
assert!(skip_stats.iter().all(|s| s.is_none()));
1507+
1508+
Ok(())
1509+
}
1510+
1511+
#[tokio::test]
1512+
async fn test_skip_stats_bypassed_when_predicate_present() -> TestResult {
1513+
use delta_kernel::expressions::Scalar;
1514+
1515+
let base = TestTables::Checkpoints.table_builder()?.build_storage()?;
1516+
1517+
let mut skip_cfg = DeltaTableConfig::default();
1518+
skip_cfg.skip_stats = true;
1519+
let snapshot = Snapshot::try_new(base.as_ref(), skip_cfg, Some(12)).await?;
1520+
1521+
let predicate: PredicateRef =
1522+
Arc::new(Expression::column(["value"]).gt(Scalar::String("".to_string())));
1523+
1524+
let has_stats: Vec<bool> = snapshot
1525+
.file_views(base.as_ref(), Some(predicate))
1526+
.map_ok(|view| view.stats().is_some())
1527+
.try_collect()
1528+
.await?;
1529+
1530+
assert!(!has_stats.is_empty());
1531+
assert!(has_stats.iter().any(|b| *b));
1532+
1533+
Ok(())
1534+
}
1535+
14351536
#[test]
14361537
fn test_materialized_files_full_table_seed_shares_batches() {
14371538
let batch = RecordBatch::new_empty(Arc::new(arrow_schema::Schema::empty()));

crates/core/src/kernel/snapshot/scan.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,17 @@ impl ScanBuilder {
6565
self
6666
}
6767

68+
/// Skip parsing file-level statistics during kernel log replay.
69+
///
70+
/// When `true`, per-file min/max/null stats are not parsed; `stats_parsed` in scan
71+
/// output may be null. Partition-based filtering still applies. When combined with a
72+
/// non-empty predicate, the kernel cannot use stats for data skipping; prefer `false`
73+
/// when you need predicate-based file pruning from statistics.
74+
pub fn with_skip_stats(mut self, skip_stats: bool) -> Self {
75+
self.inner = self.inner.with_skip_stats(skip_stats);
76+
self
77+
}
78+
6879
pub fn build(self) -> DeltaResult<Scan> {
6980
Ok(Scan::from(self.inner.build()?))
7081
}

crates/core/src/table/builder.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,15 @@ pub struct DeltaTableConfig {
5353
/// when processing record batches.
5454
pub log_batch_size: usize,
5555

56+
/// Skip parsing per-file statistics while opening the table.
57+
/// This defaults to `false`.
58+
///
59+
/// Use for workflows that never need file pruning (vacuum, filesystem check,
60+
/// append-only writes). Any predicated query on this instance will scan every
61+
/// file because the cache has no stats. Partition pruning is unaffected.
62+
#[serde(default)]
63+
pub skip_stats: bool,
64+
5665
#[serde(skip_serializing, skip_deserializing)]
5766
#[delta(skip)]
5867
/// When a runtime handler is provided, all IO tasks are spawned on that handle
@@ -65,6 +74,7 @@ impl Default for DeltaTableConfig {
6574
require_files: true,
6675
log_buffer_size: num_cpus::get() * 4,
6776
log_batch_size: 1024,
77+
skip_stats: false,
6878
io_runtime: None,
6979
}
7080
}
@@ -75,6 +85,7 @@ impl PartialEq for DeltaTableConfig {
7585
self.require_files == other.require_files
7686
&& self.log_buffer_size == other.log_buffer_size
7787
&& self.log_batch_size == other.log_batch_size
88+
&& self.skip_stats == other.skip_stats
7889
}
7990
}
8091

@@ -129,6 +140,13 @@ impl DeltaTableBuilder {
129140
self
130141
}
131142

143+
/// Sets `skip_stats` to the builder. See [`DeltaTableConfig::skip_stats`]
144+
/// for the impact on predicated queries.
145+
pub fn with_skip_stats(mut self, skip_stats: bool) -> Self {
146+
self.table_config.skip_stats = skip_stats;
147+
self
148+
}
149+
132150
/// Sets `version` to the builder
133151
pub fn with_version(mut self, version: Version) -> Self {
134152
self.version = DeltaVersion::Version(version);

0 commit comments

Comments
 (0)