Skip to content

Commit f4c2f19

Browse files
committed
Timestamp dimensions for rolling window queries
This required the `date_add` implementation from CubeStore, which now lives in the DataFusion repository. Also allows `PRECEDING` and `FOLLOWING` on both window frame bounds.
1 parent 2a9a713 commit f4c2f19

11 files changed

Lines changed: 465 additions & 142 deletions

File tree

datafusion/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ ahash = "0.7"
4848
hashbrown = "0.11"
4949
arrow = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["prettyprint"] }
5050
parquet = { git = "https://github.com/cube-js/arrow-rs.git", branch = "cube", features = ["arrow"] }
51-
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "a04701767d9d6a9eb99cce9a64dee7e1b7c25b7c" }
51+
sqlparser = { git = "https://github.com/cube-js/sqlparser-rs.git", rev = "22544370d1f1b608b9a5e75477b1979ed1f7eb72" }
5252
paste = "^1.0"
5353
num_cpus = "1.13.0"
5454
chrono = "0.4"
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use crate::error::DataFusionError;
2+
use crate::scalar::ScalarValue;
3+
use arrow::array::{Array, TimestampNanosecondArray, TimestampNanosecondBuilder};
4+
use chrono::{DateTime, Datelike, Duration, NaiveDate, TimeZone, Utc};
5+
6+
pub fn date_addsub_array(
7+
t: &TimestampNanosecondArray,
8+
i: ScalarValue,
9+
is_add: bool,
10+
) -> Result<TimestampNanosecondArray, DataFusionError> {
11+
let mut result = TimestampNanosecondBuilder::new(t.len());
12+
match i {
13+
ScalarValue::IntervalYearMonth(Some(v)) => {
14+
for i in 0..t.len() {
15+
if t.is_null(i) {
16+
result.append_null()?;
17+
} else {
18+
let t = Utc.timestamp_nanos(t.value(i));
19+
result.append_value(
20+
date_addsub_year_month(t, v, is_add)?.timestamp_nanos(),
21+
)?;
22+
}
23+
}
24+
}
25+
ScalarValue::IntervalDayTime(Some(v)) => {
26+
for i in 0..t.len() {
27+
if t.is_null(i) {
28+
result.append_null()?;
29+
} else {
30+
let t = Utc.timestamp_nanos(t.value(i));
31+
result.append_value(
32+
date_addsub_day_time(t, v, is_add)?.timestamp_nanos(),
33+
)?;
34+
}
35+
}
36+
}
37+
_ => {
38+
let name = match is_add {
39+
true => "DATE_ADD",
40+
false => "DATE_SUB",
41+
};
42+
return Err(DataFusionError::Plan(format!(
43+
"Second argument of `{}` must be a non-null interval",
44+
name
45+
)));
46+
}
47+
}
48+
49+
Ok(result.finish())
50+
}
51+
52+
pub fn date_addsub_scalar(
53+
t: DateTime<Utc>,
54+
i: ScalarValue,
55+
is_add: bool,
56+
) -> Result<DateTime<Utc>, DataFusionError> {
57+
match i {
58+
ScalarValue::IntervalYearMonth(Some(v)) => date_addsub_year_month(t, v, is_add),
59+
ScalarValue::IntervalDayTime(Some(v)) => date_addsub_day_time(t, v, is_add),
60+
_ => {
61+
let name = match is_add {
62+
true => "DATE_ADD",
63+
false => "DATE_SUB",
64+
};
65+
return Err(DataFusionError::Plan(format!(
66+
"Second argument of `{}` must be a non-null interval",
67+
name
68+
)));
69+
}
70+
}
71+
}
72+
73+
fn date_addsub_year_month(
74+
t: DateTime<Utc>,
75+
i: i32,
76+
is_add: bool,
77+
) -> Result<DateTime<Utc>, DataFusionError> {
78+
let i = match is_add {
79+
true => i,
80+
false => -i,
81+
};
82+
83+
let mut year = t.year();
84+
// Note month is numbered 0..11 in this function.
85+
let mut month = t.month() as i32 - 1;
86+
87+
year += i / 12;
88+
month += i % 12;
89+
90+
if month < 0 {
91+
year -= 1;
92+
month += 12;
93+
}
94+
debug_assert!(0 <= month);
95+
year += month / 12;
96+
month = month % 12;
97+
98+
match change_ym(t, year, 1 + month as u32) {
99+
Some(t) => return Ok(t),
100+
None => {
101+
return Err(DataFusionError::Execution(format!(
102+
"Failed to set date to ({}-{})",
103+
year,
104+
1 + month
105+
)))
106+
}
107+
};
108+
}
109+
110+
fn date_addsub_day_time(
111+
t: DateTime<Utc>,
112+
interval: i64,
113+
is_add: bool,
114+
) -> Result<DateTime<Utc>, DataFusionError> {
115+
let i = match is_add {
116+
true => interval,
117+
false => -interval,
118+
};
119+
120+
let days: i64 = i.signum() * (i.abs() >> 32);
121+
let millis: i64 = i.signum() * ((i.abs() << 32) >> 32);
122+
return Ok(t + Duration::days(days) + Duration::milliseconds(millis));
123+
}
124+
125+
fn change_ym(t: DateTime<Utc>, y: i32, m: u32) -> Option<DateTime<Utc>> {
126+
debug_assert!(1 <= m && m <= 12);
127+
let mut d = t.day();
128+
d = d.min(last_day_of_month(y, m));
129+
t.with_day(1)?.with_year(y)?.with_month(m)?.with_day(d)
130+
}
131+
132+
/// Returns the number of days in month `m` (1..=12) of year `y` in the
/// proleptic Gregorian calendar.
///
/// Computed arithmetically, so it never panics, whatever the value of `y`.
fn last_day_of_month(y: i32, m: u32) -> u32 {
    debug_assert!(1 <= m && m <= 12);
    match m {
        4 | 6 | 9 | 11 => 30,
        2 => {
            // Leap years: divisible by 4, except centuries not divisible by 400.
            let is_leap = y % 4 == 0 && (y % 100 != 0 || y % 400 == 0);
            if is_leap {
                29
            } else {
                28
            }
        }
        _ => 31,
    }
}

datafusion/src/cube_ext/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
pub mod alias;
19+
pub mod datetime;
1920
pub mod join;
2021
pub mod joinagg;
2122
pub mod rolling;

0 commit comments

Comments
 (0)