-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Complete StringViewArray and BinaryViewArray parquet decoder: implement delta byte array and delta length byte array encoding
#6004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
a7b7a81
65ff383
b9e7d71
7f0c404
564d8d0
beefb5a
132d2c6
eb1c29a
5a893c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,22 +17,23 @@ | |
|
|
||
| use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; | ||
| use crate::arrow::buffer::view_buffer::ViewBuffer; | ||
| use crate::arrow::decoder::DictIndexDecoder; | ||
| use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; | ||
| use crate::arrow::record_reader::GenericRecordReader; | ||
| use crate::arrow::schema::parquet_to_arrow_field; | ||
| use crate::basic::{ConvertedType, Encoding}; | ||
| use crate::column::page::PageIterator; | ||
| use crate::column::reader::decoder::ColumnValueDecoder; | ||
| use crate::data_type::Int32Type; | ||
| use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; | ||
| use crate::errors::{ParquetError, Result}; | ||
| use crate::schema::types::ColumnDescPtr; | ||
| use arrow_array::ArrayRef; | ||
| use arrow_array::{builder::make_view, ArrayRef}; | ||
| use arrow_data::ByteView; | ||
| use arrow_schema::DataType as ArrowType; | ||
| use bytes::Bytes; | ||
| use std::any::Any; | ||
|
|
||
| /// Returns an [`ArrayReader`] that decodes the provided byte array column to view types. | ||
| #[allow(unused)] | ||
| pub fn make_byte_view_array_reader( | ||
| pages: Box<dyn PageIterator>, | ||
| column_desc: ColumnDescPtr, | ||
|
|
@@ -61,7 +62,6 @@ pub fn make_byte_view_array_reader( | |
| } | ||
|
|
||
| /// An [`ArrayReader`] for variable length byte arrays | ||
| #[allow(unused)] | ||
| struct ByteViewArrayReader { | ||
| data_type: ArrowType, | ||
| pages: Box<dyn PageIterator>, | ||
|
|
@@ -213,6 +213,8 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { | |
| pub enum ByteViewArrayDecoder { | ||
| Plain(ByteViewArrayDecoderPlain), | ||
| Dictionary(ByteViewArrayDecoderDictionary), | ||
| DeltaLength(ByteViewArrayDecoderDeltaLength), | ||
| DeltaByteArray(ByteViewArrayDecoderDelta), | ||
| } | ||
|
|
||
| impl ByteViewArrayDecoder { | ||
|
|
@@ -235,9 +237,12 @@ impl ByteViewArrayDecoder { | |
| data, num_levels, num_values, | ||
| )) | ||
| } | ||
| Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => { | ||
| unimplemented!("stay tuned!") | ||
| } | ||
| Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength( | ||
| ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?, | ||
| ), | ||
| Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray( | ||
| ByteViewArrayDecoderDelta::new(data, validate_utf8)?, | ||
| ), | ||
| _ => { | ||
| return Err(general_err!( | ||
| "unsupported encoding for byte array: {}", | ||
|
|
@@ -263,6 +268,8 @@ impl ByteViewArrayDecoder { | |
| .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; | ||
| d.read(out, dict, len) | ||
| } | ||
| ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len), | ||
| ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -275,6 +282,8 @@ impl ByteViewArrayDecoder { | |
| .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; | ||
| d.skip(dict, len) | ||
| } | ||
| ByteViewArrayDecoder::DeltaLength(d) => d.skip(len), | ||
| ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len), | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -487,6 +496,177 @@ impl ByteViewArrayDecoderDictionary { | |
| } | ||
| } | ||
|
|
||
| /// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`] | ||
| pub struct ByteViewArrayDecoderDeltaLength { | ||
| lengths: Vec<i32>, | ||
| data: Bytes, | ||
| length_offset: usize, | ||
| data_offset: usize, | ||
| validate_utf8: bool, | ||
| } | ||
|
|
||
| impl ByteViewArrayDecoderDeltaLength { | ||
| fn new(data: Bytes, validate_utf8: bool) -> Result<Self> { | ||
| let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new(); | ||
| len_decoder.set_data(data.clone(), 0)?; | ||
| let values = len_decoder.values_left(); | ||
|
|
||
| let mut lengths = vec![0; values]; | ||
| len_decoder.get(&mut lengths)?; | ||
|
|
||
| let mut total_bytes = 0; | ||
|
|
||
| for l in lengths.iter() { | ||
| if *l < 0 { | ||
| return Err(ParquetError::General( | ||
| "negative delta length byte array length".to_string(), | ||
| )); | ||
| } | ||
| total_bytes += *l as usize; | ||
| } | ||
|
|
||
| if total_bytes + len_decoder.get_offset() > data.len() { | ||
| return Err(ParquetError::General( | ||
| "Insufficient delta length byte array bytes".to_string(), | ||
| )); | ||
| } | ||
|
|
||
| Ok(Self { | ||
| lengths, | ||
| data, | ||
| validate_utf8, | ||
| length_offset: 0, | ||
| data_offset: len_decoder.get_offset(), | ||
| }) | ||
| } | ||
|
|
||
| fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> { | ||
| let to_read = len.min(self.lengths.len() - self.length_offset); | ||
| output.views.reserve(to_read); | ||
|
|
||
| let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; | ||
|
|
||
| let block_id = output.append_block(self.data.clone().into()); | ||
|
|
||
| let mut current_offset = self.data_offset; | ||
| let initial_offset = current_offset; | ||
| for length in src_lengths { | ||
| // # Safety | ||
| // The length is from the delta length decoder, so it is valid | ||
| // The start_offset is calculated from the lengths, so it is valid | ||
|
XiangpengHao marked this conversation as resolved.
|
||
| // `start_offset + length` is guaranteed to be within the bounds of `data`, as checked in `new` | ||
| unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) } | ||
|
|
||
| current_offset += *length as usize; | ||
| } | ||
|
|
||
| // Delta length encoding has continuous strings, we can validate utf8 in one go | ||
| if self.validate_utf8 { | ||
| check_valid_utf8(&self.data[initial_offset..current_offset])?; | ||
| } | ||
|
|
||
| self.data_offset = current_offset; | ||
| self.length_offset += to_read; | ||
|
|
||
| Ok(to_read) | ||
| } | ||
|
|
||
| fn skip(&mut self, to_skip: usize) -> Result<usize> { | ||
| let remain_values = self.lengths.len() - self.length_offset; | ||
| let to_skip = remain_values.min(to_skip); | ||
|
|
||
| let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; | ||
| let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); | ||
|
|
||
| self.data_offset += total_bytes; | ||
| self.length_offset += to_skip; | ||
| Ok(to_skip) | ||
| } | ||
| } | ||
|
|
||
| /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`] | ||
| pub struct ByteViewArrayDecoderDelta { | ||
| decoder: DeltaByteArrayDecoder, | ||
| validate_utf8: bool, | ||
| } | ||
|
|
||
| impl ByteViewArrayDecoderDelta { | ||
| fn new(data: Bytes, validate_utf8: bool) -> Result<Self> { | ||
| Ok(Self { | ||
| decoder: DeltaByteArrayDecoder::new(data)?, | ||
| validate_utf8, | ||
| }) | ||
| } | ||
|
|
||
| // Unlike other encodings, we need to copy the data. | ||
| // | ||
| // DeltaByteArray data is stored using shared prefixes/suffixes, | ||
| // which results in potentially non-contiguous | ||
| // strings, while Arrow encodings require contiguous strings | ||
| // | ||
| // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7> | ||
|
|
||
| fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> { | ||
| output.views.reserve(len.min(self.decoder.remaining())); | ||
|
|
||
| // array buffer only have long strings | ||
| let mut array_buffer: Vec<u8> = Vec::with_capacity(4096); | ||
|
|
||
| let buffer_id = output.buffers.len() as u32; | ||
|
|
||
| let read = if !self.validate_utf8 { | ||
| self.decoder.read(len, |bytes| { | ||
| let offset = array_buffer.len(); | ||
| let view = make_view(bytes, buffer_id, offset as u32); | ||
| if bytes.len() > 12 { | ||
| // only copy the data to buffer if the string can not be inlined. | ||
| array_buffer.extend_from_slice(bytes); | ||
| } | ||
|
|
||
| // # Safety | ||
| // The buffer_id is the last buffer in the output buffers | ||
| // The offset is calculated from the buffer, so it is valid | ||
| unsafe { | ||
| output.append_raw_view_unchecked(&view); | ||
| } | ||
| Ok(()) | ||
| })? | ||
| } else { | ||
| // utf8 validation buffer have all strings, we batch the strings in one buffer to accelerate validation | ||
| let mut utf8_validation_buffer = Vec::with_capacity(4096); | ||
|
|
||
| let v = self.decoder.read(len, |bytes| { | ||
| let offset = array_buffer.len(); | ||
| let view = make_view(bytes, buffer_id, offset as u32); | ||
| if bytes.len() > 12 { | ||
| // only copy the data to buffer if the string can not be inlined. | ||
| array_buffer.extend_from_slice(bytes); | ||
| } | ||
| utf8_validation_buffer.extend_from_slice(bytes); | ||
|
|
||
| // # Safety | ||
| // The buffer_id is the last buffer in the output buffers | ||
| // The offset is calculated from the buffer, so it is valid | ||
| // Utf-8 validation is done later | ||
| unsafe { | ||
| output.append_raw_view_unchecked(&view); | ||
| } | ||
| Ok(()) | ||
| })?; | ||
| check_valid_utf8(&utf8_validation_buffer)?; | ||
| v | ||
| }; | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we also need to check
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see -- I was thinking about something like this: XiangpengHao#1 (call |
||
| let actual_block_id = output.append_block(array_buffer.into()); | ||
| assert_eq!(actual_block_id, buffer_id); | ||
|
XiangpengHao marked this conversation as resolved.
|
||
| Ok(read) | ||
| } | ||
|
|
||
| fn skip(&mut self, to_skip: usize) -> Result<usize> { | ||
| self.decoder.skip(to_skip) | ||
| } | ||
| } | ||
|
|
||
| /// Check that `val` is a valid UTF-8 sequence | ||
| pub fn check_valid_utf8(val: &[u8]) -> Result<()> { | ||
| match std::str::from_utf8(val) { | ||
|
|
@@ -525,13 +705,6 @@ mod tests { | |
| .unwrap(); | ||
|
|
||
| for (encoding, page) in pages { | ||
| if encoding != Encoding::PLAIN | ||
| && encoding != Encoding::RLE_DICTIONARY | ||
| && encoding != Encoding::PLAIN_DICTIONARY | ||
| { | ||
| // skip unsupported encodings for now as they are not yet implemented | ||
| continue; | ||
| } | ||
| let mut output = ViewBuffer::default(); | ||
| decoder.set_data(encoding, page, 4, Some(4)).unwrap(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2456,26 +2456,16 @@ mod tests { | |
| let cases = [ | ||
| ( | ||
| invalid_utf8_first_char::<i32>(), | ||
| "Parquet argument error: Parquet error: encountered non UTF-8 data", | ||
| "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 11", | ||
| ), | ||
| ( | ||
| invalid_utf8_later_char::<i32>(), | ||
| "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6", | ||
| "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 14", | ||
| ), | ||
| ]; | ||
| for (array, expected_error) in cases { | ||
| // cast not yet implemented for BinaryView | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🥳 |
||
| // https://github.com/apache/arrow-rs/issues/5508 | ||
| // so copy directly | ||
| let mut builder = BinaryViewBuilder::with_capacity(100); | ||
| for v in array.iter() { | ||
| if let Some(v) = v { | ||
| builder.append_value(v); | ||
| } else { | ||
| builder.append_null(); | ||
| } | ||
| } | ||
| let array = builder.finish(); | ||
| let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap(); | ||
| let array = array.as_binary_view(); | ||
|
|
||
| // data is not valid utf8 we can not construct a correct StringArray | ||
| // safely, so purposely create an invalid StringArray | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.