Skip to content

Commit 45d4948

Browse files
Validate parquet writer version (apache#19515)
## Which issue does this PR close? - Part of apache#17498. ## Rationale for this change Currently, invalid `writer_version` values (e.g., "3.0") are accepted at `SET` time and only fail later when writing parquet files. This PR adds early validation so invalid values are rejected immediately with clear error messages, following the same pattern as `ExplainFormat`. ## What changes are included in this PR? - Add `DFWriterVersion` enum with `FromStr`, `Display`, `ConfigField` implementations - Change `ParquetOptions.writer_version` from `String` to `DFWriterVersion` - Remove `parse_version_string` function (validation now happens at config time) - Update proto conversions to validate during deserialization - Add test for early validation ## Are these changes tested? Yes. Added `test_parquet_writer_version_validation` that verifies: - Valid values ("1.0", "2.0") are accepted - Invalid values ("3.0", "invalid") are rejected immediately at SET time - Error messages are clear and helpful ## Are there any user-facing changes? Not exactly. Invalid `writer_version` values now error immediately when set via `SET` command or proto deserialization, instead of failing later during parquet file writing. This provides better error messages and earlier feedback. So the change is in the feedback not in the input.
1 parent e0b4e8d commit 45d4948

File tree

8 files changed

+164
-26
lines changed

8 files changed

+164
-26
lines changed

datafusion/common/src/config.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow_ipc::CompressionType;
2323
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
2525
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
26+
use crate::parquet_config::DFParquetWriterVersion;
2627
use crate::parsers::CompressionTypeVariant;
2728
use crate::utils::get_available_parallelism;
2829
use crate::{DataFusionError, Result};
@@ -742,7 +743,7 @@ config_namespace! {
742743

743744
/// (writing) Sets parquet writer version
744745
/// valid values are "1.0" and "2.0"
745-
pub writer_version: String, default = "1.0".to_string()
746+
pub writer_version: DFParquetWriterVersion, default = DFParquetWriterVersion::default()
746747

747748
/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
748749
///
@@ -3455,4 +3456,37 @@ mod tests {
34553456
let parsed_metadata = table_config.parquet.key_value_metadata;
34563457
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
34573458
}
3459+
#[cfg(feature = "parquet")]
3460+
#[test]
3461+
fn test_parquet_writer_version_validation() {
3462+
use crate::{config::ConfigOptions, parquet_config::DFParquetWriterVersion};
3463+
3464+
let mut config = ConfigOptions::default();
3465+
3466+
// Valid values should work
3467+
config
3468+
.set("datafusion.execution.parquet.writer_version", "1.0")
3469+
.unwrap();
3470+
assert_eq!(
3471+
config.execution.parquet.writer_version,
3472+
DFParquetWriterVersion::V1_0
3473+
);
3474+
3475+
config
3476+
.set("datafusion.execution.parquet.writer_version", "2.0")
3477+
.unwrap();
3478+
assert_eq!(
3479+
config.execution.parquet.writer_version,
3480+
DFParquetWriterVersion::V2_0
3481+
);
3482+
3483+
// Invalid value should error immediately at SET time
3484+
let err = config
3485+
.set("datafusion.execution.parquet.writer_version", "3.0")
3486+
.unwrap_err();
3487+
assert_eq!(
3488+
err.to_string(),
3489+
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
3490+
);
3491+
}
34583492
}

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use parquet::{
3333
metadata::KeyValue,
3434
properties::{
3535
DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
36-
WriterPropertiesBuilder, WriterVersion,
36+
WriterPropertiesBuilder,
3737
},
3838
},
3939
schema::types::ColumnPath,
@@ -214,7 +214,7 @@ impl ParquetOptions {
214214
let mut builder = WriterProperties::builder()
215215
.set_data_page_size_limit(*data_pagesize_limit)
216216
.set_write_batch_size(*write_batch_size)
217-
.set_writer_version(parse_version_string(writer_version.as_str())?)
217+
.set_writer_version((*writer_version).into())
218218
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
219219
.set_statistics_enabled(
220220
statistics_enabled
@@ -373,18 +373,6 @@ pub fn parse_compression_string(
373373
}
374374
}
375375

376-
pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
377-
let str_setting_lower: &str = &str_setting.to_lowercase();
378-
match str_setting_lower {
379-
"1.0" => Ok(WriterVersion::PARQUET_1_0),
380-
"2.0" => Ok(WriterVersion::PARQUET_2_0),
381-
_ => Err(DataFusionError::Configuration(format!(
382-
"Unknown or unsupported parquet writer version {str_setting} \
383-
valid options are 1.0 and 2.0"
384-
))),
385-
}
386-
}
387-
388376
pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
389377
let str_setting_lower: &str = &str_setting.to_lowercase();
390378
match str_setting_lower {
@@ -405,6 +393,7 @@ mod tests {
405393
#[cfg(feature = "parquet_encryption")]
406394
use crate::config::ConfigFileEncryptionProperties;
407395
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
396+
use crate::parquet_config::DFParquetWriterVersion;
408397
use parquet::basic::Compression;
409398
use parquet::file::properties::{
410399
BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_NDV,
@@ -431,16 +420,17 @@ mod tests {
431420

432421
fn parquet_options_with_non_defaults() -> ParquetOptions {
433422
let defaults = ParquetOptions::default();
434-
let writer_version = if defaults.writer_version.eq("1.0") {
435-
"2.0"
423+
let writer_version = if defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
424+
{
425+
DFParquetWriterVersion::V2_0
436426
} else {
437-
"1.0"
427+
DFParquetWriterVersion::V1_0
438428
};
439429

440430
ParquetOptions {
441431
data_pagesize_limit: 42,
442432
write_batch_size: 42,
443-
writer_version: writer_version.into(),
433+
writer_version,
444434
compression: Some("zstd(22)".into()),
445435
dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
446436
dictionary_page_size_limit: 42,
@@ -548,7 +538,7 @@ mod tests {
548538
// global options
549539
data_pagesize_limit: props.dictionary_page_size_limit(),
550540
write_batch_size: props.write_batch_size(),
551-
writer_version: format!("{}.0", props.writer_version().as_num()),
541+
writer_version: props.writer_version().into(),
552542
dictionary_page_size_limit: props.dictionary_page_size_limit(),
553543
max_row_group_size: props.max_row_group_size(),
554544
created_by: props.created_by().to_string(),

datafusion/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub mod instant;
5151
pub mod metadata;
5252
pub mod nested_struct;
5353
mod null_equality;
54+
pub mod parquet_config;
5455
pub mod parsers;
5556
pub mod pruning;
5657
pub mod rounding;
@@ -61,7 +62,6 @@ pub mod test_util;
6162
pub mod tree_node;
6263
pub mod types;
6364
pub mod utils;
64-
6565
/// Reexport arrow crate
6666
pub use arrow;
6767
pub use column::Column;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::{self, Display};
19+
use std::str::FromStr;
20+
21+
use crate::config::{ConfigField, Visit};
22+
use crate::error::{DataFusionError, Result};
23+
24+
/// Parquet writer version options for controlling the Parquet file format version
25+
///
26+
/// This enum validates parquet writer version values at configuration time,
27+
/// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands
28+
/// or proto deserialization.
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
30+
pub enum DFParquetWriterVersion {
31+
/// Parquet format version 1.0
32+
#[default]
33+
V1_0,
34+
/// Parquet format version 2.0
35+
V2_0,
36+
}
37+
38+
/// Implement parsing strings to `DFParquetWriterVersion`
39+
impl FromStr for DFParquetWriterVersion {
40+
type Err = DataFusionError;
41+
42+
fn from_str(s: &str) -> Result<Self, Self::Err> {
43+
match s.to_lowercase().as_str() {
44+
"1.0" => Ok(DFParquetWriterVersion::V1_0),
45+
"2.0" => Ok(DFParquetWriterVersion::V2_0),
46+
other => Err(DataFusionError::Configuration(format!(
47+
"Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0"
48+
))),
49+
}
50+
}
51+
}
52+
53+
impl Display for DFParquetWriterVersion {
54+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55+
let s = match self {
56+
DFParquetWriterVersion::V1_0 => "1.0",
57+
DFParquetWriterVersion::V2_0 => "2.0",
58+
};
59+
write!(f, "{s}")
60+
}
61+
}
62+
63+
impl ConfigField for DFParquetWriterVersion {
64+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
65+
v.some(key, self, description)
66+
}
67+
68+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
69+
*self = DFParquetWriterVersion::from_str(value)?;
70+
Ok(())
71+
}
72+
}
73+
74+
/// Convert `DFParquetWriterVersion` to parquet crate's `WriterVersion`
75+
///
76+
/// This conversion is infallible since `DFParquetWriterVersion` only contains
77+
/// valid values that have been validated at configuration time.
78+
#[cfg(feature = "parquet")]
79+
impl From<DFParquetWriterVersion> for parquet::file::properties::WriterVersion {
80+
fn from(value: DFParquetWriterVersion) -> Self {
81+
match value {
82+
DFParquetWriterVersion::V1_0 => {
83+
parquet::file::properties::WriterVersion::PARQUET_1_0
84+
}
85+
DFParquetWriterVersion::V2_0 => {
86+
parquet::file::properties::WriterVersion::PARQUET_2_0
87+
}
88+
}
89+
}
90+
}
91+
92+
/// Convert parquet crate's `WriterVersion` to `DFParquetWriterVersion`
93+
///
94+
/// This is used when converting from existing parquet writer properties,
95+
/// such as when reading from proto or test code.
96+
#[cfg(feature = "parquet")]
97+
impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion {
98+
fn from(version: parquet::file::properties::WriterVersion) -> Self {
99+
match version {
100+
parquet::file::properties::WriterVersion::PARQUET_1_0 => {
101+
DFParquetWriterVersion::V1_0
102+
}
103+
parquet::file::properties::WriterVersion::PARQUET_2_0 => {
104+
DFParquetWriterVersion::V2_0
105+
}
106+
}
107+
}
108+
}

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
951951
force_filter_selections: value.force_filter_selections,
952952
data_pagesize_limit: value.data_pagesize_limit as usize,
953953
write_batch_size: value.write_batch_size as usize,
954-
writer_version: value.writer_version.clone(),
954+
writer_version: value.writer_version.parse().map_err(|e| {
955+
DataFusionError::Internal(format!("Failed to parse writer_version: {e}"))
956+
})?,
955957
compression: value.compression_opt.clone().map(|opt| match opt {
956958
protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
957959
}).unwrap_or(None),

datafusion/proto-common/src/to_proto/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
860860
force_filter_selections: value.force_filter_selections,
861861
data_pagesize_limit: value.data_pagesize_limit as u64,
862862
write_batch_size: value.write_batch_size as u64,
863-
writer_version: value.writer_version.clone(),
863+
writer_version: value.writer_version.to_string(),
864864
compression_opt: value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
865865
dictionary_enabled_opt: value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
866866
dictionary_page_size_limit: value.dictionary_page_size_limit as u64,

datafusion/proto/src/logical_plan/file_formats.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ mod parquet {
381381
force_filter_selections: global_options.global.force_filter_selections,
382382
data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
383383
write_batch_size: global_options.global.write_batch_size as u64,
384-
writer_version: global_options.global.writer_version.clone(),
384+
writer_version: global_options.global.writer_version.to_string(),
385385
compression_opt: global_options.global.compression.map(|compression| {
386386
parquet_options::CompressionOpt::Compression(compression)
387387
}),
@@ -477,7 +477,10 @@ mod parquet {
477477
force_filter_selections: proto.force_filter_selections,
478478
data_pagesize_limit: proto.data_pagesize_limit as usize,
479479
write_batch_size: proto.write_batch_size as usize,
480-
writer_version: proto.writer_version.clone(),
480+
// TODO: Consider changing to TryFrom to avoid panic on invalid proto data
481+
writer_version: proto.writer_version.parse().expect("
482+
Invalid parquet writer version in proto, expected '1.0' or '2.0'
483+
"),
481484
compression: proto.compression_opt.as_ref().map(|opt| match opt {
482485
parquet_options::CompressionOpt::Compression(compression) => compression.clone(),
483486
}),

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion::datasource::listing::{
3131
use datafusion::execution::options::ArrowReadOptions;
3232
use datafusion::optimizer::Optimizer;
3333
use datafusion::optimizer::optimize_unions::OptimizeUnions;
34+
use datafusion_common::parquet_config::DFParquetWriterVersion;
3435
use datafusion_common::parsers::CompressionTypeVariant;
3536
use datafusion_functions_aggregate::sum::sum_distinct;
3637
use prost::Message;
@@ -464,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
464465

465466
parquet_format.global.bloom_filter_on_read = true;
466467
parquet_format.global.created_by = "DataFusion Test".to_string();
467-
parquet_format.global.writer_version = "PARQUET_2_0".to_string();
468+
parquet_format.global.writer_version = DFParquetWriterVersion::V2_0;
468469
parquet_format.global.write_batch_size = 111;
469470
parquet_format.global.data_pagesize_limit = 222;
470471
parquet_format.global.data_page_row_count_limit = 333;

0 commit comments

Comments
 (0)