Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
- [ ] date_add
- [ ] date_diff
- [ ] date_format
- [ ] date_from_unix_date
- [x] date_from_unix_date
- [x] date_part
- [ ] date_sub
- [ ] date_trunc
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff,
SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
Expand Down Expand Up @@ -203,6 +203,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkArrayCompact::default())),
Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
Expand Down
102 changes: 102 additions & 0 deletions native/spark-expr/src/datetime_funcs/date_from_unix_date.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, Date32Array, Int32Array};
use arrow::datatypes::DataType;
use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Spark-compatible `date_from_unix_date` function.
/// Converts an integer representing days since the Unix epoch (1970-01-01) to a
/// `Date32` value.
///
/// Because Arrow's `Date32` physically stores i32 days-since-epoch, the conversion
/// is a pure reinterpretation of the input integer — no arithmetic is performed.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkDateFromUnixDate {
    // Exact signature: a single Int32 argument; the function is pure (Immutable).
    signature: Signature,
    // No aliases are registered for this function.
    aliases: Vec<String>,
}

impl SparkDateFromUnixDate {
    /// Creates the UDF with an exact single-`Int32` signature and no aliases.
    pub fn new() -> Self {
        let signature = Signature::exact(vec![DataType::Int32], Volatility::Immutable);
        Self {
            signature,
            aliases: Vec::new(),
        }
    }
}

impl Default for SparkDateFromUnixDate {
fn default() -> Self {
Self::new()
}
}

impl ScalarUDFImpl for SparkDateFromUnixDate {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "date_from_unix_date"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always produces `Date32`, regardless of the (already Int32-coerced) input.
    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
        Ok(DataType::Date32)
    }

    /// Converts the single Int32 argument (days since 1970-01-01) into a Date32.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let [unix_date] = take_function_args(self.name(), args.args)?;
        match unix_date {
            ColumnarValue::Array(array) => {
                let days = array
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .ok_or_else(|| {
                        DataFusionError::Execution(
                            "date_from_unix_date expects Int32Array input".to_string(),
                        )
                    })?;
                // Int32 day offsets and Date32 share the same physical layout
                // (i32 days since epoch), so rebuild a Date32Array over the same
                // value buffer and null mask without touching the data.
                let dates = Date32Array::new(days.values().clone(), days.nulls().cloned());
                Ok(ColumnarValue::Array(Arc::new(dates)))
            }
            // `days` is Option<i32>: Some(d) maps to that date, None to a NULL date.
            ColumnarValue::Scalar(ScalarValue::Int32(days)) => {
                Ok(ColumnarValue::Scalar(ScalarValue::Date32(days)))
            }
            // An untyped NULL scalar also yields a NULL date.
            ColumnarValue::Scalar(ScalarValue::Null) => {
                Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
            }
            ColumnarValue::Scalar(_) => Err(DataFusionError::Execution(
                "date_from_unix_date expects Int32 scalar input".to_string(),
            )),
        }
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
2 changes: 2 additions & 0 deletions native/spark-expr/src/datetime_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

mod date_diff;
mod date_from_unix_date;
mod date_trunc;
mod extract_date_part;
mod hours;
Expand All @@ -24,6 +25,7 @@ mod timestamp_trunc;
mod unix_timestamp;

pub use date_diff::SparkDateDiff;
pub use date_from_unix_date::SparkDateFromUnixDate;
pub use date_trunc::SparkDateTrunc;
pub use extract_date_part::SparkHour;
pub use extract_date_part::SparkMinute;
Expand Down
4 changes: 2 additions & 2 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ pub use comet_scalar_funcs::{
};
pub use csv_funcs::*;
pub use datetime_funcs::{
SparkDateDiff, SparkDateTrunc, SparkHour, SparkHoursTransform, SparkMakeDate, SparkMinute,
SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
};
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
pub use hash_funcs::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[DateAdd] -> CometDateAdd,
classOf[DateDiff] -> CometDateDiff,
classOf[DateFormatClass] -> CometDateFormat,
classOf[DateFromUnixDate] -> CometDateFromUnixDate,
classOf[Days] -> CometDays,
classOf[Hours] -> CometHours,
classOf[DateSub] -> CometDateSub,
Expand Down
4 changes: 3 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -354,6 +354,8 @@ object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

object CometLastDay extends CometScalarFunction[LastDay]("last_day")

object CometDateFromUnixDate extends CometScalarFunction[DateFromUnixDate]("date_from_unix_date")

object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Tests for the date_from_unix_date scalar function.
statement
CREATE TABLE test_date_from_unix_date(i int) USING parquet

-- -719162 = 0001-01-01 (Spark min date), 2932896 = 9999-12-31 (Spark max date)
statement
INSERT INTO test_date_from_unix_date VALUES (0), (1), (-1), (18993), (-25567), (-719162), (2932896), (NULL)

-- column arguments: epoch, +/- one day, a modern date, a pre-epoch date,
-- the Spark min/max date boundaries, and NULL
query
SELECT date_from_unix_date(i) FROM test_date_from_unix_date

-- literal arguments
query
SELECT date_from_unix_date(0), date_from_unix_date(1), date_from_unix_date(-1), date_from_unix_date(18993), date_from_unix_date(NULL)
Loading