Skip to content

Commit c8f9184

Browse files
authored
refactor: centralize default Parquet writer properties (delta-io#4335)
1 parent df786bf commit c8f9184

10 files changed

Lines changed: 325 additions & 63 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 53 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -633,6 +633,7 @@ mod tests {
633633
use delta_kernel::schema::ArrayType;
634634
use futures::{StreamExt, TryStreamExt};
635635
use serde_json::json;
636+
use std::ops::Range;
636637
use url::Url;
637638

638639
use super::*;
@@ -1451,28 +1452,65 @@ mod tests {
14511452
let small = batch.column_by_name("small").unwrap().as_string::<i32>();
14521453
assert_eq!("a", small.iter().next().unwrap().unwrap());
14531454

1455+
let files = table.get_files_by_partitions(&[]).await.unwrap();
1456+
assert_eq!(1, files.len());
1457+
let object_store = table.object_store();
1458+
let file_meta = object_store.head(&files[0]).await.unwrap();
1459+
let file_reader = parquet::arrow::async_reader::ParquetObjectReader::new(
1460+
object_store,
1461+
file_meta.location.clone(),
1462+
)
1463+
.with_file_size(file_meta.size);
1464+
let parquet_metadata =
1465+
parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(file_reader)
1466+
.await
1467+
.unwrap()
1468+
.metadata()
1469+
.as_ref()
1470+
.clone();
1471+
let (small_start, small_len) = parquet_metadata.row_group(0).column(0).byte_range();
1472+
let small_range = small_start..small_start + small_len;
1473+
let (large_start, large_len) = parquet_metadata.row_group(0).column(1).byte_range();
1474+
let large_range = large_start..large_start + large_len;
1475+
14541476
let actual = drain_recorded_ops(&mut operations).await;
1477+
1478+
let data_ranges = actual
1479+
.iter()
1480+
.flat_map(|operation| match operation {
1481+
ObjectStoreOperation::GetRange(PathKind::Data, range) => vec![range.clone()],
1482+
ObjectStoreOperation::GetRanges(PathKind::Data, ranges) => ranges.clone(),
1483+
_ => Vec::new(),
1484+
})
1485+
.collect::<Vec<_>>();
1486+
1487+
let overlaps = |left: &Range<u64>, right: &Range<u64>| {
1488+
left.start < right.end && right.start < left.end
1489+
};
1490+
14551491
assert!(
1456-
actual.iter().any(|op| matches!(
1457-
op,
1458-
ObjectStoreOperation::GetRange(PathKind::Data, range)
1459-
if range == &(957_u64..965_u64)
1460-
)),
1461-
"expected footer range read for the selected column, saw {actual:?}",
1492+
!data_ranges.is_empty(),
1493+
"expected ranged parquet data reads, saw {actual:?}"
14621494
);
14631495
assert!(
1464-
actual.iter().any(|op| matches!(
1465-
op,
1466-
ObjectStoreOperation::GetRange(PathKind::Data, range)
1467-
if range == &(326_u64..957_u64)
1468-
)),
1469-
"expected column chunk range read for the selected column, saw {actual:?}",
1496+
data_ranges
1497+
.iter()
1498+
.any(|range| overlaps(range, &small_range)),
1499+
"expected selected column chunk {small_range:?} to be read, saw {actual:?}"
14701500
);
14711501
assert!(
1472-
!actual
1502+
data_ranges
14731503
.iter()
1474-
.any(|op| matches!(op, ObjectStoreOperation::Get(PathKind::Data))),
1475-
"expected column pruning to avoid full parquet object reads, saw {actual:?}",
1504+
.all(|range| !overlaps(range, &large_range)),
1505+
"expected unselected column chunk {large_range:?} to be pruned, saw {actual:?}"
1506+
);
1507+
assert!(
1508+
!actual.iter().any(|operation| matches!(
1509+
operation,
1510+
ObjectStoreOperation::Get(PathKind::Data)
1511+
| ObjectStoreOperation::GetOpts(PathKind::Data)
1512+
)),
1513+
"expected no full data file reads, saw {actual:?}"
14761514
);
14771515
}
14781516

crates/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ pub mod errors;
8383
pub mod kernel;
8484
pub mod logstore;
8585
pub mod operations;
86+
pub(crate) mod parquet_utils;
8687
pub mod protocol;
8788
pub use kernel::schema;
8889
pub mod table;

crates/core/src/operations/optimize.rs

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -58,11 +58,12 @@ use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIE
5858
use crate::kernel::{Action, Add, PartitionsExt, Remove, Version, scalars::ScalarExt};
5959
use crate::kernel::{EagerSnapshot, resolve_snapshot};
6060
use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef};
61+
use crate::parquet_utils::default_writer_properties;
6162
use crate::protocol::DeltaOperation;
6263
use crate::table::config::TablePropertiesExt as _;
6364
use crate::table::state::DeltaTableState;
6465
use crate::writer::utils::arrow_schema_without_partitions;
65-
use crate::{DeltaTable, ObjectMeta, PartitionFilter, crate_version, to_kernel_predicate};
66+
use crate::{DeltaTable, ObjectMeta, PartitionFilter, to_kernel_predicate};
6667

6768
/// Planner used by optimize.
6869
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
@@ -421,10 +422,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
421422
this.pre_execute(operation_id).await?;
422423

423424
let writer_properties = this.writer_properties.unwrap_or_else(|| {
424-
WriterProperties::builder()
425-
.set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
426-
.set_created_by(format!("delta-rs version {}", crate_version()))
427-
.build()
425+
default_writer_properties(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
428426
});
429427
let (session, _) = resolve_session_state(
430428
this.session.as_deref(),

crates/core/src/operations/write/writer.rs

Lines changed: 58 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -19,11 +19,10 @@ use parquet::file::properties::WriterProperties;
1919
use tokio::task::JoinSet;
2020
use tracing::*;
2121

22-
use crate::crate_version;
23-
2422
use crate::errors::{DeltaResult, DeltaTableError};
2523
use crate::kernel::{Add, PartitionsExt};
2624
use crate::logstore::ObjectStoreRef;
25+
use crate::parquet_utils::default_writer_properties;
2726
use crate::writer::record_batch::{PartitionResult, divide_by_partition_values};
2827
use crate::writer::stats::create_add;
2928
use crate::writer::utils::{
@@ -88,17 +87,6 @@ fn sort_completed_writes_by_path<T>(results: &mut [(Path, usize, T)]) {
8887
results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
8988
}
9089

91-
fn default_writer_properties(include_created_by: bool) -> WriterProperties {
92-
let builder = WriterProperties::builder().set_compression(Compression::SNAPPY);
93-
if include_created_by {
94-
builder
95-
.set_created_by(format!("delta-rs version {}", crate_version()))
96-
.build()
97-
} else {
98-
builder.build()
99-
}
100-
}
101-
10290
#[derive(thiserror::Error, Debug)]
10391
enum WriteError {
10492
#[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
@@ -169,7 +157,7 @@ impl WriterConfig {
169157
stats_columns: Option<Vec<String>>,
170158
) -> Self {
171159
let writer_properties =
172-
writer_properties.unwrap_or_else(|| default_writer_properties(false));
160+
writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
173161
let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
174162

175163
Self {
@@ -335,7 +323,7 @@ impl PartitionWriterConfig {
335323
let part_path = partition_values.hive_partition_path();
336324
let prefix = Path::parse(part_path)?;
337325
let writer_properties =
338-
writer_properties.unwrap_or_else(|| default_writer_properties(true));
326+
writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
339327
let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
340328

341329
Ok(Self {
@@ -550,11 +538,13 @@ impl PartitionWriter {
550538
mod tests {
551539
use super::*;
552540
use crate::DeltaTableBuilder;
541+
use crate::crate_version;
553542
use crate::logstore::tests::flatten_list_stream as list;
554543
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
555544
use crate::writer::test_utils::*;
556545
use arrow::array::{Int32Array, StringArray};
557546
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
547+
use parquet::schema::types::ColumnPath;
558548
use std::sync::Arc;
559549

560550
fn get_delta_writer(
@@ -601,6 +591,59 @@ mod tests {
601591
.unwrap()
602592
}
603593

594+
fn assert_default_created_by(writer_properties: &WriterProperties) {
595+
assert_eq!(
596+
writer_properties.created_by(),
597+
format!("delta-rs version {}", crate_version())
598+
);
599+
}
600+
601+
#[test]
602+
fn test_writer_config_defaults_include_delta_rs_created_by() {
603+
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
604+
"id",
605+
DataType::Int32,
606+
true,
607+
)]));
608+
let config = WriterConfig::new(
609+
schema,
610+
vec![],
611+
None,
612+
None,
613+
None,
614+
DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
615+
None,
616+
);
617+
618+
assert_default_created_by(&config.writer_properties);
619+
assert_eq!(
620+
config
621+
.writer_properties
622+
.compression(&ColumnPath::from("id")),
623+
Compression::SNAPPY
624+
);
625+
}
626+
627+
#[test]
628+
fn test_partition_writer_config_defaults_include_delta_rs_created_by() {
629+
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
630+
"id",
631+
DataType::Int32,
632+
true,
633+
)]));
634+
let config =
635+
PartitionWriterConfig::try_new(schema, IndexMap::new(), None, None, None, None)
636+
.unwrap();
637+
638+
assert_default_created_by(&config.writer_properties);
639+
assert_eq!(
640+
config
641+
.writer_properties
642+
.compression(&ColumnPath::from("id")),
643+
Compression::SNAPPY
644+
);
645+
}
646+
604647
#[tokio::test]
605648
async fn test_write_partition() {
606649
let log_store = DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())

crates/core/src/parquet_utils.rs

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,29 @@
1+
use parquet::basic::Compression;
2+
use parquet::file::properties::WriterProperties;
3+
4+
pub(crate) fn default_writer_properties(compression: Compression) -> WriterProperties {
5+
WriterProperties::builder()
6+
.set_created_by(format!("delta-rs version {}", crate::crate_version()))
7+
.set_compression(compression)
8+
.build()
9+
}
10+
11+
#[cfg(test)]
12+
mod tests {
13+
use super::*;
14+
use parquet::schema::types::ColumnPath;
15+
16+
#[test]
17+
fn default_writer_properties_sets_created_by_and_compression() {
18+
let writer_properties = default_writer_properties(Compression::SNAPPY);
19+
20+
assert_eq!(
21+
writer_properties.created_by(),
22+
format!("delta-rs version {}", crate::crate_version())
23+
);
24+
assert_eq!(
25+
writer_properties.compression(&ColumnPath::from("id")),
26+
Compression::SNAPPY
27+
);
28+
}
29+
}

crates/core/src/protocol/checkpoints.rs

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
33
use std::sync::LazyLock;
44

5-
use parquet::file::properties::WriterProperties;
65
use url::Url;
76

87
use chrono::{TimeZone, Utc};
@@ -20,6 +19,7 @@ use uuid::Uuid;
2019

2120
use crate::kernel::{Version, spawn_blocking_with_span};
2221
use crate::logstore::{DELTA_LOG_REGEX, LogStore};
22+
use crate::parquet_utils::default_writer_properties;
2323
use crate::protocol::to_rb;
2424
use crate::table::config::TablePropertiesExt as _;
2525
use crate::{DeltaResult, DeltaTableError};
@@ -67,11 +67,9 @@ pub(crate) async fn create_checkpoint_for(
6767
let mut writer = AsyncArrowWriter::try_new(
6868
object_store_writer,
6969
first_batch.schema(),
70-
Some(
71-
WriterProperties::builder()
72-
.set_compression(parquet::basic::Compression::SNAPPY)
73-
.build(),
74-
),
70+
Some(default_writer_properties(
71+
parquet::basic::Compression::SNAPPY,
72+
)),
7573
)?;
7674
writer.write(&first_batch).await?;
7775

crates/core/src/writer/json.rs

Lines changed: 38 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -10,10 +10,7 @@ use delta_kernel::expressions::Scalar;
1010
use indexmap::IndexMap;
1111
use itertools::Itertools;
1212
use object_store::path::Path;
13-
use parquet::{
14-
arrow::ArrowWriter, basic::Compression, errors::ParquetError,
15-
file::properties::WriterProperties,
16-
};
13+
use parquet::{arrow::ArrowWriter, errors::ParquetError, file::properties::WriterProperties};
1714
use serde_json::Value;
1815
use tracing::*;
1916
use url::Url;
@@ -29,6 +26,7 @@ use crate::DeltaTable;
2926
use crate::errors::DeltaTableError;
3027
use crate::kernel::{Add, PartitionsExt, scalars::ScalarExt};
3128
use crate::logstore::ObjectStoreRetryExt;
29+
use crate::parquet_utils::default_writer_properties;
3230
use crate::table::builder::DeltaTableBuilder;
3331
use crate::table::config::TablePropertiesExt as _;
3432
use crate::writer::utils::ShareableBuffer;
@@ -198,10 +196,7 @@ impl JsonWriter {
198196
.load()
199197
.await?;
200198
// Initialize writer properties for the underlying arrow writer
201-
let writer_properties = WriterProperties::builder()
202-
// NOTE: Consider extracting config for writer properties and setting more than just compression
203-
.set_compression(Compression::SNAPPY)
204-
.build();
199+
let writer_properties = default_writer_properties(parquet::basic::Compression::SNAPPY);
205200

206201
Ok(Self {
207202
table,
@@ -219,10 +214,7 @@ impl JsonWriter {
219214
let partition_columns = metadata.partition_columns().clone();
220215

221216
// Initialize writer properties for the underlying arrow writer
222-
let writer_properties = WriterProperties::builder()
223-
// NOTE: Consider extracting config for writer properties and setting more than just compression
224-
.set_compression(Compression::SNAPPY)
225-
.build();
217+
let writer_properties = default_writer_properties(parquet::basic::Compression::SNAPPY);
226218

227219
Ok(Self {
228220
table: table.clone(),
@@ -547,6 +539,40 @@ mod tests {
547539
assert_eq!(columns, vec!["id".to_string(), "value".to_string()]);
548540
}
549541

542+
#[tokio::test]
543+
async fn test_json_writer_for_table_defaults_include_delta_rs_created_by() {
544+
let table_dir = tempfile::tempdir().unwrap();
545+
let table = get_test_table(&table_dir).await;
546+
547+
let writer = JsonWriter::for_table(&table).unwrap();
548+
549+
assert_eq!(
550+
writer.writer_properties.created_by(),
551+
format!("delta-rs version {}", crate::crate_version())
552+
);
553+
}
554+
555+
#[tokio::test]
556+
async fn test_json_writer_try_new_defaults_include_delta_rs_created_by() {
557+
let table_dir = tempfile::tempdir().unwrap();
558+
let table = get_test_table(&table_dir).await;
559+
let arrow_schema = table.snapshot().unwrap().snapshot().arrow_schema();
560+
561+
let writer = JsonWriter::try_new(
562+
table.table_url().clone(),
563+
arrow_schema,
564+
Some(vec!["modified".to_string()]),
565+
None,
566+
)
567+
.await
568+
.unwrap();
569+
570+
assert_eq!(
571+
writer.writer_properties.created_by(),
572+
format!("delta-rs version {}", crate::crate_version())
573+
);
574+
}
575+
550576
#[test]
551577
fn test_extract_partition_values() {
552578
let record_batch = RecordBatch::try_new(

0 commit comments

Comments
 (0)