Skip to content

Commit e753dea

Browse files
authored
Infer 2020-03-19 00:00:00 as timestamp not Date64 in CSV (#3744) (#3746)
* Infer 2020-03-19 00:00:00 as timestamp not Date64 in CSV (#3744) * Update inference logic
1 parent 6bf0aab commit e753dea

File tree

1 file changed

+143
-65
lines changed

1 file changed

+143
-65
lines changed

arrow-csv/src/reader/mod.rs

Lines changed: 143 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use arrow_cast::parse::Parser;
5151
use arrow_schema::*;
5252
use lazy_static::lazy_static;
5353
use regex::{Regex, RegexSet};
54-
use std::collections::HashSet;
5554
use std::fmt;
5655
use std::fs::File;
5756
use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom};
@@ -62,44 +61,68 @@ use crate::reader::records::{RecordDecoder, StringRecords};
6261
use csv::StringRecord;
6362

6463
lazy_static! {
64+
/// Order should match [`InferredDataType`]
6565
static ref REGEX_SET: RegexSet = RegexSet::new([
6666
r"(?i)^(true)$|^(false)$(?-i)", //BOOLEAN
67-
r"^-?((\d*\.\d+|\d+\.\d*)([eE]-?\d+)?|\d+([eE]-?\d+))$", //DECIMAL
6867
r"^-?(\d+)$", //INTEGER
68+
r"^-?((\d*\.\d+|\d+\.\d*)([eE]-?\d+)?|\d+([eE]-?\d+))$", //DECIMAL
6969
r"^\d{4}-\d\d-\d\d$", //DATE32
70-
r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d$", //DATE64
70+
r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d$", //Timestamp(Second)
71+
r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d.\d{1,3}$", //Timestamp(Millisecond)
72+
r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d.\d{1,6}$", //Timestamp(Microsecond)
73+
r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d.\d{1,9}$", //Timestamp(Nanosecond)
7174
]).unwrap();
72-
//The order should match with REGEX_SET
73-
static ref MATCH_DATA_TYPE: Vec<DataType> = vec![
74-
DataType::Boolean,
75-
DataType::Float64,
76-
DataType::Int64,
77-
DataType::Date32,
78-
DataType::Date64,
79-
];
8075
static ref PARSE_DECIMAL_RE: Regex =
8176
Regex::new(r"^-?(\d+\.?\d*|\d*\.?\d+)$").unwrap();
82-
static ref DATETIME_RE: Regex =
83-
Regex::new(r"^\d{4}-\d\d-\d\d[T ]\d\d:\d\d:\d\d\.\d{1,9}$").unwrap();
8477
}
8578

86-
/// Infer the data type of a record
87-
fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
88-
// when quoting is enabled in the reader, these quotes aren't escaped, we default to
89-
// Utf8 for them
90-
if string.starts_with('"') {
91-
return DataType::Utf8;
92-
}
93-
let matches = REGEX_SET.matches(string).into_iter().next();
94-
// match regex in a particular order
95-
match matches {
96-
Some(ix) => MATCH_DATA_TYPE[ix].clone(),
97-
None => {
98-
let datetime_re = datetime_re.unwrap_or_else(|| DATETIME_RE.clone());
99-
if datetime_re.is_match(string) {
100-
DataType::Timestamp(TimeUnit::Nanosecond, None)
101-
} else {
102-
DataType::Utf8
79+
#[derive(Default, Copy, Clone)]
80+
struct InferredDataType {
81+
/// Packed booleans indicating type
82+
///
83+
/// 0 - Boolean
84+
/// 1 - Integer
85+
/// 2 - Float64
86+
/// 3 - Date32
87+
/// 4 - Timestamp(Second)
88+
/// 5 - Timestamp(Millisecond)
89+
/// 6 - Timestamp(Microsecond)
90+
/// 7 - Timestamp(Nanosecond)
91+
/// 8 - Utf8
92+
packed: u16,
93+
}
94+
95+
impl InferredDataType {
96+
/// Returns the inferred data type
97+
fn get(&self) -> DataType {
98+
match self.packed {
99+
1 => DataType::Boolean,
100+
2 => DataType::Int64,
101+
4 | 6 => DataType::Float64, // Promote Int64 to Float64
102+
b if b != 0 && (b & !0b11111000) == 0 => match b.leading_zeros() {
103+
// Promote to highest precision temporal type
104+
8 => DataType::Timestamp(TimeUnit::Nanosecond, None),
105+
9 => DataType::Timestamp(TimeUnit::Microsecond, None),
106+
10 => DataType::Timestamp(TimeUnit::Millisecond, None),
107+
11 => DataType::Timestamp(TimeUnit::Second, None),
108+
12 => DataType::Date32,
109+
_ => unreachable!(),
110+
},
111+
_ => DataType::Utf8,
112+
}
113+
}
114+
115+
/// Updates the [`InferredDataType`] with the given string
116+
fn update(&mut self, string: &str, datetime_re: Option<&Regex>) {
117+
self.packed |= if string.starts_with('"') {
118+
1 << 8 // Utf8
119+
} else if let Some(m) = REGEX_SET.matches(string).into_iter().next() {
120+
1 << m
121+
} else {
122+
match datetime_re {
123+
// Timestamp(Nanosecond)
124+
Some(d) if d.is_match(string) => 1 << 7,
125+
_ => 1 << 8, // Utf8
103126
}
104127
}
105128
}
@@ -230,10 +253,9 @@ fn infer_reader_schema_with_csv_options<R: Read>(
230253

231254
let header_length = headers.len();
232255
// keep track of inferred field types
233-
let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length];
256+
let mut column_types: Vec<InferredDataType> = vec![Default::default(); header_length];
234257

235258
let mut records_count = 0;
236-
let mut fields = vec![];
237259

238260
let mut record = StringRecord::new();
239261
let max_records = roptions.max_read_records.unwrap_or(usize::MAX);
@@ -248,40 +270,18 @@ fn infer_reader_schema_with_csv_options<R: Read>(
248270
for (i, column_type) in column_types.iter_mut().enumerate().take(header_length) {
249271
if let Some(string) = record.get(i) {
250272
if !string.is_empty() {
251-
column_type
252-
.insert(infer_field_schema(string, roptions.datetime_re.clone()));
273+
column_type.update(string, roptions.datetime_re.as_ref())
253274
}
254275
}
255276
}
256277
}
257278

258279
// build schema from inference results
259-
for i in 0..header_length {
260-
let possibilities = &column_types[i];
261-
let field_name = &headers[i];
262-
263-
// determine data type based on possible types
264-
// if there are incompatible types, use DataType::Utf8
265-
match possibilities.len() {
266-
1 => {
267-
for dtype in possibilities.iter() {
268-
fields.push(Field::new(field_name, dtype.clone(), true));
269-
}
270-
}
271-
2 => {
272-
if possibilities.contains(&DataType::Int64)
273-
&& possibilities.contains(&DataType::Float64)
274-
{
275-
// we have an integer and double, fall down to double
276-
fields.push(Field::new(field_name, DataType::Float64, true));
277-
} else {
278-
// default to Utf8 for conflicting datatypes (e.g bool and int)
279-
fields.push(Field::new(field_name, DataType::Utf8, true));
280-
}
281-
}
282-
_ => fields.push(Field::new(field_name, DataType::Utf8, true)),
283-
}
284-
}
280+
let fields = column_types
281+
.iter()
282+
.zip(&headers)
283+
.map(|(inferred, field_name)| Field::new(field_name, inferred.get(), true))
284+
.collect();
285285

286286
Ok((Schema::new(fields), records_count))
287287
}
@@ -681,6 +681,19 @@ fn parse(
681681
>(
682682
line_number, rows, i, None
683683
),
684+
DataType::Timestamp(TimeUnit::Second, _) => build_primitive_array::<
685+
TimestampSecondType,
686+
>(
687+
line_number, rows, i, None
688+
),
689+
DataType::Timestamp(TimeUnit::Millisecond, _) => {
690+
build_primitive_array::<TimestampMillisecondType>(
691+
line_number,
692+
rows,
693+
i,
694+
None,
695+
)
696+
}
684697
DataType::Timestamp(TimeUnit::Microsecond, _) => {
685698
build_primitive_array::<TimestampMicrosecondType>(
686699
line_number,
@@ -1637,7 +1650,10 @@ mod tests {
16371650
assert_eq!(&DataType::Float64, schema.field(2).data_type());
16381651
assert_eq!(&DataType::Boolean, schema.field(3).data_type());
16391652
assert_eq!(&DataType::Date32, schema.field(4).data_type());
1640-
assert_eq!(&DataType::Date64, schema.field(5).data_type());
1653+
assert_eq!(
1654+
&DataType::Timestamp(TimeUnit::Second, None),
1655+
schema.field(5).data_type()
1656+
);
16411657

16421658
let names: Vec<&str> =
16431659
schema.fields().iter().map(|x| x.name().as_str()).collect();
@@ -1698,6 +1714,13 @@ mod tests {
16981714
}
16991715
}
17001716

1717+
/// Infer the data type of a record
1718+
fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
1719+
let mut v = InferredDataType::default();
1720+
v.update(string, datetime_re.as_ref());
1721+
v.get()
1722+
}
1723+
17011724
#[test]
17021725
fn test_infer_field_schema() {
17031726
assert_eq!(infer_field_schema("A", None), DataType::Utf8);
@@ -1712,22 +1735,22 @@ mod tests {
17121735
assert_eq!(infer_field_schema("2020-11-08", None), DataType::Date32);
17131736
assert_eq!(
17141737
infer_field_schema("2020-11-08T14:20:01", None),
1715-
DataType::Date64
1738+
DataType::Timestamp(TimeUnit::Second, None)
17161739
);
17171740
assert_eq!(
17181741
infer_field_schema("2020-11-08 14:20:01", None),
1719-
DataType::Date64
1742+
DataType::Timestamp(TimeUnit::Second, None)
17201743
);
17211744
let reg = Regex::new(r"^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d$").ok();
17221745
assert_eq!(
17231746
infer_field_schema("2020-11-08 14:20:01", reg),
1724-
DataType::Date64
1747+
DataType::Timestamp(TimeUnit::Second, None)
17251748
);
17261749
assert_eq!(infer_field_schema("-5.13", None), DataType::Float64);
17271750
assert_eq!(infer_field_schema("0.1300", None), DataType::Float64);
17281751
assert_eq!(
17291752
infer_field_schema("2021-12-19 13:12:30.921", None),
1730-
DataType::Timestamp(TimeUnit::Nanosecond, None)
1753+
DataType::Timestamp(TimeUnit::Millisecond, None)
17311754
);
17321755
assert_eq!(
17331756
infer_field_schema("2021-12-19T13:12:30.123456789", None),
@@ -2407,4 +2430,59 @@ mod tests {
24072430
assert_eq!(&read.fill_sizes, &[23, 3, 0, 0]);
24082431
assert_eq!(read.fill_count, 4);
24092432
}
2433+
2434+
#[test]
2435+
fn test_inference() {
2436+
let cases: &[(&[&str], DataType)] = &[
2437+
(&[], DataType::Utf8),
2438+
(&["false", "12"], DataType::Utf8),
2439+
(&["12", "cupcakes"], DataType::Utf8),
2440+
(&["12", "12.4"], DataType::Float64),
2441+
(&["14050", "24332"], DataType::Int64),
2442+
(&["14050.0", "true"], DataType::Utf8),
2443+
(&["14050", "2020-03-19 00:00:00"], DataType::Utf8),
2444+
(&["14050", "2340.0", "2020-03-19 00:00:00"], DataType::Utf8),
2445+
(
2446+
&["2020-03-19 02:00:00", "2020-03-19 00:00:00"],
2447+
DataType::Timestamp(TimeUnit::Second, None),
2448+
),
2449+
(&["2020-03-19", "2020-03-20"], DataType::Date32),
2450+
(
2451+
&["2020-03-19", "2020-03-19 02:00:00", "2020-03-19 00:00:00"],
2452+
DataType::Timestamp(TimeUnit::Second, None),
2453+
),
2454+
(
2455+
&[
2456+
"2020-03-19",
2457+
"2020-03-19 02:00:00",
2458+
"2020-03-19 00:00:00.000",
2459+
],
2460+
DataType::Timestamp(TimeUnit::Millisecond, None),
2461+
),
2462+
(
2463+
&[
2464+
"2020-03-19",
2465+
"2020-03-19 02:00:00",
2466+
"2020-03-19 00:00:00.000000",
2467+
],
2468+
DataType::Timestamp(TimeUnit::Microsecond, None),
2469+
),
2470+
(
2471+
&[
2472+
"2020-03-19",
2473+
"2020-03-19 02:00:00.000000000",
2474+
"2020-03-19 00:00:00.000000",
2475+
],
2476+
DataType::Timestamp(TimeUnit::Nanosecond, None),
2477+
),
2478+
];
2479+
2480+
for (values, expected) in cases {
2481+
let mut t = InferredDataType::default();
2482+
for v in *values {
2483+
t.update(v, None)
2484+
}
2485+
assert_eq!(&t.get(), expected, "{:?}", values)
2486+
}
2487+
}
24102488
}

0 commit comments

Comments
 (0)