Skip to content

Commit 71d27b6

Browse files
authored
Test int96 Parquet file from Spark (#7367)
* Stash.
* Test int96_from_spark.parquet.
* Update parquet-testing to include int96_from_spark.parquet.
* Address feedback.
1 parent abb2eef commit 71d27b6

2 files changed

Lines changed: 106 additions & 7 deletions

File tree

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -959,12 +959,6 @@ mod tests {
959959
use std::path::PathBuf;
960960
use std::sync::Arc;
961961

962-
use bytes::Bytes;
963-
use half::f16;
964-
use num::PrimInt;
965-
use rand::{rng, Rng, RngCore};
966-
use tempfile::tempfile;
967-
968962
use arrow_array::builder::*;
969963
use arrow_array::cast::AsArray;
970964
use arrow_array::types::{
@@ -978,6 +972,11 @@ mod tests {
978972
ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
979973
};
980974
use arrow_select::concat::concat_batches;
975+
use bytes::Bytes;
976+
use half::f16;
977+
use num::PrimInt;
978+
use rand::{rng, Rng, RngCore};
979+
use tempfile::tempfile;
981980

982981
use crate::arrow::arrow_reader::{
983982
ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
@@ -1563,6 +1562,106 @@ mod tests {
15631562
})
15641563
}
15651564

1565+
#[test]
fn test_int96_from_spark_file_with_provided_schema() {
    // int96_from_spark.parquet stores Spark-style microsecond timestamps, which trade
    // range for resolution compared with nanosecond timestamps. Supplying a schema
    // with microsecond resolution lets the Parquet reader interpret the values
    // correctly.
    use arrow_schema::DataType::Timestamp;

    let test_data = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{test_data}/int96_from_spark.parquet")).unwrap();

    let supplied_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        Timestamp(TimeUnit::Microsecond, None),
        true,
    )]));
    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

    let mut record_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
        .unwrap()
        .build()
        .unwrap();

    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(
        batch.column(0).data_type(),
        &Timestamp(TimeUnit::Microsecond, None)
    );

    let expected = Arc::new(Int64Array::from(vec![
        Some(1704141296123456),
        Some(1704070800000000),
        Some(253402225200000000),
        Some(1735599600000000),
        None,
        Some(9089380393200000000),
    ]));

    // arrow-rs relies on the chrono library to convert between timestamps and
    // strings, so compare the raw Int64 values instead. The underlying storage is
    // a PrimitiveArray of Int64 anyway, so this is a zero-copy, non-modifying cast.
    let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let casted_timestamps = binding.as_primitive::<types::Int64Type>();

    assert_eq!(casted_timestamps.len(), expected.len());

    for (lhs, rhs) in casted_timestamps.iter().zip(expected.iter()) {
        assert_eq!(lhs, rhs);
    }
}
1618+
1619+
#[test]
fn test_int96_from_spark_file_without_provided_schema() {
    // int96_from_spark.parquet stores Spark-style microsecond timestamps, which trade
    // range for resolution compared with nanosecond timestamps. Without a provided
    // schema the values are read at nanosecond resolution, so some of them overflow
    // and come back as garbage.
    use arrow_schema::DataType::Timestamp;

    let test_data = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{test_data}/int96_from_spark.parquet")).unwrap();

    let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap();

    let batch = record_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(
        batch.column(0).data_type(),
        &Timestamp(TimeUnit::Nanosecond, None)
    );

    let expected = Arc::new(Int64Array::from(vec![
        Some(1704141296123456000),  // Reads as nanosecond fine (note 3 extra 0s)
        Some(1704070800000000000),  // Reads as nanosecond fine (note 3 extra 0s)
        Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
        Some(1735599600000000000),  // Reads as nanosecond fine (note 3 extra 0s)
        None,
        Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
    ]));

    // arrow-rs relies on the chrono library to convert between timestamps and
    // strings, so compare the raw Int64 values instead. The underlying storage is
    // a PrimitiveArray of Int64 anyway, so this is a zero-copy, non-modifying cast.
    let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
    let casted_timestamps = binding.as_primitive::<types::Int64Type>();

    assert_eq!(casted_timestamps.len(), expected.len());

    for (lhs, rhs) in casted_timestamps.iter().zip(expected.iter()) {
        assert_eq!(lhs, rhs);
    }
}
1664+
15661665
struct RandUtf8Gen {}
15671666

15681667
impl RandGen<ByteArrayType> for RandUtf8Gen {

0 commit comments

Comments
 (0)