Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 1 addition & 113 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray, StringViewArray};
use arrow_array::{Array, StringArray};
use arrow_buffer::Buffer;

#[test]
Expand Down Expand Up @@ -657,64 +657,6 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(
output.values.as_slice(),
"helloworldlarge payload over 12 bytesb".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("world"),
None,
Some("large payload over 12 bytes"),
Some("b"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
Expand Down Expand Up @@ -759,60 +701,6 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder_skip() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(
output.values.as_slice(),
"hellolarge payload over 12 bytes".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 32]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("large payload over 12 bytes"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
Expand Down
100 changes: 13 additions & 87 deletions parquet/src/arrow/buffer/offset_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::errors::{ParquetError, Result};
use arrow_array::builder::GenericByteViewBuilder;
use arrow_array::types::BinaryViewType;
use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::DataType as ArrowType;
use std::sync::Arc;

/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
Expand Down Expand Up @@ -128,51 +125,18 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
match data_type {
ArrowType::Utf8View => {
let mut builder = self.build_generic_byte_view();
Arc::new(builder.finish().to_string_view().unwrap())
}
ArrowType::BinaryView => {
let mut builder = self.build_generic_byte_view();
Arc::new(builder.finish())
}
_ => {
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(self.len())
.add_buffer(Buffer::from_vec(self.offsets))
.add_buffer(Buffer::from_vec(self.values))
.null_bit_buffer(null_buffer);

let data = match cfg!(debug_assertions) {
true => array_data_builder.build().unwrap(),
false => unsafe { array_data_builder.build_unchecked() },
};

make_array(data)
}
}
}

fn build_generic_byte_view(self) -> GenericByteViewBuilder<BinaryViewType> {
let mut builder = GenericByteViewBuilder::<BinaryViewType>::with_capacity(self.len());
let buffer = self.values.into();
let block = builder.append_block(buffer);
for window in self.offsets.windows(2) {
let start = window[0];
let end = window[1];
let len = (end - start).to_usize().unwrap();

if len != 0 {
// Safety: (1) the buffer is valid (2) the offsets are valid (3) the values in between are of ByteViewType
unsafe {
builder.append_view_unchecked(block, start.as_usize() as u32, len as u32);
}
} else {
builder.append_null();
}
}
builder
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(self.len())
.add_buffer(Buffer::from_vec(self.offsets))
.add_buffer(Buffer::from_vec(self.values))
.null_bit_buffer(null_buffer);

let data = match cfg!(debug_assertions) {
true => array_data_builder.build().unwrap(),
false => unsafe { array_data_builder.build_unchecked() },
};

make_array(data)
}
}

Expand Down Expand Up @@ -229,7 +193,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{Array, LargeStringArray, StringArray, StringViewArray};
use arrow_array::{Array, LargeStringArray, StringArray};

#[test]
fn test_offset_buffer_empty() {
Expand Down Expand Up @@ -280,44 +244,6 @@ mod tests {
);
}

#[test]
fn test_string_view() {
let mut buffer = OffsetBuffer::<i32>::default();
for v in [
"hello",
"world",
"large payload over 12 bytes",
"a",
"b",
"c",
] {
buffer.try_push(v.as_bytes(), false).unwrap()
}
let split = std::mem::take(&mut buffer);

let array = split.into_array(None, ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
assert_eq!(
strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
vec![
"hello",
"world",
"large payload over 12 bytes",
"a",
"b",
"c"
]
);

buffer.try_push("test".as_bytes(), false).unwrap();
let array = buffer.into_array(None, ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();
assert_eq!(
strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
vec!["test"]
);
}

#[test]
fn test_offset_buffer_pad_nulls() {
let mut buffer = OffsetBuffer::<i32>::default();
Expand Down