Skip to content

Commit a9ccffa

Browse files
ahirner, ethan-tyler, rtyler
authored
fix: DeltaScan (Next) schema overrides and file column projection (delta-io#4364)
# Description

- preserve `DeltaScanConfig::schema` through delta scan planning and execution
- __defensively__ avoid filter pushdown when schema overrides introduce logical/physical type mismatches
- fix `file` column projection so selected and excluded projections behave correctly

# Related Issue(s)

- closes delta-io#4306
- closes delta-io#4307

I can split the PR, but I think that reviewing it is more efficient in one go.

---------

Signed-off-by: Alexander Hirner <a.hirner@gmail.com>
Co-authored-by: Ethan Urbanski <ethanurbanski@gmail.com>
Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 2f24256 commit a9ccffa

5 files changed

Lines changed: 478 additions & 69 deletions

File tree

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 249 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,13 @@ impl DeltaScan {
287287
pub fn new(snapshot: impl Into<SnapshotWrapper>, config: DeltaScanConfig) -> Result<Self> {
288288
let snapshot = snapshot.into();
289289
let scan_schema = config.table_schema(snapshot.table_configuration())?;
290-
let full_schema = if config.retain_file_id() {
290+
let full_schema = if let Some(file_id_column) =
291+
config.projected_file_id_column(None, scan_schema.as_ref())
292+
{
291293
let mut fields = scan_schema.fields().to_vec();
292-
fields.push(config.file_id_field());
294+
fields.push(crate::delta_datafusion::file_id::file_id_field(Some(
295+
file_id_column,
296+
)));
293297
Arc::new(Schema::new(fields))
294298
} else {
295299
scan_schema.clone()
@@ -402,8 +406,8 @@ impl TableProvider for DeltaScan {
402406
.map(|_| self.scan_schema.fields().len());
403407
let kernel_projection = projection.map(|proj| {
404408
proj.iter()
405-
.filter(|&&idx| Some(idx) != file_id_idx)
406409
.copied()
410+
.filter(|&idx| Some(idx) != file_id_idx)
407411
.collect::<Vec<_>>()
408412
});
409413

@@ -420,6 +424,7 @@ impl TableProvider for DeltaScan {
420424
scan::execution_plan(
421425
&self.config,
422426
session,
427+
projection,
423428
scan_plan,
424429
stream,
425430
engine,
@@ -484,11 +489,17 @@ impl TableProvider for DeltaScan {
484489
#[cfg(test)]
485490
mod tests {
486491
use arrow::{
487-
array::Int64Array,
488-
datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema},
492+
array::{Date32Array, Int64Array, TimestampMillisecondArray},
493+
datatypes::{
494+
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
495+
},
489496
record_batch::RecordBatch,
490497
};
491-
use arrow_array::{DictionaryArray, UInt16Array, types::UInt16Type};
498+
use arrow_array::{
499+
DictionaryArray, UInt16Array,
500+
builder::{BinaryDictionaryBuilder, StringDictionaryBuilder},
501+
types::UInt16Type,
502+
};
492503
use datafusion::{
493504
catalog::Session,
494505
datasource::MemTable,
@@ -501,26 +512,27 @@ mod tests {
501512
use futures::{StreamExt as _, TryStreamExt as _};
502513
use parquet::file::reader::FileReader as _;
503514
use parquet::file::serialized_reader::SerializedFileReader;
504-
use std::fs::File;
515+
use std::{fs::File, sync::Arc};
505516
use url::Url;
506517

518+
use super::*;
507519
use crate::{
508520
assert_batches_sorted_eq,
509521
delta_datafusion::{DeltaScanConfig, session::create_session},
510522
kernel::{
511523
Action, DataType, EagerSnapshot, PrimitiveType, Snapshot, StructField, StructType,
512524
},
513525
logstore::get_actions,
526+
operations::create::CreateBuilder,
514527
test_utils::{
515528
TestResult, TestTables,
516529
object_store::{
517530
drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
518531
},
532+
open_fs_path,
519533
},
520534
};
521535

522-
use super::*;
523-
524536
/// Extracts fields from the parquet scan
525537
#[derive(Default)]
526538
struct DeltaScanVisitor {
@@ -1355,4 +1367,232 @@ mod tests {
13551367

13561368
Ok(())
13571369
}
1370+
1371+
/// Builds the Arrow schema that the schema-override tests pass to
/// `DeltaScanConfig::with_schema`.
///
/// All four fields are nullable:
/// - `letter`: dictionary with `UInt16` keys and `Utf8` values
/// - `date`: `Date32`
/// - `data`: dictionary with `UInt16` keys and `Binary` values
/// - `number`: millisecond timestamp without a time zone
///
/// NOTE(review): presumably these types differ from the physical types of the
/// `multi_partitioned` fixture (e.g. `number` stored as a long) — confirm
/// against the fixture.
fn multi_partitioned_override_schema() -> Arc<ArrowSchema> {
1372+
Arc::new(ArrowSchema::new(vec![
1373+
ArrowField::new(
1374+
"letter",
1375+
ArrowDataType::Dictionary(
1376+
Box::new(ArrowDataType::UInt16),
1377+
Box::new(ArrowDataType::Utf8),
1378+
),
1379+
true,
1380+
),
1381+
ArrowField::new("date", ArrowDataType::Date32, true),
1382+
ArrowField::new(
1383+
"data",
1384+
ArrowDataType::Dictionary(
1385+
Box::new(ArrowDataType::UInt16),
1386+
Box::new(ArrowDataType::Binary),
1387+
),
1388+
true,
1389+
),
1390+
ArrowField::new(
1391+
"number",
1392+
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1393+
true,
1394+
),
1395+
]))
1396+
}
1397+
1398+
/// Opens and loads the `multi_partitioned` reader-test table and wraps its
/// snapshot in a `DeltaScan` configured with
/// `multi_partitioned_override_schema()`.
///
/// Returns the loaded table together with the provider (in an `Arc`) so the
/// caller can keep the table alive while using the provider.
///
/// # Errors
///
/// Propagates failures from loading the table, reading its snapshot, and
/// constructing the `DeltaScan`.
async fn provider_for_partitioned_table() -> TestResult<(
1399+
crate::DeltaTable,
1400+
Arc<crate::delta_datafusion::table_provider::next::DeltaScan>,
1401+
)> {
1402+
let mut table =
1403+
open_fs_path("../../dat/v0.0.3/reader_tests/generated/multi_partitioned/delta");
1404+
table.load().await?;
1405+
1406+
let provider = crate::delta_datafusion::table_provider::next::DeltaScan::new(
1407+
// Propagate a missing/unloaded snapshot as an error instead of panicking.
table.snapshot()?.snapshot().clone(),
1408+
DeltaScanConfig::default().with_schema(multi_partitioned_override_schema()),
1409+
)?
1410+
.with_log_store(table.log_store());
1411+
1412+
Ok((table, Arc::new(provider)))
1413+
}
1414+
1415+
/// Registers the schema-overridden provider and checks that `SELECT number`
/// returns a single column whose Arrow type is the override's millisecond
/// timestamp.
#[tokio::test]
1416+
async fn test_delta_scan_config_schema_override_scan() -> TestResult {
1417+
let (_table, provider) = provider_for_partitioned_table().await?;
1418+
1419+
let ctx = create_session().into_inner();
1420+
ctx.register_table("test_table", provider)?;
1421+
1422+
let df = ctx.sql("SELECT number FROM test_table").await?;
1423+
let batches = df.collect().await?;
1424+
1425+
// Only the first collected batch is inspected.
assert_eq!(batches[0].columns().len(), 1);
1426+
assert_eq!(
1427+
batches[0].schema().fields()[0].data_type(),
1428+
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None)
1429+
);
1430+
1431+
Ok(())
1432+
}
1433+
1434+
/// Filters on the overridden `number` column by comparing it against a
/// timestamp literal and expects exactly one row in the first collected batch.
#[tokio::test]
1435+
async fn test_delta_scan_config_schema_override_filter() -> TestResult {
1436+
let (_table, provider) = provider_for_partitioned_table().await?;
1437+
1438+
let ctx = create_session().into_inner();
1439+
ctx.register_table("test_table", provider)?;
1440+
1441+
// The predicate is written against the override (timestamp) type of `number`.
let df = ctx
1442+
.sql("SELECT * FROM test_table WHERE number < '1970-01-01T00:00:00.007'")
1443+
.await?;
1444+
let batches = df.collect().await?;
1445+
1446+
assert_eq!(batches[0].num_rows(), 1);
1447+
1448+
Ok(())
1449+
}
1450+
1451+
/// Combines filters on the overridden `letter` and `number` columns with
/// `count`/`max` aggregates and compares the rendered result table against
/// the expected single row.
#[tokio::test]
1452+
async fn test_delta_scan_config_schema_override_filter_aggregate() -> TestResult {
1453+
let (_table, provider) = provider_for_partitioned_table().await?;
1454+
1455+
let ctx = create_session().into_inner();
1456+
ctx.register_table("test_table", provider)?;
1457+
// `max(number)` is aliased `fake_ts`; the expected output renders it as a timestamp.
let query = "SELECT count(1) c, max(number) fake_ts FROM test_table WHERE letter != 'a' and number < '2020-01-01T00:00:00Z'";
1458+
let df = ctx.sql(query).await?;
1459+
let batches = df.collect().await?;
1460+
datafusion::assert_batches_eq!(
1461+
[
1462+
"+---+-------------------------+",
1463+
"| c | fake_ts |",
1464+
"+---+-------------------------+",
1465+
"| 2 | 1970-01-01T00:00:00.007 |",
1466+
"+---+-------------------------+",
1467+
],
1468+
&batches
1469+
);
1470+
1471+
Ok(())
1472+
}
1473+
1474+
/// Exercises `insert_into` on a provider configured with a schema override.
///
/// Creates a new table in a temporary directory with string/date/binary/long
/// columns, wraps it in a `DeltaScan` using
/// `multi_partitioned_override_schema()`, feeds it one in-memory batch built
/// with the override (logical) schema, and checks that the write plan reports
/// a count of 1.
#[tokio::test]
1475+
async fn test_delta_scan_config_schema_override_insert() -> TestResult {
1476+
// The partitioned provider is only used to obtain the logical (override) schema.
let (_partitioned_table, provider) = provider_for_partitioned_table().await?;
1477+
let logical_schema = provider.schema();
1478+
1479+
let table_dir = tempfile::tempdir()?;
1480+
let table = CreateBuilder::new()
1481+
.with_location(table_dir.path().to_string_lossy())
1482+
.with_columns(
1483+
StructType::try_new(vec![
1484+
StructField::new(
1485+
"letter",
1486+
delta_kernel::schema::DataType::Primitive(
1487+
delta_kernel::schema::PrimitiveType::String,
1488+
),
1489+
true,
1490+
),
1491+
StructField::new("date", delta_kernel::schema::DataType::DATE, true),
1492+
StructField::new(
1493+
"data",
1494+
delta_kernel::schema::DataType::Primitive(
1495+
delta_kernel::schema::PrimitiveType::Binary,
1496+
),
1497+
true,
1498+
),
1499+
StructField::new(
1500+
"number",
1501+
delta_kernel::schema::DataType::Primitive(
1502+
delta_kernel::schema::PrimitiveType::Long,
1503+
),
1504+
true,
1505+
),
1506+
])?
1507+
.fields()
1508+
.cloned(),
1509+
)
1510+
.await?;
1511+
1512+
let provider = Arc::new(
1513+
crate::delta_datafusion::table_provider::next::DeltaScan::new(
1514+
// Propagate a missing snapshot as an error instead of panicking.
table.snapshot()?.snapshot().clone(),
1515+
DeltaScanConfig::default().with_schema(multi_partitioned_override_schema()),
1516+
)?
1517+
.with_log_store(table.log_store()),
1518+
);
1519+
1520+
let ctx = create_session().into_inner();
1521+
ctx.register_table("test_table", provider.clone())?;
1522+
let state = ctx.state();
1523+
1524+
let mut dict_builder = StringDictionaryBuilder::<UInt16Type>::new();
1525+
dict_builder.append("a")?;
1526+
let mut bin_builder = BinaryDictionaryBuilder::<UInt16Type>::new();
1527+
bin_builder.append(b"hello")?;
1528+
1529+
// One row whose column types match the override schema, not the created table's types.
let batch = RecordBatch::try_new(
1530+
logical_schema.clone(),
1531+
vec![
1532+
Arc::new(dict_builder.finish()),
1533+
Arc::new(Date32Array::from(vec![0])),
1534+
Arc::new(bin_builder.finish()),
1535+
Arc::new(TimestampMillisecondArray::from(vec![2000])),
1536+
],
1537+
)?;
1538+
1539+
let mem_table = MemTable::try_new(logical_schema.clone(), vec![vec![batch]])?;
1540+
let input = mem_table.scan(&state, None, &[], None).await?;
1541+
1542+
let write_plan = provider
1543+
.insert_into(&state, input, InsertOp::Append)
1544+
.await?;
1545+
1546+
let batches = collect_partitioned(write_plan, ctx.task_ctx()).await?;
1547+
1548+
datafusion::assert_batches_eq!(
1549+
[
1550+
"+-------+",
1551+
"| count |",
1552+
"+-------+",
1553+
"| 1 |",
1554+
"+-------+",
1555+
],
1556+
&batches[0]
1557+
);
1558+
1559+
Ok(())
1560+
}
1561+
1562+
/// Checks projection of the file column when it is named `my_files` and a
/// schema override is configured.
///
/// `SELECT * EXCEPT (my_files)` must return the four data columns without the
/// file column, while `SELECT data, my_files` must return exactly those two
/// columns, including `my_files`.
#[tokio::test]
1563+
async fn test_delta_scan_config_file_column_projection() -> TestResult {
1564+
let mut table =
1565+
open_fs_path("../../dat/v0.0.3/reader_tests/generated/multi_partitioned/delta");
1566+
table.load().await?;
1567+
let provider = Arc::new(
1568+
crate::delta_datafusion::table_provider::next::DeltaScan::new(
1569+
table.snapshot()?.snapshot().clone(),
1570+
DeltaScanConfig::default()
1571+
.with_schema(multi_partitioned_override_schema())
1572+
.with_file_column_name("my_files"),
1573+
)?
1574+
.with_log_store(table.log_store()),
1575+
);
1576+
1577+
let ctx = create_session().into_inner();
1578+
ctx.register_table("test_table", provider)?;
1579+
1580+
// Excluded projection: the file column must not appear in the output.
let df = ctx
1581+
.sql("SELECT * EXCEPT (my_files) FROM test_table")
1582+
.await?;
1583+
let batches = df.collect().await?;
1584+
let schema = batches[0].schema();
1585+
1586+
assert_eq!(schema.fields().len(), 4);
1587+
assert!(schema.column_with_name("my_files").is_none(),);
1588+
1589+
// Selected projection: the file column is requested alongside a data column.
let df_file = ctx.sql("SELECT data, my_files FROM test_table").await?;
1590+
let batches_file = df_file.collect().await?;
1591+
let schema_file = batches_file[0].schema();
1592+
1593+
assert_eq!(schema_file.fields().len(), 2);
1594+
assert!(schema_file.column_with_name("my_files").is_some());
1595+
1596+
Ok(())
1597+
}
13581598
}

0 commit comments

Comments
 (0)