Skip to content

Commit 4c6eb3a

Browse files
committed
support_cast_date_timestamp
1 parent 87d1524 commit 4c6eb3a

3 files changed

Lines changed: 54 additions & 3 deletions

File tree

native/spark-expr/src/conversion_funcs/cast.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{EvalMode, SparkError, SparkResult};
2121
use arrow::array::builder::StringBuilder;
2222
use arrow::array::{
2323
BooleanBuilder, Decimal128Builder, DictionaryArray, GenericByteArray, ListArray,
24-
PrimitiveBuilder, StringArray, StructArray,
24+
PrimitiveBuilder, StringArray, StructArray, TimestampMicrosecondBuilder,
2525
};
2626
use arrow::compute::can_cast_types;
2727
use arrow::datatypes::{
@@ -1100,6 +1100,7 @@ fn cast_array(
11001100
Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
11011101
}
11021102
(Binary, Utf8) => Ok(cast_binary_to_string::<i32>(&array, cast_options)?),
1103+
(Date32, Timestamp(_, tz)) => Ok(cast_date_to_timestamp(&array, cast_options, tz)?),
11031104
_ if cast_options.is_adapting_schema
11041105
|| is_datafusion_spark_compatible(from_type, to_type) =>
11051106
{
@@ -1118,6 +1119,50 @@ fn cast_array(
11181119
Ok(spark_cast_postprocess(cast_result?, from_type, to_type))
11191120
}
11201121

1122+
fn cast_date_to_timestamp(
1123+
array_ref: &ArrayRef,
1124+
cast_options: &SparkCastOptions,
1125+
target_tz: &Option<Arc<str>>,
1126+
) -> SparkResult<ArrayRef> {
1127+
let tz_str = if cast_options.timezone.is_empty() {
1128+
"UTC"
1129+
} else {
1130+
cast_options.timezone.as_str()
1131+
};
1132+
// safe to unwrap since we are falling back to UTC above
1133+
let tz = timezone::Tz::from_str(tz_str)?;
1134+
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
1135+
let date_array = array_ref.as_primitive::<Date32Type>();
1136+
1137+
let mut builder = TimestampMicrosecondBuilder::with_capacity(date_array.len());
1138+
1139+
for date in date_array.iter() {
1140+
match date {
1141+
Some(date) => {
1142+
// safe to unwrap since chrono's range ( 262,143 yrs) is higher than
1143+
// number of years possible with days as i32 (~ 6 mil yrs)
1144+
// convert date in session timezone to timestamp in UTC
1145+
let naive_date = epoch + chrono::Duration::days(date as i64);
1146+
let local_midnight = naive_date.and_hms_opt(0, 0, 0).unwrap();
1147+
let local_midnight_in_microsec = tz
1148+
.from_local_datetime(&local_midnight)
1149+
// return earliest possible time (edge case with spring / fall DST changes)
1150+
.earliest()
1151+
.map(|dt| dt.timestamp_micros())
1152+
// in case there is an issue with DST and returns None , we fall back to UTC
1153+
.unwrap_or((date as i64) * 86_400 * 1_000_000);
1154+
builder.append_value(local_midnight_in_microsec);
1155+
}
1156+
None => {
1157+
builder.append_null();
1158+
}
1159+
}
1160+
}
1161+
Ok(Arc::new(
1162+
builder.finish().with_timezone_opt(target_tz.clone()),
1163+
))
1164+
}
1165+
11211166
fn cast_string_to_float(
11221167
array: &ArrayRef,
11231168
to_type: &DataType,

spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
168168
}
169169
}
170170
Compatible()
171+
case (DataTypes.DateType, toType) => canCastFromDate(toType)
171172
case _ => unsupported(fromType, toType)
172173
}
173174
}
@@ -344,6 +345,12 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
344345
case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
345346
}
346347

348+
private def canCastFromDate(toType: DataType): SupportLevel = toType match {
349+
case DataTypes.TimestampType =>
350+
Compatible()
351+
case _ => Unsupported(Some(s"Cast from DateType to $toType is not supported"))
352+
}
353+
347354
private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
348355
Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
349356
}

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -989,8 +989,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
989989
castTest(generateDates(), DataTypes.StringType)
990990
}
991991

992-
ignore("cast DateType to TimestampType") {
993-
// Arrow error: Cast error: Casting from Date32 to Timestamp(Microsecond, Some("UTC")) not supported
992+
test("cast DateType to TimestampType") {
994993
castTest(generateDates(), DataTypes.TimestampType)
995994
}
996995

0 commit comments

Comments
 (0)