From d807b68a74e3bddb5aef4a74d64c3401e9f9e48c Mon Sep 17 00:00:00 2001 From: DuRipeng <453243496@qq.com> Date: Fri, 8 Apr 2022 04:15:53 +0800 Subject: [PATCH] implement 'StringConcat' operator to support sql like "select 'aa' || 'b' " (#2142) * implement stringconcat operator * snake case fix * support non-string concat & handle NULL * value -> array * string concat internal coercion * get NULL in right index of vec Co-authored-by: duripeng --- datafusion/core/src/sql/planner.rs | 1 + datafusion/core/tests/sql/expr.rs | 53 +++++++++++++++++++ datafusion/expr/src/operator.rs | 3 ++ .../src/coercion_rule/binary_rule.rs | 31 +++++++++++ .../physical-expr/src/expressions/binary.rs | 31 +++++++++++ 5 files changed, 119 insertions(+) diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index b6ee6c8ecccac..267a4075a6016 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -1503,6 +1503,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch), BinaryOperator::BitwiseAnd => Ok(Operator::BitwiseAnd), BinaryOperator::BitwiseOr => Ok(Operator::BitwiseOr), + BinaryOperator::StringConcat => Ok(Operator::StringConcat), _ => Err(DataFusionError::NotImplemented(format!( "Unsupported SQL binary operator {:?}", op diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index 6e92920d2ee21..903550c42ce87 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -280,6 +280,59 @@ async fn query_scalar_minus_array() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_string_concat_operator() -> Result<()> { + let ctx = SessionContext::new(); + // concat 2 strings + let sql = "SELECT 'aa' || 'b'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------------------------+", + "| Utf8(\"aa\") || Utf8(\"b\") |", + "+-------------------------+", + "| aab |", + "+-------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + // concat 4 strings as a string concat pipe. + let sql = "SELECT 'aa' || 'b' || 'cc' || 'd'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------------------------------------------+", + "| Utf8(\"aa\") || Utf8(\"b\") || Utf8(\"cc\") || Utf8(\"d\") |", + "+----------------------------------------------------+", + "| aabccd |", + "+----------------------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + // concat 2 strings and NULL, output should be NULL + let sql = "SELECT 'aa' || NULL || 'd'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+---------------------------------------+", + "| Utf8(\"aa\") || Utf8(NULL) || Utf8(\"d\") |", + "+---------------------------------------+", + "| |", + "+---------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + // concat 1 strings and 2 numeric + let sql = "SELECT 'a' || 42 || 23.3"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-----------------------------------------+", + "| Utf8(\"a\") || Int64(42) || Float64(23.3) |", + "+-----------------------------------------+", + "| a4223.3 |", + "+-----------------------------------------+", + ]; + assert_batches_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn test_boolean_expressions() -> Result<()> { test_expression!("true", "true"); diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index e4a9871e674d2..d22cb85694eb7 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -71,6 +71,8 @@ pub enum Operator { BitwiseAnd, /// Bitwise or, like `|` BitwiseOr, + /// String concat + StringConcat, } impl fmt::Display for Operator { @@ -99,6 +101,7 @@ impl fmt::Display for Operator { Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM", Operator::BitwiseAnd => "&", Operator::BitwiseOr => "|", + Operator::StringConcat => "||", }; write!(f, "{}", display) } diff --git a/datafusion/physical-expr/src/coercion_rule/binary_rule.rs b/datafusion/physical-expr/src/coercion_rule/binary_rule.rs index 3e7e323f8eab0..2bedcc3683ef0 100644 --- a/datafusion/physical-expr/src/coercion_rule/binary_rule.rs +++ b/datafusion/physical-expr/src/coercion_rule/binary_rule.rs @@ -17,6 +17,7 @@ //! Coercion rules for matching argument types for binary operators +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE}; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -59,6 +60,8 @@ pub(crate) fn coerce_types( | Operator::RegexIMatch | Operator::RegexNotMatch | Operator::RegexNotIMatch => string_coercion(lhs_type, rhs_type), + // "||" operator has its own rules, and always return a string type + Operator::StringConcat => string_concat_coercion(lhs_type, rhs_type), Operator::IsDistinctFrom | Operator::IsNotDistinctFrom => { eq_coercion(lhs_type, rhs_type) } @@ -370,6 +373,34 @@ fn dictionary_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { + use arrow::datatypes::DataType::*; + string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) { + (Utf8, from_type) | (from_type, Utf8) => { + string_concat_internal_coercion(from_type, &Utf8) + } + (LargeUtf8, from_type) | (from_type, LargeUtf8) => { + string_concat_internal_coercion(from_type, &LargeUtf8) + } + _ => None, + }) +} + +fn string_concat_internal_coercion( + from_type: &DataType, + to_type: &DataType, +) -> Option { + if can_cast_types(from_type, to_type) { + Some(to_type.to_owned()) + } else { + None + } +} + /// Coercion rules for Strings: the type that both lhs and rhs can be /// casted to for the purpose of a string computation fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 6b40c8f5af83c..abd0f466c2160 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -56,6 +56,7 @@ use arrow::record_batch::RecordBatch; use crate::coercion_rule::binary_rule::coerce_types; use crate::expressions::try_cast; +use crate::string_expressions; use crate::PhysicalExpr; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; @@ -484,6 +485,33 @@ fn bitwise_or(left: ArrayRef, right: ArrayRef) -> Result { } } +/// Use datafusion build-in expression `concat` to evaluate `StringConcat` operator. +/// Besides, any `NULL` exists on lhs or rhs will come out result `NULL` +/// 1. 'a' || 'b' || 32 = 'ab32' +/// 2. 'a' || NULL = NULL +fn string_concat(left: ArrayRef, right: ArrayRef) -> Result { + let ignore_null = match string_expressions::concat(&[ + ColumnarValue::Array(left.clone()), + ColumnarValue::Array(right.clone()), + ])? { + ColumnarValue::Array(array_ref) => array_ref, + scalar_value => scalar_value.into_array(left.clone().len()), + }; + let ignore_null_array = ignore_null.as_any().downcast_ref::().unwrap(); + let result = (0..ignore_null_array.len()) + .into_iter() + .map(|index| { + if left.is_null(index) || right.is_null(index) { + None + } else { + Some(ignore_null_array.value(index)) + } + }) + .collect::(); + + Ok(Arc::new(result) as ArrayRef) +} + fn bitwise_and_scalar( array: &dyn Array, scalar: ScalarValue, @@ -1073,6 +1101,8 @@ pub fn binary_operator_data_type( | Operator::Divide | Operator::Multiply | Operator::Modulo => Ok(result_type), + // string operations return the same values as the common coerced type + Operator::StringConcat => Ok(result_type), } } @@ -1334,6 +1364,7 @@ impl BinaryExpr { } Operator::BitwiseAnd => bitwise_and(left, right), Operator::BitwiseOr => bitwise_or(left, right), + Operator::StringConcat => string_concat(left, right), } } }