Skip to content

Commit 168d932

Browse files
committed
Timestamp dimensions for rolling window queries
This required `date_add` implementation from CubeStore, it is now in DataFusion repository. Also allow `PRECEDING` and `FOLLOWING` on both window frame bounds.
1 parent 73bc4f7 commit 168d932

11 files changed

Lines changed: 482 additions & 142 deletions

File tree

datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ ahash = "0.7"
4848
hashbrown = "0.11"
4949
arrow = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["prettyprint"] }
5050
parquet = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["arrow"] }
51-
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "a04701767d9d6a9eb99cce9a64dee7e1b7c25b7c" }
51+
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "22544370d1f1b608b9a5e75477b1979ed1f7eb72" }
5252
paste = "^1.0"
5353
num_cpus = "1.13.0"
5454
chrono = "0.4"
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::error::DataFusionError;
19+
use crate::scalar::ScalarValue;
20+
use arrow::array::{Array, TimestampNanosecondArray, TimestampNanosecondBuilder};
21+
use chrono::{DateTime, Datelike, Duration, NaiveDate, TimeZone, Utc};
22+
23+
pub fn date_addsub_array(
24+
t: &TimestampNanosecondArray,
25+
i: ScalarValue,
26+
is_add: bool,
27+
) -> Result<TimestampNanosecondArray, DataFusionError> {
28+
let mut result = TimestampNanosecondBuilder::new(t.len());
29+
match i {
30+
ScalarValue::IntervalYearMonth(Some(v)) => {
31+
for i in 0..t.len() {
32+
if t.is_null(i) {
33+
result.append_null()?;
34+
} else {
35+
let t = Utc.timestamp_nanos(t.value(i));
36+
result.append_value(
37+
date_addsub_year_month(t, v, is_add)?.timestamp_nanos(),
38+
)?;
39+
}
40+
}
41+
}
42+
ScalarValue::IntervalDayTime(Some(v)) => {
43+
for i in 0..t.len() {
44+
if t.is_null(i) {
45+
result.append_null()?;
46+
} else {
47+
let t = Utc.timestamp_nanos(t.value(i));
48+
result.append_value(
49+
date_addsub_day_time(t, v, is_add)?.timestamp_nanos(),
50+
)?;
51+
}
52+
}
53+
}
54+
_ => {
55+
let name = match is_add {
56+
true => "DATE_ADD",
57+
false => "DATE_SUB",
58+
};
59+
return Err(DataFusionError::Plan(format!(
60+
"Second argument of `{}` must be a non-null interval",
61+
name
62+
)));
63+
}
64+
}
65+
66+
Ok(result.finish())
67+
}
68+
69+
pub fn date_addsub_scalar(
70+
t: DateTime<Utc>,
71+
i: ScalarValue,
72+
is_add: bool,
73+
) -> Result<DateTime<Utc>, DataFusionError> {
74+
match i {
75+
ScalarValue::IntervalYearMonth(Some(v)) => date_addsub_year_month(t, v, is_add),
76+
ScalarValue::IntervalDayTime(Some(v)) => date_addsub_day_time(t, v, is_add),
77+
_ => {
78+
let name = match is_add {
79+
true => "DATE_ADD",
80+
false => "DATE_SUB",
81+
};
82+
return Err(DataFusionError::Plan(format!(
83+
"Second argument of `{}` must be a non-null interval",
84+
name
85+
)));
86+
}
87+
}
88+
}
89+
90+
fn date_addsub_year_month(
91+
t: DateTime<Utc>,
92+
i: i32,
93+
is_add: bool,
94+
) -> Result<DateTime<Utc>, DataFusionError> {
95+
let i = match is_add {
96+
true => i,
97+
false => -i,
98+
};
99+
100+
let mut year = t.year();
101+
// Note month is numbered 0..11 in this function.
102+
let mut month = t.month() as i32 - 1;
103+
104+
year += i / 12;
105+
month += i % 12;
106+
107+
if month < 0 {
108+
year -= 1;
109+
month += 12;
110+
}
111+
debug_assert!(0 <= month);
112+
year += month / 12;
113+
month = month % 12;
114+
115+
match change_ym(t, year, 1 + month as u32) {
116+
Some(t) => return Ok(t),
117+
None => {
118+
return Err(DataFusionError::Execution(format!(
119+
"Failed to set date to ({}-{})",
120+
year,
121+
1 + month
122+
)))
123+
}
124+
};
125+
}
126+
127+
fn date_addsub_day_time(
128+
t: DateTime<Utc>,
129+
interval: i64,
130+
is_add: bool,
131+
) -> Result<DateTime<Utc>, DataFusionError> {
132+
let i = match is_add {
133+
true => interval,
134+
false => -interval,
135+
};
136+
137+
let days: i64 = i.signum() * (i.abs() >> 32);
138+
let millis: i64 = i.signum() * ((i.abs() << 32) >> 32);
139+
return Ok(t + Duration::days(days) + Duration::milliseconds(millis));
140+
}
141+
142+
fn change_ym(t: DateTime<Utc>, y: i32, m: u32) -> Option<DateTime<Utc>> {
143+
debug_assert!(1 <= m && m <= 12);
144+
let mut d = t.day();
145+
d = d.min(last_day_of_month(y, m));
146+
t.with_day(1)?.with_year(y)?.with_month(m)?.with_day(d)
147+
}
148+
149+
fn last_day_of_month(y: i32, m: u32) -> u32 {
150+
debug_assert!(1 <= m && m <= 12);
151+
if m == 12 {
152+
return 31;
153+
}
154+
NaiveDate::from_ymd(y, m + 1, 1).pred().day()
155+
}

datafusion/src/cube_ext/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod alias;
19+
pub mod datetime;
1920
pub mod join;
2021
pub mod joinagg;
2122
pub mod rolling;

0 commit comments

Comments
 (0)