Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
BinaryOperator::BitwiseAnd => Ok(Operator::BitwiseAnd),
BinaryOperator::BitwiseOr => Ok(Operator::BitwiseOr),
BinaryOperator::StringConcat => Ok(Operator::StringConcat),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL binary operator {:?}",
op
Expand Down
53 changes: 53 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,59 @@ async fn query_scalar_minus_array() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_string_concat_operator() -> Result<()> {
let ctx = SessionContext::new();
// concat 2 strings
let sql = "SELECT 'aa' || 'b'";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------------------+",
"| Utf8(\"aa\") || Utf8(\"b\") |",
"+-------------------------+",
"| aab |",
"+-------------------------+",
];
assert_batches_eq!(expected, &actual);

// concat 4 strings as a string concat pipe.
let sql = "SELECT 'aa' || 'b' || 'cc' || 'd'";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------------------------------------------------+",
"| Utf8(\"aa\") || Utf8(\"b\") || Utf8(\"cc\") || Utf8(\"d\") |",
"+----------------------------------------------------+",
"| aabccd |",
"+----------------------------------------------------+",
];
assert_batches_eq!(expected, &actual);

// concat 2 strings and NULL, output should be NULL
let sql = "SELECT 'aa' || NULL || 'd'";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------------------------+",
"| Utf8(\"aa\") || Utf8(NULL) || Utf8(\"d\") |",
"+---------------------------------------+",
"| |",
"+---------------------------------------+",
];
assert_batches_eq!(expected, &actual);

// concat 1 strings and 2 numeric
let sql = "SELECT 'a' || 42 || 23.3";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-----------------------------------------+",
"| Utf8(\"a\") || Int64(42) || Float64(23.3) |",
"+-----------------------------------------+",
"| a4223.3 |",
"+-----------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Ok(())
}

#[tokio::test]
async fn test_boolean_expressions() -> Result<()> {
test_expression!("true", "true");
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub enum Operator {
BitwiseAnd,
/// Bitwise or, like `|`
BitwiseOr,
/// String concat
StringConcat,
}

impl fmt::Display for Operator {
Expand Down Expand Up @@ -99,6 +101,7 @@ impl fmt::Display for Operator {
Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM",
Operator::BitwiseAnd => "&",
Operator::BitwiseOr => "|",
Operator::StringConcat => "||",
};
write!(f, "{}", display)
}
Expand Down
31 changes: 31 additions & 0 deletions datafusion/physical-expr/src/coercion_rule/binary_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Coercion rules for matching argument types for binary operators

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE};
use datafusion_common::DataFusionError;
use datafusion_common::Result;
Expand Down Expand Up @@ -59,6 +60,8 @@ pub(crate) fn coerce_types(
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch => string_coercion(lhs_type, rhs_type),
// "||" operator has its own rules, and always return a string type
Operator::StringConcat => string_concat_coercion(lhs_type, rhs_type),
Operator::IsDistinctFrom | Operator::IsNotDistinctFrom => {
eq_coercion(lhs_type, rhs_type)
}
Expand Down Expand Up @@ -370,6 +373,34 @@ fn dictionary_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataT
}
}

/// Coercion rules for string concat.
/// This is a union of string coercion rules and specified rules:
/// 1. At lease one side of lhs and rhs should be string type (Utf8 / LargeUtf8)
/// 2. Data type of the other side should be able to cast to string type
fn string_concat_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) {
(Utf8, from_type) | (from_type, Utf8) => {
string_concat_internal_coercion(from_type, &Utf8)
}
(LargeUtf8, from_type) | (from_type, LargeUtf8) => {
string_concat_internal_coercion(from_type, &LargeUtf8)
}
_ => None,
})
}

fn string_concat_internal_coercion(
from_type: &DataType,
to_type: &DataType,
) -> Option<DataType> {
if can_cast_types(from_type, to_type) {
Some(to_type.to_owned())
} else {
None
}
}

/// Coercion rules for Strings: the type that both lhs and rhs can be
/// casted to for the purpose of a string computation
fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
Expand Down
31 changes: 31 additions & 0 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use arrow::record_batch::RecordBatch;

use crate::coercion_rule::binary_rule::coerce_types;
use crate::expressions::try_cast;
use crate::string_expressions;
use crate::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
Expand Down Expand Up @@ -484,6 +485,33 @@ fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
}
}

/// Use datafusion build-in expression `concat` to evaluate `StringConcat` operator.
/// Besides, any `NULL` exists on lhs or rhs will come out result `NULL`
/// 1. 'a' || 'b' || 32 = 'ab32'
/// 2. 'a' || NULL = NULL
fn string_concat(left: ArrayRef, right: ArrayRef) -> Result<ArrayRef> {
let ignore_null = match string_expressions::concat(&[
ColumnarValue::Array(left.clone()),
ColumnarValue::Array(right.clone()),
])? {
ColumnarValue::Array(array_ref) => array_ref,
scalar_value => scalar_value.into_array(left.clone().len()),
};
let ignore_null_array = ignore_null.as_any().downcast_ref::<StringArray>().unwrap();
let result = (0..ignore_null_array.len())
.into_iter()
.map(|index| {
if left.is_null(index) || right.is_null(index) {
None
} else {
Some(ignore_null_array.value(index))
}
})
.collect::<StringArray>();

Ok(Arc::new(result) as ArrayRef)
}

fn bitwise_and_scalar(
array: &dyn Array,
scalar: ScalarValue,
Expand Down Expand Up @@ -1073,6 +1101,8 @@ pub fn binary_operator_data_type(
| Operator::Divide
| Operator::Multiply
| Operator::Modulo => Ok(result_type),
// string operations return the same values as the common coerced type
Operator::StringConcat => Ok(result_type),
}
}

Expand Down Expand Up @@ -1334,6 +1364,7 @@ impl BinaryExpr {
}
Operator::BitwiseAnd => bitwise_and(left, right),
Operator::BitwiseOr => bitwise_or(left, right),
Operator::StringConcat => string_concat(left, right),
}
}
}
Expand Down