Skip to content

Commit 2e4ca5b

Browse files
fvaleyertyler
authored andcommitted
perf: short-circuit no-predicate cached file replay
Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
1 parent e38a08d commit 2e4ca5b

1 file changed

Lines changed: 79 additions & 0 deletions

File tree

  • crates/core/src/kernel/snapshot

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,14 @@ impl Snapshot {
346346
log_store: &dyn LogStore,
347347
predicate: Option<PredicateRef>,
348348
) -> SendableRBStream {
349+
// Avoid the `stats_parsed -> JSON -> stats_parsed` roundtrip that
350+
// `scan_metadata_from` forces on a no-predicate cache replay.
351+
if predicate.is_none()
352+
&& let Some(cached) = self.cached_parsed_batches()
353+
{
354+
return cached;
355+
}
356+
349357
match self
350358
.materialized_files()
351359
.and_then(|materialized_files| materialized_files.full_table_seed())
@@ -365,6 +373,22 @@ impl Snapshot {
365373
}
366374
}
367375

376+
fn cached_parsed_batches(&self) -> Option<SendableRBStream> {
377+
let materialized = self.materialized_files()?;
378+
if materialized.existing_predicate.is_some() {
379+
return None;
380+
}
381+
match materialized.scope {
382+
MaterializedFilesScope::FullTable => {
383+
let batches = Arc::clone(&materialized.batches);
384+
Some(
385+
futures::stream::iter((0..batches.len()).map(move |i| Ok(batches[i].clone())))
386+
.boxed(),
387+
)
388+
}
389+
}
390+
}
391+
368392
fn files_with_engine(
369393
&self,
370394
engine: Arc<dyn Engine>,
@@ -1820,4 +1844,59 @@ mod tests {
18201844
);
18211845
Ok(())
18221846
}
1847+
1848+
#[tokio::test]
1849+
async fn test_cached_parsed_batches_short_circuit_guards() -> TestResult {
1850+
let base = TestTables::Checkpoints.table_builder()?.build_storage()?;
1851+
1852+
let plain = Snapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1853+
assert!(plain.cached_parsed_batches().is_none());
1854+
1855+
let eager = EagerSnapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1856+
let cached_stream = eager
1857+
.snapshot()
1858+
.cached_parsed_batches()
1859+
.expect("materialized full-table cache -> Some");
1860+
let cached_batches: Vec<_> = cached_stream.try_collect().await?;
1861+
let direct_batches = eager
1862+
.snapshot()
1863+
.materialized_files()
1864+
.expect("materialized cache present")
1865+
.batches
1866+
.clone();
1867+
assert_eq!(cached_batches.len(), direct_batches.len());
1868+
for (a, b) in cached_batches.iter().zip(direct_batches.iter()) {
1869+
assert_eq!(a.num_rows(), b.num_rows());
1870+
assert_eq!(a.schema(), b.schema());
1871+
}
1872+
1873+
Ok(())
1874+
}
1875+
1876+
#[tokio::test]
1877+
async fn test_file_views_no_predicate_matches_fresh_replay() -> TestResult {
1878+
let base = TestTables::Checkpoints.table_builder()?.build_storage()?;
1879+
1880+
let eager = EagerSnapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1881+
let eager_paths: Vec<String> = eager
1882+
.file_views(base.as_ref(), None)
1883+
.map_ok(|view| view.path_raw().to_string())
1884+
.try_collect()
1885+
.await?;
1886+
1887+
let plain = Snapshot::try_new(base.as_ref(), Default::default(), Some(12)).await?;
1888+
let plain_paths: Vec<String> = plain
1889+
.file_views(base.as_ref(), None)
1890+
.map_ok(|view| view.path_raw().to_string())
1891+
.try_collect()
1892+
.await?;
1893+
1894+
assert_eq!(
1895+
eager_paths, plain_paths,
1896+
"short-circuit cache replay must yield the same files in the same \
1897+
order as a fresh kernel replay with predicate = None",
1898+
);
1899+
1900+
Ok(())
1901+
}
18231902
}

0 commit comments

Comments
 (0)