Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
- [ ] second
- [ ] timestamp_micros
- [ ] timestamp_millis
- [ ] timestamp_seconds
- [x] timestamp_seconds
- [ ] to_date
- [ ] to_timestamp
- [ ] to_timestamp_ltz
Expand Down
3 changes: 2 additions & 1 deletion native/spark-expr/src/comet_scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff,
SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSecondsToTimestamp, SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
Expand Down Expand Up @@ -206,6 +206,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
]
}
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/datetime_funcs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod date_trunc;
mod extract_date_part;
mod hours;
mod make_date;
mod seconds_to_timestamp;
mod timestamp_trunc;
mod unix_timestamp;

Expand All @@ -32,5 +33,6 @@ pub use extract_date_part::SparkMinute;
pub use extract_date_part::SparkSecond;
pub use hours::SparkHoursTransform;
pub use make_date::SparkMakeDate;
pub use seconds_to_timestamp::SparkSecondsToTimestamp;
pub use timestamp_trunc::TimestampTruncExpr;
pub use unix_timestamp::SparkUnixTimestamp;
192 changes: 192 additions & 0 deletions native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{
Array, Float32Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray,
};
use arrow::compute::try_unary;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Microseconds per second — scale factor from input seconds to the
/// microsecond-precision timestamps this function produces.
const MICROS_PER_SECOND: i64 = 1_000_000;

/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
/// Converts seconds since Unix epoch to a timestamp.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSecondsToTimestamp {
    // Accepted input types (Int32/Int64/Float32/Float64); built in `new`.
    signature: Signature,
    // SQL-facing alias names (`timestamp_seconds`); returned by `aliases()`.
    aliases: Vec<String>,
}

impl SparkSecondsToTimestamp {
pub fn new() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int32]),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be byte or short type?

Spark is specifying

override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)

private[sql] abstract class IntegralType extends NumericType

class ShortType private () extends IntegralType {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah Matt already pointed out...

TypeSignature::Exact(vec![DataType::Int64]),
TypeSignature::Exact(vec![DataType::Float32]),
TypeSignature::Exact(vec![DataType::Float64]),
],
Volatility::Immutable,
),
aliases: vec!["timestamp_seconds".to_string()],
}
}
}

impl Default for SparkSecondsToTimestamp {
fn default() -> Self {
Self::new()
}
}

impl ScalarUDFImpl for SparkSecondsToTimestamp {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"seconds_to_timestamp"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let [seconds] = take_function_args(self.name(), args.args)?;

match seconds {
ColumnarValue::Array(arr) => {
// Handle Int32 input — no overflow possible since i32 * 1_000_000 fits in i64
if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
let result: TimestampMicrosecondArray =
try_unary(int_array, |s| Ok((s as i64) * MICROS_PER_SECOND))?;
return Ok(ColumnarValue::Array(Arc::new(result)));
}

// Handle Int64 input — error on overflow to match Spark's Math.multiplyExact
if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
let result: TimestampMicrosecondArray = try_unary(int_array, |s| {
s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
arrow::error::ArrowError::ComputeError("long overflow".to_string())
})
})?;
return Ok(ColumnarValue::Array(Arc::new(result)));
}

// Handle Float32 input — cast to f64 and use Float64 path
if let Some(float_array) = arr.as_any().downcast_ref::<Float32Array>() {
let result: arrow::array::TimestampMicrosecondArray = float_array
.iter()
.map(|opt| {
opt.and_then(|s| {
let s = s as f64;
if s.is_nan() || s.is_infinite() {
None
} else {
Some((s * (MICROS_PER_SECOND as f64)) as i64)
}
})
})
.collect();
return Ok(ColumnarValue::Array(Arc::new(result)));
}

// Handle Float64 input — NaN and Infinity return null per Spark behavior
if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
let result: arrow::array::TimestampMicrosecondArray = float_array
.iter()
.map(|opt| {
opt.and_then(|s| {
if s.is_nan() || s.is_infinite() {
None
} else {
Some((s * (MICROS_PER_SECOND as f64)) as i64)
}
})
})
.collect();
return Ok(ColumnarValue::Array(Arc::new(result)));
}

Err(DataFusionError::Execution(format!(
"seconds_to_timestamp expects Int32, Int64, Float32 or Float64 input, got {:?}",
arr.data_type()
)))
}
ColumnarValue::Scalar(scalar) => {
let ts_micros = match &scalar {
ScalarValue::Int32(Some(s)) => Some((*s as i64) * MICROS_PER_SECOND),
ScalarValue::Int64(Some(s)) => {
Some(s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
DataFusionError::ArrowError(
Box::new(arrow::error::ArrowError::ComputeError(
"long overflow".to_string(),
)),
None,
)
})?)
}
ScalarValue::Float32(Some(s)) => {
let s = *s as f64;
if s.is_nan() || s.is_infinite() {
None
} else {
Some((s * (MICROS_PER_SECOND as f64)) as i64)
}
}
ScalarValue::Float64(Some(s)) => {
if s.is_nan() || s.is_infinite() {
None
} else {
Some((s * (MICROS_PER_SECOND as f64)) as i64)
}
}
ScalarValue::Int32(None)
| ScalarValue::Int64(None)
| ScalarValue::Float32(None)
| ScalarValue::Float64(None)
| ScalarValue::Null => None,
_ => {
return Err(DataFusionError::Execution(format!(
"seconds_to_timestamp expects numeric scalar input, got {:?}",
scalar.data_type()
)))
}
};
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
ts_micros, None,
)))
}
}
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}
3 changes: 2 additions & 1 deletion native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ pub use comet_scalar_funcs::{
pub use csv_funcs::*;
pub use datetime_funcs::{
SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform,
SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp,
TimestampTruncExpr,
};
pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
pub use hash_funcs::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Minute] -> CometMinute,
classOf[NextDay] -> CometNextDay,
classOf[Second] -> CometSecond,
classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
classOf[TruncDate] -> CometTruncDate,
classOf[TruncTimestamp] -> CometTruncTimestamp,
classOf[UnixTimestamp] -> CometUnixTimestamp,
Expand Down
5 changes: 4 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -352,6 +352,9 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day")

object CometMakeDate extends CometScalarFunction[MakeDate]("make_date")

/** Serde mapping from Spark's `SecondsToTimestamp` expression to the native
 * `seconds_to_timestamp` scalar function.
 */
object CometSecondsToTimestamp
    extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp")

object CometLastDay extends CometScalarFunction[LastDay]("last_day")

object CometDateFromUnixDate extends CometScalarFunction[DateFromUnixDate]("date_from_unix_date")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- End-to-end tests for timestamp_seconds / seconds_to_timestamp over
-- bigint, int and double columns plus literal edge cases.
-- Config: spark.sql.session.timeZone=UTC
-- ConfigMatrix: parquet.enable.dictionary=false,true

-- bigint column
-- Values cover epoch, a recent date, pre-epoch, far-future (4102444800)
-- and far-past (-2208988800) seconds, and NULL.
statement
CREATE TABLE test_ts_seconds_bigint(c0 bigint) USING parquet

statement
INSERT INTO test_ts_seconds_bigint VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL)

query
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_bigint

-- int column
statement
CREATE TABLE test_ts_seconds_int(c0 int) USING parquet

statement
INSERT INTO test_ts_seconds_int VALUES (0), (1640995200), (-86400), (NULL)

query
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_int

-- double column
-- Fractional seconds exercise the float-to-microsecond conversion path.
statement
CREATE TABLE test_ts_seconds_double(c0 double) USING parquet

statement
INSERT INTO test_ts_seconds_double VALUES (0.0), (1640995200.123), (-86400.5), (NULL)

query
SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_double

-- literal arguments
query
SELECT timestamp_seconds(0)

query
SELECT timestamp_seconds(1640995200)

-- negative value (before epoch)
query
SELECT timestamp_seconds(-86400)

-- decimal seconds (fractional)
query
SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))

-- null handling
query
SELECT timestamp_seconds(NULL)

-- NaN input (should return null)
query
SELECT timestamp_seconds(CAST('NaN' AS DOUBLE))

-- Infinity input (should return null)
query
SELECT timestamp_seconds(CAST('Infinity' AS DOUBLE))

-- Negative infinity input (should return null)
query
SELECT timestamp_seconds(CAST('-Infinity' AS DOUBLE))
Loading