Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
parquet-variant = { workspace = true }
parquet-variant-json = { workspace = true }
chrono = {workspace = true}
chrono = { workspace = true }

[lib]
name = "parquet_variant_compute"
Expand Down
151 changes: 150 additions & 1 deletion parquet-variant-compute/src/cast_to_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use arrow::array::{
use arrow::datatypes::{
i256, BinaryType, BinaryViewType, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
LargeBinaryType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime,
timestamp_us_to_datetime,
};
use arrow_schema::{ArrowError, DataType, TimeUnit};
use chrono::NaiveTime;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use half::f16;
use parquet_variant::{Variant, VariantDecimal16, VariantDecimal4, VariantDecimal8};
Expand Down Expand Up @@ -321,6 +323,75 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
DataType::Timestamp(time_unit, time_zone) => {
convert_timestamp(time_unit, time_zone, input, &mut builder);
}
DataType::Time32(unit) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic looks good to me -- thank you

match *unit {
TimeUnit::Second => {
generic_conversion!(
Time32SecondType,
as_primitive,
// nanoseconds are always 0 at second precision
|v| NaiveTime::from_num_seconds_from_midnight_opt(v as u32, 0u32).unwrap(),
input,
builder
);
}
TimeUnit::Millisecond => {
generic_conversion!(
Time32MillisecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
v as u32 / 1000,
(v as u32 % 1000) * 1_000_000
)
.unwrap(),
input,
builder
);
}
_ => {
return Err(ArrowError::CastError(format!(
"Unsupported Time32 unit: {:?}",
unit
)));
}
};
}
DataType::Time64(unit) => {
match *unit {
TimeUnit::Microsecond => {
generic_conversion!(
Time64MicrosecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
(v / 1_000_000) as u32,
(v % 1_000_000 * 1_000) as u32
)
.unwrap(),
input,
builder
);
}
TimeUnit::Nanosecond => {
generic_conversion!(
Time64NanosecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
(v / 1_000_000_000) as u32,
(v % 1_000_000_000) as u32
)
.unwrap(),
input,
builder
);
}
_ => {
return Err(ArrowError::CastError(format!(
"Unsupported Time64 unit: {:?}",
unit
)));
}
};
}
dt => {
return Err(ArrowError::CastError(format!(
"Unsupported data type for casting to Variant: {dt:?}",
Expand All @@ -341,8 +412,10 @@ mod tests {
ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array,
FixedSizeBinaryBuilder, Float16Array, Float32Array, Float64Array, GenericByteBuilder,
GenericByteViewBuilder, Int16Array, Int32Array, Int64Array, Int8Array, NullArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::datatypes::i256;
use arrow_schema::{
DECIMAL128_MAX_PRECISION, DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION,
};
Expand Down Expand Up @@ -1130,6 +1203,82 @@ mod tests {
)
}

#[test]
fn test_cast_time32_second_to_variant_time() {
    // Expected variant for a whole number of seconds after midnight.
    let time_at = |secs: u32| {
        Variant::Time(NaiveTime::from_num_seconds_from_midnight_opt(secs, 0).unwrap())
    };
    let array: Time32SecondArray = vec![Some(1), Some(86_399), None].into();
    let expected = vec![Some(time_at(1)), Some(time_at(86_399)), None];
    run_test(Arc::new(array), expected)
}

#[test]
fn test_cast_time32_millisecond_to_variant_time() {
    // Expected variant for `secs` seconds plus `nanos` nanoseconds after midnight.
    let time_at = |secs: u32, nanos: u32| {
        Variant::Time(NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap())
    };
    let array: Time32MillisecondArray = vec![Some(123_456), Some(456_000), None].into();
    // 123_456 ms is 123 s + 456 ms; 456_000 ms is exactly 456 s.
    let expected = vec![
        Some(time_at(123, 456_000_000)),
        Some(time_at(456, 0)),
        None,
    ];
    run_test(Arc::new(array), expected)
}

#[test]
fn test_cast_time64_micro_to_variant_time() {
    // Expected variant for `secs` seconds plus `nanos` nanoseconds after midnight.
    let time_at = |secs: u32, nanos: u32| {
        Variant::Time(NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap())
    };
    let array: Time64MicrosecondArray = vec![Some(1), Some(123_456_789), None].into();
    // 1 µs is 1_000 ns; 123_456_789 µs is 123 s + 456_789 µs.
    let expected = vec![
        Some(time_at(0, 1_000)),
        Some(time_at(123, 456_789_000)),
        None,
    ];
    run_test(Arc::new(array), expected)
}

#[test]
fn test_cast_time64_nano_to_variant_time() {
    // Expected variant for `secs` seconds plus `nanos` nanoseconds after midnight.
    let time_at = |secs: u32, nanos: u32| {
        Variant::Time(NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap())
    };
    let array: Time64NanosecondArray =
        vec![Some(1), Some(1001), Some(123_456_789_012), None].into();
    // The expected values only keep microsecond precision, so the sub-microsecond part of
    // each input is dropped: 1 ns becomes 0 and 1001 ns becomes 1 µs.
    let expected = vec![
        Some(time_at(0, 0)),
        Some(time_at(0, 1_000)),
        Some(time_at(123, 456_789_000)),
        None,
    ];
    run_test(Arc::new(array), expected)
}

/// Converts the given `Array` to a `VariantArray` and tests the conversion
/// against the expected values. It also tests the handling of nulls by
/// setting one element to null and verifying the output.
Expand Down
33 changes: 30 additions & 3 deletions parquet-variant-json/src/to_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
//! Module for converting Variant data to JSON format
use arrow_schema::ArrowError;
use base64::{engine::general_purpose, Engine as _};
use chrono::Timelike;
use parquet_variant::{Variant, VariantList, VariantObject};
use serde_json::Value;
use std::io::Write;

use parquet_variant::{Variant, VariantList, VariantObject};

// Format string constants to avoid duplication and reduce errors
const DATE_FORMAT: &str = "%Y-%m-%d";
const TIMESTAMP_NTZ_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.6f";
Expand All @@ -40,6 +40,19 @@ fn format_binary_base64(bytes: &[u8]) -> String {
general_purpose::STANDARD.encode(bytes)
}

/// Formats a [`chrono::NaiveTime`] as `HH:MM:SS.f` with at most microsecond precision.
///
/// The sub-second part is truncated (not rounded) to whole microseconds and written
/// without trailing zeros, but always with at least one fractional digit. For example
/// `03:25:45.12346`, `00:00:01.001` and `12:00:00.0`.
fn format_time_ntz_str(time: &chrono::NaiveTime) -> String {
    let base = time.format("%H:%M:%S").to_string();
    // chrono represents a leap second as `nanosecond() >= 1_000_000_000`, and `%S`
    // already renders it as second `60`; keep only the sub-second part here so the
    // fraction never grows beyond six digits.
    let micros = (time.nanosecond() % 1_000_000_000) / 1000;
    if micros == 0 {
        return format!("{}.{}", base, 0);
    }
    // Zero-pad to six digits so leading zeros keep their place value, then strip only
    // the trailing zeros. Trimming both ends would turn `.001000` into `.1`.
    let fraction = format!("{:06}", micros);
    format!("{}.{}", base, fraction.trim_end_matches('0'))
}

///
/// This function writes JSON directly to any type that implements [`Write`],
/// making it efficient for streaming or when you want to control the output destination.
Expand Down Expand Up @@ -110,6 +123,7 @@ pub fn variant_to_json(json_buffer: &mut impl Write, variant: &Variant) -> Resul
Variant::TimestampNtzMicros(ts) => {
write!(json_buffer, "\"{}\"", format_timestamp_ntz_string(ts))?
}
Variant::Time(time) => write!(json_buffer, "\"{}\"", format_time_ntz_str(time))?,
Variant::Binary(bytes) => {
// Encode binary as base64 string
let base64_str = format_binary_base64(bytes);
Expand Down Expand Up @@ -348,6 +362,7 @@ pub fn variant_to_json_value(variant: &Variant) -> Result<Value, ArrowError> {
Variant::Date(date) => Ok(Value::String(format_date_string(date))),
Variant::TimestampMicros(ts) => Ok(Value::String(ts.to_rfc3339())),
Variant::TimestampNtzMicros(ts) => Ok(Value::String(format_timestamp_ntz_string(ts))),
Variant::Time(time) => Ok(Value::String(format_time_ntz_str(time))),
Variant::Binary(bytes) => Ok(Value::String(format_binary_base64(bytes))),
Variant::String(s) => Ok(Value::String(s.to_string())),
Variant::ShortString(s) => Ok(Value::String(s.to_string())),
Expand All @@ -371,7 +386,7 @@ pub fn variant_to_json_value(variant: &Variant) -> Result<Value, ArrowError> {
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, NaiveDate, Utc};
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use parquet_variant::{VariantDecimal16, VariantDecimal4, VariantDecimal8};

#[test]
Expand Down Expand Up @@ -457,6 +472,18 @@ mod tests {
Ok(())
}

#[test]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

fn test_time_to_json() -> Result<(), ArrowError> {
    // 12_345 s after midnight is 03:25:45; 123_460_708 ns truncates to 123_460 µs, whose
    // trailing zero is dropped in the textual form.
    let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(12345, 123460708).unwrap();
    let variant = Variant::Time(naive_time);

    let json = variant_to_json_string(&variant)?;
    assert_eq!("\"03:25:45.12346\"", json);

    // Pin the exact string rather than only checking that a string was produced, so the
    // `Value` path is held to the same format as the writer path.
    let json_value = variant_to_json_value(&variant)?;
    assert_eq!(Value::String("03:25:45.12346".to_string()), json_value);
    Ok(())
}

#[test]
fn test_binary_to_json() -> Result<(), ArrowError> {
let binary_data = b"Hello, World!";
Expand Down
10 changes: 10 additions & 0 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
VariantMetadata, VariantObject,
};
use arrow_schema::ArrowError;
use chrono::Timelike;
use indexmap::{IndexMap, IndexSet};
use std::collections::HashSet;

Expand Down Expand Up @@ -190,6 +191,13 @@ impl ValueBuffer {
self.append_slice(&micros.to_le_bytes());
}

/// Appends a `Time` primitive: the primitive header followed by the number of
/// microseconds since midnight as 8 little-endian bytes. Sub-microsecond
/// nanoseconds are truncated.
fn append_time_micros(&mut self, value: chrono::NaiveTime) {
    self.append_primitive_header(VariantPrimitiveType::Time);
    let whole_seconds = u64::from(value.num_seconds_from_midnight());
    let sub_second_micros = u64::from(value.nanosecond()) / 1_000;
    let micros_from_midnight = whole_seconds * 1_000_000 + sub_second_micros;
    self.append_slice(&micros_from_midnight.to_le_bytes());
}

fn append_decimal4(&mut self, decimal4: VariantDecimal4) {
self.append_primitive_header(VariantPrimitiveType::Decimal4);
self.append_u8(decimal4.scale());
Expand Down Expand Up @@ -334,6 +342,7 @@ impl ValueBuffer {
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(obj) => self.append_object(metadata_builder, obj),
Variant::List(list) => self.append_list(metadata_builder, list),
Variant::Time(v) => self.append_time_micros(v),
}
}

Expand Down Expand Up @@ -364,6 +373,7 @@ impl ValueBuffer {
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(obj) => self.try_append_object(metadata_builder, obj)?,
Variant::List(list) => self.try_append_list(metadata_builder, list)?,
Variant::Time(v) => self.append_time_micros(v),
}

Ok(())
Expand Down
42 changes: 41 additions & 1 deletion parquet-variant/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::utils::{
use crate::ShortString;

use arrow_schema::ArrowError;
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc};

/// The basic type of a [`Variant`] value, encoded in the first two bits of the
/// header byte.
Expand Down Expand Up @@ -63,6 +63,7 @@ pub enum VariantPrimitiveType {
Float = 14,
Binary = 15,
String = 16,
Time = 17,
}

/// Extracts the basic type from a header byte
Expand Down Expand Up @@ -104,6 +105,7 @@ impl TryFrom<u8> for VariantPrimitiveType {
14 => Ok(VariantPrimitiveType::Float),
15 => Ok(VariantPrimitiveType::Binary),
16 => Ok(VariantPrimitiveType::String),
17 => Ok(VariantPrimitiveType::Time),
_ => Err(ArrowError::InvalidArgumentError(format!(
"unknown primitive type: {value}",
))),
Expand Down Expand Up @@ -295,6 +297,25 @@ pub(crate) fn decode_timestampntz_micros(data: &[u8]) -> Result<NaiveDateTime, A
.map(|v| v.naive_utc())
}

/// Decodes a Time (time of day, no time zone) from the value section of a variant.
///
/// The first 8 bytes of `data` are read as a little-endian count of microseconds
/// since midnight.
///
/// # Errors
///
/// Propagates the error from `array_from_slice` if the 8 bytes cannot be read, and
/// returns [`ArrowError::CastError`] if the count is not smaller than the number of
/// microseconds in one day.
pub(crate) fn decode_time_ntz(data: &[u8]) -> Result<NaiveTime, ArrowError> {
    const MICROS_PER_SECOND: u64 = 1_000_000;
    const MICROS_PER_DAY: u64 = 86_400 * MICROS_PER_SECOND;

    let micros_since_midnight = u64::from_le_bytes(array_from_slice(data, 0)?);

    // Built lazily so the success path does not allocate an error message.
    let cast_error = || {
        ArrowError::CastError(format!(
            "Could not cast {micros_since_midnight} microseconds into a NaiveTime"
        ))
    };

    if micros_since_midnight >= MICROS_PER_DAY {
        return Err(cast_error());
    }

    // Both casts are lossless after the range check above: the second count is below
    // 86_400 and the sub-second nanosecond count is below 1_000_000_000.
    let seconds = (micros_since_midnight / MICROS_PER_SECOND) as u32;
    let nanos = (micros_since_midnight % MICROS_PER_SECOND * 1_000) as u32;
    NaiveTime::from_num_seconds_from_midnight_opt(seconds, nanos).ok_or_else(cast_error)
}

/// Decodes a Binary from the value section of a variant.
pub(crate) fn decode_binary(data: &[u8]) -> Result<&[u8], ArrowError> {
let len = u32::from_le_bytes(array_from_slice(data, 0)?) as usize;
Expand Down Expand Up @@ -441,6 +462,25 @@ mod tests {
);
}

mod time {
    use super::*;

    // The bytes are 12_340_567_891 in little-endian order, i.e. 12_340 s and
    // 567_891 µs after midnight.
    test_decoder_bounds!(
        test_timentz,
        [0x53, 0x1f, 0x8e, 0xdf, 0x2, 0, 0, 0],
        decode_time_ntz,
        NaiveTime::from_num_seconds_from_midnight_opt(12340, 567_891_000).unwrap()
    );

    #[test]
    fn test_decode_time_ntz_invalid() {
        // u64::MAX microseconds is far more than one day, so it is not a valid time of day.
        let bytes = u64::MAX.to_le_bytes();
        let outcome = decode_time_ntz(&bytes);
        assert!(matches!(outcome, Err(ArrowError::CastError(_))));
    }
}

#[test]
fn test_binary_exact_length() {
let data = [
Expand Down
Loading
Loading