Skip to content

Commit 72ea8ec

Browse files
alamb and gabotechs authored
[branch-52] Fix constant value from stats (#20042) (#20709)
- Part of #20681
- Closes #20041 on branch-52

This PR:
- Backports #20042 from @gabotechs to the `branch-52` line

Co-authored-by: Gabriel <45515538+gabotechs@users.noreply.github.com>
1 parent 9a67de5 commit 72ea8ec

2 files changed

Lines changed: 49 additions & 3 deletions

File tree

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ mod tests {
3838
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
3939
use crate::test::object_store::local_unpartitioned_file;
4040
use arrow::array::{
41-
ArrayRef, AsArray, Date64Array, Int8Array, Int32Array, Int64Array, StringArray,
42-
StringViewArray, StructArray, TimestampNanosecondArray,
41+
ArrayRef, AsArray, Date64Array, DictionaryArray, Int8Array, Int32Array,
42+
Int64Array, StringArray, StringViewArray, StructArray, TimestampNanosecondArray,
4343
};
44-
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
44+
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder, UInt16Type};
4545
use arrow::record_batch::RecordBatch;
4646
use arrow::util::pretty::pretty_format_batches;
4747
use arrow_schema::{SchemaRef, TimeUnit};
@@ -2249,6 +2249,48 @@ mod tests {
22492249
Ok(())
22502250
}
22512251

2252+
/// Tests that constant dictionary columns (where min == max in statistics)
2253+
/// are correctly handled. This reproduced a bug where the constant value
2254+
/// from statistics had type Utf8 but the schema expected Dictionary.
2255+
#[tokio::test]
2256+
async fn test_constant_dictionary_column_parquet() -> Result<()> {
2257+
let tmp_dir = TempDir::new()?;
2258+
let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
2259+
2260+
// Write parquet with dictionary column where all values are the same
2261+
let schema = Arc::new(Schema::new(vec![Field::new(
2262+
"status",
2263+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
2264+
false,
2265+
)]));
2266+
let status: DictionaryArray<UInt16Type> =
2267+
vec!["active", "active"].into_iter().collect();
2268+
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(status)])?;
2269+
let file = File::create(&path)?;
2270+
let props = WriterProperties::builder()
2271+
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
2272+
.build();
2273+
let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
2274+
writer.write(&batch)?;
2275+
writer.close()?;
2276+
2277+
// Query the constant dictionary column
2278+
let ctx = SessionContext::new();
2279+
ctx.register_parquet("t", &path, ParquetReadOptions::default())
2280+
.await?;
2281+
let result = ctx.sql("SELECT status FROM t").await?.collect().await?;
2282+
2283+
insta::assert_snapshot!(batches_to_string(&result),@r"
2284+
+--------+
2285+
| status |
2286+
+--------+
2287+
| active |
2288+
| active |
2289+
+--------+
2290+
");
2291+
Ok(())
2292+
}
2293+
22522294
fn write_file(file: &String) {
22532295
let struct_fields = Fields::from(vec![
22542296
Field::new("id", DataType::Int64, false),

datafusion/datasource-parquet/src/opener.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,10 @@ fn constant_value_from_stats(
696696
&& !min.is_null()
697697
&& matches!(column_stats.null_count, Precision::Exact(0))
698698
{
699+
// Cast to the expected data type if needed (e.g., Utf8 -> Dictionary)
700+
if min.data_type() != *data_type {
701+
return min.cast_to(data_type).ok();
702+
}
699703
return Some(min.clone());
700704
}
701705

0 commit comments

Comments
 (0)