Skip to content

Commit 724152e

Browse files
feat: Add TimestampNTZType support for unix_timestamp (#4039)
* feat: Add TimestampNTZType support for casts and unix_timestamp Co-authored-by: Andy Grove <agrove@apache.org>
1 parent f089295 commit 724152e

7 files changed

Lines changed: 346 additions & 36 deletions

File tree

docs/source/user-guide/latest/compatibility/expressions/datetime.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,27 @@ under the License.
1919

2020
# Date/Time Expressions
2121

22-
<!--BEGIN:EXPR_COMPAT[datetime]-->
23-
<!--END:EXPR_COMPAT-->
22+
- **Hour, Minute, Second**: Incorrectly apply timezone conversion to TimestampNTZ inputs. TimestampNTZ stores local
23+
time without timezone, so no conversion should be applied. These expressions work correctly with Timestamp inputs.
24+
[#3180](https://github.com/apache/datafusion-comet/issues/3180)
25+
- **TruncTimestamp (date_trunc)**: Produces incorrect results when used with non-UTC timezones. Compatible when
26+
timezone is UTC.
27+
[#2649](https://github.com/apache/datafusion-comet/issues/2649)
28+
29+
## Date and Time Functions
30+
31+
Comet's native implementation of date and time functions may produce different results than Spark for dates
32+
far in the future (approximately beyond year 2100). This is because Comet uses the chrono-tz library for
33+
timezone calculations, which has limited support for Daylight Saving Time (DST) rules beyond the IANA
34+
time zone database's explicit transitions.
35+
36+
For dates within a reasonable range (approximately 1970-2100), Comet's date and time functions are compatible
37+
with Spark. For dates beyond this range, functions that involve timezone-aware calculations (such as
38+
`date_trunc` with timezone-aware timestamps) may produce results with incorrect DST offsets.
39+
40+
If you need to process dates far in the future with accurate timezone handling, consider:
41+
42+
- Using timezone-naive types (`timestamp_ntz`) when timezone conversion is not required
43+
- Falling back to Spark for these specific operations
44+
<!--BEGIN:EXPR_COMPAT[datetime]-->
45+
<!--END:EXPR_COMPAT-->

native/spark-expr/src/datetime_funcs/timestamp_trunc.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,20 +113,33 @@ impl PhysicalExpr for TimestampTruncExpr {
113113
let tz = self.timezone.clone();
114114
match (timestamp, format) {
115115
(ColumnarValue::Array(ts), ColumnarValue::Scalar(Utf8(Some(format)))) => {
116-
let ts = array_with_timezone(
117-
ts,
118-
tz.clone(),
119-
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
120-
)?;
116+
// For TimestampNTZ (Timestamp(Microsecond, None)), skip timezone conversion.
117+
// NTZ values are timezone-independent and truncation should operate directly
118+
// on the naive microsecond values without any timezone resolution.
119+
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
120+
let ts = if is_ntz {
121+
ts
122+
} else {
123+
array_with_timezone(
124+
ts,
125+
tz.clone(),
126+
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
127+
)?
128+
};
121129
let result = timestamp_trunc_dyn(&ts, format)?;
122130
Ok(ColumnarValue::Array(result))
123131
}
124132
(ColumnarValue::Array(ts), ColumnarValue::Array(formats)) => {
125-
let ts = array_with_timezone(
126-
ts,
127-
tz.clone(),
128-
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
129-
)?;
133+
let is_ntz = matches!(ts.data_type(), DataType::Timestamp(Microsecond, None));
134+
let ts = if is_ntz {
135+
ts
136+
} else {
137+
array_with_timezone(
138+
ts,
139+
tz.clone(),
140+
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
141+
)?
142+
};
130143
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
131144
Ok(ColumnarValue::Array(result))
132145
}

native/spark-expr/src/datetime_funcs/unix_timestamp.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,27 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
7878

7979
match args {
8080
[ColumnarValue::Array(array)] => match array.data_type() {
81+
DataType::Timestamp(Microsecond, None) => {
82+
// TimestampNTZ: No timezone conversion needed - simply divide microseconds
83+
// by MICROS_PER_SECOND. TimestampNTZ stores local time without timezone.
84+
let timestamp_array =
85+
array.as_primitive::<arrow::datatypes::TimestampMicrosecondType>();
86+
87+
let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
88+
timestamp_array
89+
.values()
90+
.iter()
91+
.map(|&micros| div_floor(micros, MICROS_PER_SECOND))
92+
.collect()
93+
} else {
94+
timestamp_array
95+
.iter()
96+
.map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
97+
.collect()
98+
};
99+
100+
Ok(ColumnarValue::Array(Arc::new(result)))
101+
}
81102
DataType::Timestamp(_, _) => {
82103
let is_utc = self.timezone == "UTC";
83104
let array = if is_utc
@@ -99,7 +120,7 @@ impl ScalarUDFImpl for SparkUnixTimestamp {
99120
timestamp_array
100121
.values()
101122
.iter()
102-
.map(|&micros| micros / MICROS_PER_SECOND)
123+
.map(|&micros| div_floor(micros, MICROS_PER_SECOND))
103124
.collect()
104125
} else {
105126
timestamp_array

native/spark-expr/src/kernels/temporal.rs

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
//! temporal kernels
1919
20-
use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
20+
use chrono::{
21+
DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, TimeZone, Timelike, Utc,
22+
};
2123

2224
use std::sync::Arc;
2325

@@ -153,10 +155,23 @@ where
153155
Ok(())
154156
}
155157

156-
// Apply the Tz to the Naive Date Time,,convert to UTC, and return as microseconds in Unix epoch
158+
// Apply the Tz to the Naive Date Time, convert to UTC, and return as microseconds in Unix epoch.
159+
// After truncation the carried UTC offset may be wrong if the truncated time falls in a different
160+
// DST period than the original (e.g., truncating a December/PST timestamp to QUARTER yields
161+
// October 1 which is in PDT). We re-resolve the naive local time through the timezone so that
162+
// chrono picks the correct offset for the target date.
157163
#[inline]
158164
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
159-
dt.unwrap().with_timezone(&Utc).timestamp_micros()
165+
let dt = dt.unwrap();
166+
let naive = dt.naive_local();
167+
let tz = dt.timezone();
168+
169+
match tz.from_local_datetime(&naive) {
170+
LocalResult::Single(resolved) | LocalResult::Ambiguous(resolved, _) => {
171+
resolved.with_timezone(&Utc).timestamp_micros()
172+
}
173+
LocalResult::None => dt.with_timezone(&Utc).timestamp_micros(),
174+
}
160175
}
161176

162177
#[inline]
@@ -529,6 +544,89 @@ pub(crate) fn timestamp_trunc_dyn(
529544
}
530545
}
531546

547+
/// Convert microseconds since epoch to NaiveDateTime
548+
#[inline]
549+
fn micros_to_naive(micros: i64) -> Option<NaiveDateTime> {
550+
DateTime::from_timestamp_micros(micros).map(|dt| dt.naive_utc())
551+
}
552+
553+
/// Convert NaiveDateTime back to microseconds since epoch
554+
#[inline]
555+
fn naive_to_micros(dt: NaiveDateTime) -> i64 {
556+
dt.and_utc().timestamp_micros()
557+
}
558+
559+
/// Resolve a truncation format string to the corresponding NaiveDateTime truncation function.
560+
fn ntz_trunc_fn_for_format(
561+
format: &str,
562+
) -> Result<fn(NaiveDateTime) -> Option<NaiveDateTime>, SparkError> {
563+
match format.to_uppercase().as_str() {
564+
"YEAR" | "YYYY" | "YY" => Ok(trunc_date_to_year),
565+
"QUARTER" => Ok(trunc_date_to_quarter),
566+
"MONTH" | "MON" | "MM" => Ok(trunc_date_to_month),
567+
"WEEK" => Ok(trunc_date_to_week),
568+
"DAY" | "DD" => Ok(trunc_date_to_day),
569+
"HOUR" => Ok(trunc_date_to_hour),
570+
"MINUTE" => Ok(trunc_date_to_minute),
571+
"SECOND" => Ok(trunc_date_to_second),
572+
"MILLISECOND" => Ok(trunc_date_to_ms),
573+
"MICROSECOND" => Ok(trunc_date_to_microsec),
574+
_ => Err(SparkError::Internal(format!(
575+
"Unsupported format: {format:?} for function 'timestamp_trunc'"
576+
))),
577+
}
578+
}
579+
580+
/// Truncate a TimestampNTZ array without any timezone conversion.
581+
/// NTZ values are timezone-independent; we treat the raw microseconds as a naive datetime.
582+
fn timestamp_trunc_ntz<T>(
583+
array: &PrimitiveArray<T>,
584+
format: String,
585+
) -> Result<TimestampMicrosecondArray, SparkError>
586+
where
587+
T: ArrowTemporalType + ArrowNumericType,
588+
i64: From<T::Native>,
589+
{
590+
let trunc_fn = ntz_trunc_fn_for_format(&format)?;
591+
592+
let result: TimestampMicrosecondArray = array
593+
.iter()
594+
.map(|opt_val| {
595+
opt_val.and_then(|v| {
596+
let micros: i64 = v.into();
597+
micros_to_naive(micros)
598+
.and_then(trunc_fn)
599+
.map(naive_to_micros)
600+
})
601+
})
602+
.collect();
603+
604+
Ok(result)
605+
}
606+
607+
/// Truncate a single NTZ value and append to builder
608+
fn timestamp_trunc_ntz_single<F>(
609+
value: Option<i64>,
610+
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
611+
op: F,
612+
) -> Result<(), SparkError>
613+
where
614+
F: Fn(NaiveDateTime) -> Option<NaiveDateTime>,
615+
{
616+
match value {
617+
Some(micros) => match micros_to_naive(micros).and_then(op) {
618+
Some(truncated) => builder.append_value(naive_to_micros(truncated)),
619+
None => {
620+
return Err(SparkError::Internal(
621+
"Unable to truncate NTZ timestamp".to_string(),
622+
))
623+
}
624+
},
625+
None => builder.append_null(),
626+
}
627+
Ok(())
628+
}
629+
532630
pub(crate) fn timestamp_trunc<T>(
533631
array: &PrimitiveArray<T>,
534632
format: String,
@@ -540,6 +638,10 @@ where
540638
let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
541639
let iter = ArrayIter::new(array);
542640
match array.data_type() {
641+
DataType::Timestamp(TimeUnit::Microsecond, None) => {
642+
// TimestampNTZ: operate directly on naive microsecond values without timezone
643+
timestamp_trunc_ntz(array, format)
644+
}
543645
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
544646
match format.to_uppercase().as_str() {
545647
"YEAR" | "YYYY" | "YY" => {
@@ -687,6 +789,15 @@ macro_rules! timestamp_trunc_array_fmt_helper {
687789
"lengths of values array and format array must be the same"
688790
);
689791
match $datatype {
792+
DataType::Timestamp(TimeUnit::Microsecond, None) => {
793+
// TimestampNTZ: operate directly on naive microsecond values
794+
for (index, val) in iter.enumerate() {
795+
let micros_val = val.map(|v| i64::from(v));
796+
let trunc_fn = ntz_trunc_fn_for_format($formats.value(index))?;
797+
timestamp_trunc_ntz_single(micros_val, &mut builder, trunc_fn)?;
798+
}
799+
Ok(builder.finish())
800+
}
690801
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
691802
let tz: Tz = tz.parse()?;
692803
for (index, val) in iter.enumerate() {

spark/src/main/scala/org/apache/comet/serde/datetime.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,11 @@ object CometHour extends CometExpressionSerde[Hour] {
186186
override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason)
187187

188188
override def getSupportLevel(expr: Hour): SupportLevel = {
189-
if (expr.child.dataType.typeName == "timestamp_ntz") {
190-
Incompatible(Some(incompatReason))
189+
if (expr.child.dataType == TimestampNTZType) {
190+
Incompatible(
191+
Some(
192+
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
193+
" (https://github.com/apache/datafusion-comet/issues/3180)"))
191194
} else {
192195
Compatible()
193196
}
@@ -221,7 +224,7 @@ object CometHour extends CometExpressionSerde[Hour] {
221224
object CometMinute extends CometExpressionSerde[Minute] {
222225

223226
override def getSupportLevel(expr: Minute): SupportLevel = {
224-
if (expr.child.dataType.typeName == "timestamp_ntz") {
227+
if (expr.child.dataType == TimestampNTZType) {
225228
Incompatible(
226229
Some(
227230
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
@@ -259,7 +262,7 @@ object CometMinute extends CometExpressionSerde[Minute] {
259262
object CometSecond extends CometExpressionSerde[Second] {
260263

261264
override def getSupportLevel(expr: Second): SupportLevel = {
262-
if (expr.child.dataType.typeName == "timestamp_ntz") {
265+
if (expr.child.dataType == TimestampNTZType) {
263266
Incompatible(
264267
Some(
265268
"Incorrectly applies timezone conversion to TimestampNTZ inputs" +
@@ -297,11 +300,9 @@ object CometSecond extends CometExpressionSerde[Second] {
297300
object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
298301

299302
private def isSupportedInputType(expr: UnixTimestamp): Boolean = {
300-
// Note: TimestampNTZType is not supported because Comet incorrectly applies
301-
// timezone conversion to TimestampNTZ values. TimestampNTZ stores local time
302-
// without timezone, so no conversion should be applied.
303303
expr.children.head.dataType match {
304304
case TimestampType | DateType => true
305+
case TimestampNTZType => true
305306
case _ => false
306307
}
307308
}

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1613,7 +1613,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
16131613
// CAST from TimestampNTZType
16141614

16151615
test("cast TimestampNTZType to StringType") {
1616-
castTest(generateTimestampNTZ(), DataTypes.StringType)
1616+
// TimestampNTZ is timezone-independent, so casting to string should produce
1617+
// the same result regardless of the session timezone.
1618+
for (tz <- representativeTimezones) {
1619+
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) {
1620+
castTest(generateTimestampNTZ(), DataTypes.StringType)
1621+
}
1622+
}
16171623
}
16181624

16191625
test("cast TimestampNTZType to DateType") {

0 commit comments

Comments
 (0)