Skip to content

Commit d025869

Browse files
authored
fix: HashJoin panic with String dictionary keys (don't flatten keys) (#20505)
## Which issue does this PR close?

- Fixes #20696
- Follow on to #20441

## Rationale for this change

#20441 (review) fixes the special case DictionaryArray handling in Joins. However, I don't think we need to special case DictionaryArrays at all

## What changes are included in this PR?

1. Remove the special case dictionary handling

## Are these changes tested?

Yes by CI

## Are there any user-facing changes?

No (though maybe some queries get faster)
1 parent 028e351 commit d025869

2 files changed

Lines changed: 118 additions & 26 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use std::sync::Arc;
2121

2222
use arrow::array::{ArrayRef, StructArray};
23-
use arrow::compute::cast;
2423
use arrow::datatypes::{Field, FieldRef, Fields};
2524
use arrow_schema::DataType;
2625
use datafusion_common::Result;
@@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
3332
.collect()
3433
}
3534

36-
/// Casts dictionary-encoded arrays to their underlying value type, preserving row count.
37-
/// Non-dictionary arrays are returned as-is.
38-
fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
39-
match array.data_type() {
40-
DataType::Dictionary(_, value_type) => {
41-
let casted = cast(array, value_type)?;
42-
// Recursively flatten in case of nested dictionaries
43-
flatten_dictionary_array(&casted)
44-
}
45-
_ => Ok(Arc::clone(array)),
46-
}
47-
}
48-
4935
/// Builds InList values from join key column arrays.
5036
///
5137
/// If `join_key_arrays` is:
@@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
6551
pub(super) fn build_struct_inlist_values(
6652
join_key_arrays: &[ArrayRef],
6753
) -> Result<Option<ArrayRef>> {
68-
// Flatten any dictionary-encoded arrays
69-
let flattened_arrays: Vec<ArrayRef> = join_key_arrays
70-
.iter()
71-
.map(flatten_dictionary_array)
72-
.collect::<Result<Vec<_>>>()?;
73-
7454
// Build the source array/struct
75-
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
55+
let source_array: ArrayRef = if join_key_arrays.len() == 1 {
7656
// Single column: use directly
77-
Arc::clone(&flattened_arrays[0])
57+
Arc::clone(&join_key_arrays[0])
7858
} else {
7959
// Multi-column: build StructArray once from all columns
8060
let fields = build_struct_fields(
81-
&flattened_arrays
61+
&join_key_arrays
8262
.iter()
8363
.map(|arr| arr.data_type().clone())
8464
.collect::<Vec<_>>(),
@@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values(
8868
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
8969
.iter()
9070
.cloned()
91-
.zip(flattened_arrays.iter().cloned())
71+
.zip(join_key_arrays.iter().cloned())
9272
.collect();
9373

9474
Arc::new(StructArray::from(arrays_with_fields))
@@ -152,7 +132,14 @@ mod tests {
152132
assert_eq!(
153133
*result.data_type(),
154134
DataType::Struct(
155-
build_struct_fields(&[DataType::Utf8, DataType::Int32]).unwrap()
135+
build_struct_fields(&[
136+
DataType::Dictionary(
137+
Box::new(DataType::Int8),
138+
Box::new(DataType::Utf8)
139+
),
140+
DataType::Int32
141+
])
142+
.unwrap()
156143
)
157144
);
158145
}
@@ -168,6 +155,6 @@ mod tests {
168155
.unwrap();
169156

170157
assert_eq!(result.len(), 3);
171-
assert_eq!(*result.data_type(), DataType::Utf8);
158+
assert_eq!(result.data_type(), dict_array.data_type());
172159
}
173160
}

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,3 +737,108 @@ DROP TABLE t_union_mem;
737737

738738
statement ok
739739
DROP TABLE t_union_parquet;
740+
741+
# Cleanup settings
742+
statement ok
743+
set datafusion.optimizer.max_passes = 3;
744+
745+
statement ok
746+
set datafusion.execution.parquet.pushdown_filters = false;
747+
748+
749+
# Regression test for https://github.com/apache/datafusion/issues/20696
750+
# Multi-column INNER JOIN with dictionary fails
751+
# when parquet pushdown filters are enabled.
752+
753+
754+
statement ok
755+
COPY (
756+
SELECT
757+
to_timestamp_nanos(time_ns) AS time,
758+
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
759+
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
760+
temp
761+
FROM (
762+
VALUES
763+
(200, 'CA', 'LA', 90.0),
764+
(250, 'MA', 'Boston', 72.4),
765+
(100, 'MA', 'Boston', 70.4),
766+
(350, 'CA', 'LA', 90.0)
767+
) AS t(time_ns, state, city, temp)
768+
)
769+
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet';
770+
771+
statement ok
772+
COPY (
773+
SELECT
774+
to_timestamp_nanos(time_ns) AS time,
775+
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
776+
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
777+
temp,
778+
reading
779+
FROM (
780+
VALUES
781+
(250, 'MA', 'Boston', 53.4, 51.0),
782+
(100, 'MA', 'Boston', 50.4, 50.0)
783+
) AS t(time_ns, state, city, temp, reading)
784+
)
785+
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet';
786+
787+
statement ok
788+
CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET
789+
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/';
790+
791+
statement ok
792+
CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET
793+
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/';
794+
795+
# Query should work both with and without filters
796+
statement ok
797+
set datafusion.execution.parquet.pushdown_filters = false;
798+
799+
query RRR
800+
SELECT
801+
h2o_parquet_20696.temp AS h2o_temp,
802+
o2_parquet_20696.temp AS o2_temp,
803+
o2_parquet_20696.reading
804+
FROM h2o_parquet_20696
805+
INNER JOIN o2_parquet_20696
806+
ON h2o_parquet_20696.time = o2_parquet_20696.time
807+
AND h2o_parquet_20696.state = o2_parquet_20696.state
808+
AND h2o_parquet_20696.city = o2_parquet_20696.city
809+
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
810+
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
811+
----
812+
72.4 53.4 51
813+
70.4 50.4 50
814+
815+
816+
statement ok
817+
set datafusion.execution.parquet.pushdown_filters = true;
818+
819+
query RRR
820+
SELECT
821+
h2o_parquet_20696.temp AS h2o_temp,
822+
o2_parquet_20696.temp AS o2_temp,
823+
o2_parquet_20696.reading
824+
FROM h2o_parquet_20696
825+
INNER JOIN o2_parquet_20696
826+
ON h2o_parquet_20696.time = o2_parquet_20696.time
827+
AND h2o_parquet_20696.state = o2_parquet_20696.state
828+
AND h2o_parquet_20696.city = o2_parquet_20696.city
829+
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
830+
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
831+
----
832+
72.4 53.4 51
833+
70.4 50.4 50
834+
835+
# Cleanup
836+
statement ok
837+
DROP TABLE h2o_parquet_20696;
838+
839+
statement ok
840+
DROP TABLE o2_parquet_20696;
841+
842+
# Cleanup settings
843+
statement ok
844+
set datafusion.execution.parquet.pushdown_filters = false;

0 commit comments

Comments
 (0)