Skip to content

Commit 9747361

Browse files
committed
fix: cast to and from timestamp_ntz
1 parent ec91c91 commit 9747361

7 files changed

Lines changed: 373 additions & 75 deletions

File tree

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::conversion_funcs::temporal::{
3535
is_df_cast_from_timestamp_spark_compatible,
3636
};
3737
use crate::conversion_funcs::utils::spark_cast_postprocess;
38-
use crate::utils::array_with_timezone;
38+
use crate::utils::{array_with_timezone, cast_timestamp_to_ntz, timestamp_ntz_to_timestamp};
3939
use crate::EvalMode::Legacy;
4040
use crate::{cast_whole_num_to_binary, BinaryOutputStyle};
4141
use crate::{EvalMode, SparkError};
@@ -441,6 +441,21 @@ pub(crate) fn cast_array(
441441
(Float32 | Float64, Timestamp(_, tz)) => cast_float_to_timestamp(&array, tz, eval_mode),
442442
(Boolean, Timestamp(_, tz)) => cast_boolean_to_timestamp(&array, tz),
443443
(Decimal128(_, scale), Timestamp(_, tz)) => cast_decimal_to_timestamp(&array, tz, *scale),
444+
// NTZ → TIMESTAMP: interpret NTZ local-epoch value as session-TZ local time, convert to UTC.
445+
// Must come before the is_datafusion_spark_compatible fallthrough which would
446+
// incorrectly copy raw μs without any timezone conversion.
447+
(Timestamp(_, None), Timestamp(_, Some(target_tz))) => Ok(timestamp_ntz_to_timestamp(
448+
array,
449+
&cast_options.timezone,
450+
Some(target_tz.as_ref()),
451+
)?),
452+
// TIMESTAMP → NTZ: shift UTC epoch to local time in session TZ, store as local epoch.
453+
(Timestamp(_, Some(_)), Timestamp(_, None)) => {
454+
Ok(cast_timestamp_to_ntz(array, &cast_options.timezone)?)
455+
}
456+
// NTZ → Date32 and NTZ → Utf8 are handled by the DataFusion fall-through below
457+
// (is_df_cast_from_timestamp_spark_compatible returns true for Date32 and Utf8).
458+
// These casts are timezone-independent and DataFusion's implementation matches Spark.
444459
_ if cast_options.is_adapting_schema
445460
|| is_datafusion_spark_compatible(&from_type, to_type) =>
446461
{

native/spark-expr/src/conversion_funcs/string.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1516,7 +1516,12 @@ fn extract_offset_suffix(value: &str) -> Option<(&str, timezone::Tz)> {
15161516

15171517
type TimestampParsePattern<T> = (&'static Regex, fn(&str, &T) -> SparkResult<Option<i64>>);
15181518

1519-
static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}$").unwrap());
1519+
// RE_YEAR allows only 4-6 digits (not 7) because a bare 7-digit string like "0119704"
1520+
// is ambiguous and Spark rejects it. The other patterns (RE_MONTH, RE_DAY, etc.) keep
1521+
// \d{4,7} because the `-` separator disambiguates the year portion, so "0002020-01-01"
1522+
// is validly year 2020 with leading zeros. date_parser's is_valid_digits also allows up
1523+
// to 7 year digits for the same reason.
1524+
static RE_YEAR: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,6}$").unwrap());
15201525
static RE_MONTH: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}$").unwrap());
15211526
static RE_DAY: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^-?\d{4,7}-\d{2}-\d{2}$").unwrap());
15221527
static RE_HOUR: LazyLock<Regex> =
@@ -1802,6 +1807,9 @@ mod tests {
18021807
Some("T2"),
18031808
Some("0100-01-01T12:34:56.123456"),
18041809
Some("10000-01-01T12:34:56.123456"),
1810+
// 7-digit year-only strings must return null (Spark returns null for these)
1811+
Some("0119704"),
1812+
Some("2024001"),
18051813
]));
18061814
let tz = &timezone::Tz::from_str("UTC").unwrap();
18071815

@@ -1826,7 +1834,10 @@ mod tests {
18261834
result.data_type(),
18271835
&DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
18281836
);
1829-
assert_eq!(result.len(), 4);
1837+
assert_eq!(result.len(), 6);
1838+
// 7-digit year-only strings must be null
1839+
assert!(result.is_null(4), "0119704 should be null");
1840+
assert!(result.is_null(5), "2024001 should be null");
18301841
}
18311842

18321843
#[test]

native/spark-expr/src/conversion_funcs/temporal.rs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::utils::resolve_local_datetime;
1819
use crate::{timezone, SparkCastOptions, SparkResult};
1920
use arrow::array::{ArrayRef, AsArray, TimestampMicrosecondBuilder};
2021
use arrow::datatypes::{DataType, Date32Type};
21-
use chrono::{NaiveDate, TimeZone};
22+
use chrono::NaiveDate;
2223
use std::str::FromStr;
2324
use std::sync::Arc;
2425

@@ -38,37 +39,49 @@ pub(crate) fn cast_date_to_timestamp(
3839
cast_options: &SparkCastOptions,
3940
target_tz: &Option<Arc<str>>,
4041
) -> SparkResult<ArrayRef> {
41-
let tz_str = if cast_options.timezone.is_empty() {
42-
"UTC"
43-
} else {
44-
cast_options.timezone.as_str()
45-
};
46-
// safe to unwrap since we are falling back to UTC above
47-
let tz = timezone::Tz::from_str(tz_str)?;
48-
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
4942
let date_array = array_ref.as_primitive::<Date32Type>();
50-
5143
let mut builder = TimestampMicrosecondBuilder::with_capacity(date_array.len());
5244

53-
for date in date_array.iter() {
54-
match date {
55-
Some(date) => {
56-
// safe to unwrap since chrono's range (± 262,143 yrs) is higher than
57-
// number of years possible with days as i32 (~ 6 mil yrs)
58-
// convert date in session timezone to timestamp in UTC
59-
let naive_date = epoch + chrono::Duration::days(date as i64);
60-
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
61-
let local_midnight_in_microsec = tz
62-
.from_local_datetime(&local_midnight)
63-
// return earliest possible time (edge case with spring / fall DST changes)
64-
.earliest()
65-
.map(|dt| dt.timestamp_micros())
66-
// in case there is an issue with DST and returns None , we fall back to UTC
67-
.unwrap_or((date as i64) * 86_400 * 1_000_000);
68-
builder.append_value(local_midnight_in_microsec);
45+
if target_tz.is_none() {
46+
// TIMESTAMP_NTZ: pure day arithmetic, no session-TZ offset.
47+
// Matches Spark: daysToMicros(d, ZoneOffset.UTC)
48+
for date in date_array.iter() {
49+
match date {
50+
Some(d) => builder.append_value((d as i64) * 86_400 * 1_000_000),
51+
None => builder.append_null(),
6952
}
70-
None => {
71-
builder.append_null();
53+
}
54+
} else {
55+
// TIMESTAMP: midnight in session TZ → UTC epoch μs
56+
let tz_str = if cast_options.timezone.is_empty() {
57+
"UTC"
58+
} else {
59+
cast_options.timezone.as_str()
60+
};
61+
// safe to unwrap since we are falling back to UTC above
62+
let tz = timezone::Tz::from_str(tz_str)?;
63+
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
64+
for date in date_array.iter() {
65+
match date {
66+
Some(d) => {
67+
// safe to unwrap since chrono's range (± 262,143 yrs) is higher than
68+
// number of years possible with days as i32 (~ 6 mil yrs)
69+
// convert date in session timezone to timestamp in UTC
70+
let naive_date = epoch + chrono::Duration::days(d as i64);
71+
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
72+
// Use resolve_local_datetime to correctly handle DST transitions:
73+
// - Single: normal case, uses the given offset
74+
// - Ambiguous (fall back): uses the earlier/DST occurrence, matching Spark
75+
// - None (spring forward gap at midnight, e.g. America/Sao_Paulo): uses the
76+
// pre-transition offset to compute the correct UTC time, matching Spark's
77+
// LocalDate.atStartOfDay(zoneId) behaviour.
78+
let local_midnight_in_microsec =
79+
resolve_local_datetime(&tz, local_midnight).timestamp_micros();
80+
builder.append_value(local_midnight_in_microsec);
81+
}
82+
None => {
83+
builder.append_null();
84+
}
7285
}
7386
}
7487
}
@@ -142,4 +155,52 @@ mod tests {
142155
assert_eq!(ts.value(2), dst_date + seven_hours_ts);
143156
assert!(ts.is_null(3));
144157
}
158+
159+
#[test]
160+
fn test_cast_date_to_timestamp_ntz() {
161+
use crate::EvalMode;
162+
use arrow::array::Date32Array;
163+
use arrow::array::{Array, ArrayRef};
164+
use arrow::datatypes::TimestampMicrosecondType;
165+
166+
// For NTZ, result is always days * 86_400_000_000 regardless of session TZ
167+
let dates: ArrayRef = Arc::new(Date32Array::from(vec![
168+
Some(0), // 1970-01-01
169+
Some(1), // 1970-01-02
170+
Some(-1), // 1969-12-31
171+
Some(19723), // 2024-01-01
172+
None,
173+
]));
174+
175+
// NTZ target: no timezone annotation
176+
let ntz_target: Option<Arc<str>> = None;
177+
178+
// session TZ should be ignored for NTZ
179+
for tz in &[
180+
"UTC",
181+
"America/Los_Angeles",
182+
"America/New_York",
183+
"Asia/Kolkata",
184+
] {
185+
let result = cast_date_to_timestamp(
186+
&dates,
187+
&SparkCastOptions::new(EvalMode::Legacy, tz, false),
188+
&ntz_target,
189+
)
190+
.unwrap();
191+
let ts = result.as_primitive::<TimestampMicrosecondType>();
192+
// values are pure arithmetic regardless of session TZ
193+
assert_eq!(ts.value(0), 0, "epoch, tz={tz}");
194+
assert_eq!(ts.value(1), 86_400_000_000i64, "day+1, tz={tz}");
195+
assert_eq!(ts.value(2), -86_400_000_000i64, "day-1, tz={tz}");
196+
assert_eq!(
197+
ts.value(3),
198+
19723i64 * 86_400_000_000i64,
199+
"2024-01-01, tz={tz}"
200+
);
201+
assert!(ts.is_null(4), "null, tz={tz}");
202+
// output array has no timezone annotation
203+
assert_eq!(ts.timezone(), None, "no tz annotation, tz={tz}");
204+
}
205+
}
145206
}

native/spark-expr/src/utils.rs

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ pub fn array_with_timezone(
7676
assert!(!timezone.is_empty());
7777
match to_type {
7878
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
79-
Some(DataType::Timestamp(_, Some(_))) => {
80-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
79+
Some(DataType::Timestamp(_, Some(target_tz))) => {
80+
// Interpret NTZ as local time in session TZ; annotate output with target TZ
81+
// so the result has the exact annotation the caller expects.
82+
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
8183
}
8284
Some(DataType::Timestamp(TimeUnit::Microsecond, None)) => {
8385
// Convert from Timestamp(Millisecond, None) to Timestamp(Microsecond, None)
@@ -100,8 +102,8 @@ pub fn array_with_timezone(
100102
assert!(!timezone.is_empty());
101103
match to_type {
102104
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
103-
Some(DataType::Timestamp(_, Some(_))) => {
104-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
105+
Some(DataType::Timestamp(_, Some(target_tz))) => {
106+
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
105107
}
106108
_ => {
107109
// Not supported
@@ -117,8 +119,8 @@ pub fn array_with_timezone(
117119
assert!(!timezone.is_empty());
118120
match to_type {
119121
Some(DataType::Utf8) | Some(DataType::Date32) => Ok(array),
120-
Some(DataType::Timestamp(_, Some(_))) => {
121-
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str()))
122+
Some(DataType::Timestamp(_, Some(target_tz))) => {
123+
timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(target_tz.as_ref()))
122124
}
123125
_ => {
124126
// Not supported
@@ -179,7 +181,7 @@ fn datetime_cast_err(value: i64) -> ArrowError {
179181
/// Parameters:
180182
/// tz - timezone used to interpret local_datetime
181183
/// local_datetime - a naive local datetime to resolve
182-
fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
184+
pub(crate) fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> {
183185
match tz.from_local_datetime(&local_datetime) {
184186
LocalResult::Single(dt) => dt,
185187
LocalResult::Ambiguous(dt, _) => dt,
@@ -210,7 +212,7 @@ fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz
210212
/// array - input array of timestamp without timezone
211213
/// tz - timezone of the values in the input array
212214
/// to_timezone - timezone to change the input values to
213-
fn timestamp_ntz_to_timestamp(
215+
pub(crate) fn timestamp_ntz_to_timestamp(
214216
array: ArrayRef,
215217
tz: &str,
216218
to_timezone: Option<&str>,
@@ -259,6 +261,41 @@ fn timestamp_ntz_to_timestamp(
259261
}
260262
}
261263

264+
/// Converts a `Timestamp(Microsecond, Some(_))` array to `Timestamp(Microsecond, None)`
265+
/// (TIMESTAMP_NTZ) by interpreting the UTC epoch value in the given session timezone and
266+
/// storing the resulting local datetime as epoch-relative microseconds without a TZ annotation.
267+
///
268+
/// Matches Spark: `convertTz(ts, ZoneOffset.UTC, zoneId)`
269+
pub(crate) fn cast_timestamp_to_ntz(
270+
array: ArrayRef,
271+
timezone: &str,
272+
) -> Result<ArrayRef, ArrowError> {
273+
assert!(!timezone.is_empty());
274+
let tz: Tz = timezone.parse()?;
275+
match array.data_type() {
276+
DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {
277+
let array = as_primitive_array::<TimestampMicrosecondType>(&array);
278+
let result: PrimitiveArray<TimestampMicrosecondType> = array.try_unary(|value| {
279+
as_datetime::<TimestampMicrosecondType>(value)
280+
.ok_or_else(|| datetime_cast_err(value))
281+
.map(|utc_naive| {
282+
// Convert UTC naive datetime → local datetime in session TZ
283+
let local_dt = tz.from_utc_datetime(&utc_naive);
284+
// Re-encode as epoch-relative μs treating local time as UTC anchor.
285+
// This produces the NTZ representation (no offset applied).
286+
local_dt.naive_local().and_utc().timestamp_micros()
287+
})
288+
})?;
289+
// No timezone annotation on output = TIMESTAMP_NTZ
290+
Ok(Arc::new(result))
291+
}
292+
_ => Err(ArrowError::CastError(format!(
293+
"cast_timestamp_to_ntz: unexpected input type {:?}",
294+
array.data_type()
295+
))),
296+
}
297+
}
298+
262299
/// This takes care of special pre-casting cases of Spark. E.g., Timestamp to String.
263300
fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError> {
264301
assert!(!timezone.is_empty());
@@ -401,4 +438,55 @@ mod tests {
401438
micros_for("2024-10-27 00:30:00")
402439
);
403440
}
441+
442+
// Helper: build a Timestamp(Microsecond, Some(tz)) array from a UTC datetime string
443+
fn ts_with_tz(utc_datetime: &str, tz: &str) -> ArrayRef {
444+
let dt = NaiveDateTime::parse_from_str(utc_datetime, "%Y-%m-%d %H:%M:%S").unwrap();
445+
let ts = dt.and_utc().timestamp_micros();
446+
Arc::new(TimestampMicrosecondArray::from(vec![ts]).with_timezone(tz.to_string()))
447+
}
448+
449+
#[test]
450+
fn test_cast_timestamp_to_ntz_utc() {
451+
// In UTC, local time == UTC time, so NTZ value == UTC epoch value
452+
let input = ts_with_tz("2024-01-15 10:30:00", "UTC");
453+
let result = cast_timestamp_to_ntz(input, "UTC").unwrap();
454+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
455+
// Expected NTZ value: epoch μs for "2024-01-15 10:30:00" as if it were UTC
456+
let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
457+
.unwrap()
458+
.and_utc()
459+
.timestamp_micros();
460+
assert_eq!(out.value(0), expected);
461+
assert_eq!(out.timezone(), None); // no TZ annotation = NTZ
462+
}
463+
464+
#[test]
465+
fn test_cast_timestamp_to_ntz_offset_timezone() {
466+
// UTC epoch for "2024-01-15 15:30:00 UTC" cast to NTZ with session TZ = America/New_York (UTC-5)
467+
// Local time in NY = 10:30:00 → NTZ should store epoch μs for "2024-01-15 10:30:00"
468+
let input = ts_with_tz("2024-01-15 15:30:00", "UTC");
469+
let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap();
470+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
471+
let expected = NaiveDateTime::parse_from_str("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
472+
.unwrap()
473+
.and_utc()
474+
.timestamp_micros();
475+
assert_eq!(out.value(0), expected);
476+
assert_eq!(out.timezone(), None);
477+
}
478+
479+
#[test]
480+
fn test_cast_timestamp_to_ntz_dst() {
481+
// During DST: UTC epoch for "2024-07-04 16:30:00 UTC", session TZ = America/New_York (UTC-4 in summer)
482+
// Local time in NY = 12:30:00 → NTZ stores epoch μs for "2024-07-04 12:30:00"
483+
let input = ts_with_tz("2024-07-04 16:30:00", "UTC");
484+
let result = cast_timestamp_to_ntz(input, "America/New_York").unwrap();
485+
let out = as_primitive_array::<TimestampMicrosecondType>(&result);
486+
let expected = NaiveDateTime::parse_from_str("2024-07-04 12:30:00", "%Y-%m-%d %H:%M:%S")
487+
.unwrap()
488+
.and_utc()
489+
.timestamp_micros();
490+
assert_eq!(out.value(0), expected);
491+
}
404492
}

0 commit comments

Comments
 (0)