Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 177 additions & 3 deletions datafusion/core/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ use ahash::RandomState;

use arrow::{
array::{
ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, LargeStringArray,
as_dictionary_array, as_string_array, ArrayData, ArrayRef, BooleanArray,
Date32Array, Date64Array, DecimalArray, DictionaryArray, LargeStringArray,
PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampSecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder,
UInt64Builder,
},
compute,
datatypes::{UInt32Type, UInt64Type},
datatypes::{
Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type,
UInt8Type,
},
};
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
Expand All @@ -38,7 +42,7 @@ use std::{time::Instant, vec};
use futures::{ready, Stream, StreamExt, TryStreamExt};

use arrow::array::{as_boolean_array, new_null_array, Array};
use arrow::datatypes::DataType;
use arrow::datatypes::{ArrowNativeType, DataType};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -947,6 +951,58 @@ macro_rules! equal_rows_elem {
}};
}

macro_rules! equal_rows_elem_with_string_dict {
($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For dictionaries, I think $left and $right are actually indexes into the keys array, and then the keys array contains the corresponding index into values.

┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                                
  ┌─────────────────┐  ┌─────────┐ │     ┌─────────────────┐                       
│ │        A        │  │    0    │       │        A        │     values[keys[0]]   
  ├─────────────────┤  ├─────────┤ │     ├─────────────────┤                       
│ │        D        │  │    2    │       │        B        │     values[keys[1]]   
  ├─────────────────┤  ├─────────┤ │     ├─────────────────┤                       
│ │        B        │  │    2    │       │        B        │     values[keys[2]]   
  ├─────────────────┤  ├─────────┤ │     ├─────────────────┤                       
│ │        C        │  │    1    │       │        D        │     values[keys[3]]   
  ├─────────────────┤  └─────────┘ │     └─────────────────┘                       
│ │        E        │     keys                                                     
  └─────────────────┘              │                                               
│       values                             Logical array                           
 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘          Contents                             
                                                                                   
          DictionaryArray                                                          
             length = 4                                                            
                                                                                   

In other words, I think you need to compare the values using something like:
https://github.com/AssHero/arrow-datafusion/blob/hashjoin/datafusion/common/src/scalar.rs#L338-L361

let left_array: &DictionaryArray<$key_array_type> =
as_dictionary_array::<$key_array_type>($l);
let right_array: &DictionaryArray<$key_array_type> =
as_dictionary_array::<$key_array_type>($r);

let (left_values, left_values_index) = {
let keys_col = left_array.keys();
if keys_col.is_valid($left) {
let values_index = keys_col.value($left).to_usize().ok_or_else(|| {
DataFusionError::Internal(format!(
"Can not convert index to usize in dictionary of type creating group by value {:?}",
keys_col.data_type()
))
});

match values_index {
Ok(index) => (as_string_array(left_array.values()), Some(index)),
_ => (as_string_array(left_array.values()), None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code silently ignores failures

Filed #2767 to make this a panic

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Thanks again @AssHero -- let me know if you would like help filing the issue for the strange behavior we were seeing with c5

yes, i'll trace the issue and try to solve this issue.

}
} else {
(as_string_array(left_array.values()), None)
}
};
let (right_values, right_values_index) = {
let keys_col = right_array.keys();
if keys_col.is_valid($right) {
let values_index = keys_col.value($right).to_usize().ok_or_else(|| {
DataFusionError::Internal(format!(
"Can not convert index to usize in dictionary of type creating group by value {:?}",
keys_col.data_type()
))
});

match values_index {
Ok(index) => (as_string_array(right_array.values()), Some(index)),
_ => (as_string_array(right_array.values()), None)
}
} else {
(as_string_array(right_array.values()), None)
}
};

match (left_values_index, right_values_index) {
(Some(left_values_index), Some(right_values_index)) => left_values.value(left_values_index) == right_values.value(right_values_index),
(None, None) => $null_equals_null,
_ => false,
}
}};
}

/// Left and right row have equal values
/// If more data types are supported here, please also add the data types in can_hash function
/// to generate hash join logical plan.
Expand Down Expand Up @@ -1054,6 +1110,124 @@ fn equal_rows(
DataType::LargeUtf8 => {
equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null)
}
DataType::Decimal(_, lscale) => match r.data_type() {
Copy link
Copy Markdown
Contributor

@liukun4515 liukun4515 Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my knowledge in the datafusion, we have converted the left and right to the same data type in the planner.
The data type of left and right must be same.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create two table in the spark,

spark-sql> desc t1;
c1                      decimal(10,3)

spark-sql> desc t2;
c1                      decimal(10,4)

and join two table with the eq condition: t1.c1 = t2.c1, get the plan

spark-sql> explain select * from t1,t2 where t1.c1 = t2.c1;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [cast(c1#111 as decimal(11,4))], [cast(c1#112 as decimal(11,4))], Inner
   :- Sort [cast(c1#111 as decimal(11,4)) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cast(c1#111 as decimal(11,4)), 200), ENSURE_REQUIREMENTS, [id=#293]
   :     +- Filter isnotnull(c1#111)
   :        +- Scan hive default.t1 [c1#111], HiveTableRelation [`default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#111], Partition Cols: []]
   +- Sort [cast(c1#112 as decimal(11,4)) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cast(c1#112 as decimal(11,4)), 200), ENSURE_REQUIREMENTS, [id=#294]
         +- Filter isnotnull(c1#112)
            +- Scan hive default.t2 [c1#112], HiveTableRelation [`default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [c1#112], Partition Cols: []]

From the from, we can get the the t1.c1 and t2.c1 will be casted to coerced type(decimal(11,4));

Copy link
Copy Markdown
Contributor Author

@HuSen8891 HuSen8891 Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create two tables in datafusion

❯ describe t1;
+-------------+----------------+-------------+
| column_name | data_type | is_nullable |
+-------------+----------------+-------------+
| c1 | Decimal(10, 2) | NO |
+-------------+----------------+-------------+

select * from t1;
+------+
| c1 |
+------+
| 1.00 |
+------+

❯ describe t2;
+-------------+----------------+-------------+
| column_name | data_type | is_nullable |
+-------------+----------------+-------------+
| c1 | Decimal(10, 4) | NO |
+-------------+----------------+-------------+

select * from t2;
+--------+
| c1 |
+--------+
| 1.0000 |
+--------+

this query gets 0 rows
select * from t1 join t2 on t1.c1 = t2.c1;
0 rows in set. Query took 0.005 seconds.

but this query gets one row
select * from t1 join t2 on t1.c1::decimal(10,4) = t2.c1;
+------+--------+
| c1 | c1 |
+------+--------+
| 1.00 | 1.0000 |
+------+--------+

Does the planner still not converts the data types between left and right for decimal ? If so, perhaps we can add this to planner.
please let me know if I miss something? thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make the data type is same between left and right.
In this pr, we should better forbid calculate the join operation for diff data type, For example
Decimal(10,3) join Decimal(11,3)
I think you can create a follow up pr to make data type coercion in the planner.

Copy link
Copy Markdown
Contributor Author

@HuSen8891 HuSen8891 Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should check the data types and make the coercion in the planner. I'll create an issue and follow up the pr later. Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @AssHero

DataType::Decimal(_, rscale) => {
if lscale == rscale {
equal_rows_elem!(
DecimalArray,
l,
r,
left,
right,
null_equals_null
)
} else {
Comment thread
alamb marked this conversation as resolved.
err = Some(Err(DataFusionError::Internal(
"Inconsistent Decimal data type in hasher, the scale should be same".to_string(),
)));
false
}
}
_ => {
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
false
}
},
DataType::Dictionary(key_type, value_type)
if *value_type.as_ref() == DataType::Utf8 =>
{
match key_type.as_ref() {
DataType::Int8 => {
equal_rows_elem_with_string_dict!(
Int8Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::Int16 => {
equal_rows_elem_with_string_dict!(
Int16Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::Int32 => {
equal_rows_elem_with_string_dict!(
Int32Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::Int64 => {
equal_rows_elem_with_string_dict!(
Int64Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::UInt8 => {
equal_rows_elem_with_string_dict!(
UInt8Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::UInt16 => {
equal_rows_elem_with_string_dict!(
UInt16Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::UInt32 => {
equal_rows_elem_with_string_dict!(
UInt32Type,
l,
r,
left,
right,
null_equals_null
)
}
DataType::UInt64 => {
equal_rows_elem_with_string_dict!(
UInt64Type,
l,
r,
left,
right,
null_equals_null
)
}
_ => {
// should not happen
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
false
}
}
}
other => {
// This is internal because we should have caught this before.
err = Some(Err(DataFusionError::Internal(format!(
Expand Down
Loading