Skip to content

Commit 20a42c2

Browse files
authored
Merge branch 'main' into experiment_roaring_bitmap_for_int32_anti_semi_joins
2 parents 173bd10 + 7d5ddca commit 20a42c2

16 files changed

Lines changed: 936 additions & 71 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
2525
use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
2626
use crate::parquet_config::DFParquetWriterVersion;
27-
use crate::parsers::CompressionTypeVariant;
27+
use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle};
2828
use crate::utils::get_available_parallelism;
2929
use crate::{DataFusionError, Result};
3030
#[cfg(feature = "parquet_encryption")]
@@ -2042,6 +2042,17 @@ impl ConfigField for CompressionTypeVariant {
20422042
}
20432043
}
20442044

2045+
impl ConfigField for CsvQuoteStyle {
2046+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
2047+
v.some(key, self, description)
2048+
}
2049+
2050+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
2051+
*self = CsvQuoteStyle::from_str(value)?;
2052+
Ok(())
2053+
}
2054+
}
2055+
20452056
/// An implementation trait used to recursively walk configuration
20462057
pub trait Visit {
20472058
fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
@@ -3114,6 +3125,15 @@ config_namespace! {
31143125
pub terminator: Option<u8>, default = None
31153126
pub escape: Option<u8>, default = None
31163127
pub double_quote: Option<bool>, default = None
3128+
/// Quote style for CSV writing.
3129+
/// One of: "Always", "Necessary", "NonNumeric", "Never"
3130+
pub quote_style: CsvQuoteStyle, default = CsvQuoteStyle::Necessary
3131+
/// Whether to ignore leading whitespace in string values when writing CSV.
3132+
/// Defaults to `false` when `None`.
3133+
pub ignore_leading_whitespace: Option<bool>, default = None
3134+
/// Whether to ignore trailing whitespace in string values when writing CSV.
3135+
/// Defaults to `false` when `None`.
3136+
pub ignore_trailing_whitespace: Option<bool>, default = None
31173137
/// Specifies whether newlines in (quoted) values are supported.
31183138
///
31193139
/// Parsing newlines in quoted values may be affected by execution behaviour such as
@@ -3222,6 +3242,30 @@ impl CsvOptions {
32223242
self
32233243
}
32243244

3245+
/// Set the quote style for CSV writing.
3246+
pub fn with_quote_style(mut self, quote_style: CsvQuoteStyle) -> Self {
3247+
self.quote_style = quote_style;
3248+
self
3249+
}
3250+
3251+
/// Set whether to ignore leading whitespace in string values when writing CSV.
3252+
pub fn with_ignore_leading_whitespace(
3253+
mut self,
3254+
ignore_leading_whitespace: bool,
3255+
) -> Self {
3256+
self.ignore_leading_whitespace = Some(ignore_leading_whitespace);
3257+
self
3258+
}
3259+
3260+
/// Set whether to ignore trailing whitespace in string values when writing CSV.
3261+
pub fn with_ignore_trailing_whitespace(
3262+
mut self,
3263+
ignore_trailing_whitespace: bool,
3264+
) -> Self {
3265+
self.ignore_trailing_whitespace = Some(ignore_trailing_whitespace);
3266+
self
3267+
}
3268+
32253269
/// Specifies whether newlines in (quoted) values are supported.
32263270
///
32273271
/// Parsing newlines in quoted values may be affected by execution behaviour such as

datafusion/common/src/file_options/csv_writer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,13 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
9494
if let Some(v) = &value.double_quote {
9595
builder = builder.with_double_quote(*v)
9696
}
97+
builder = builder.with_quote_style(value.quote_style.into());
98+
if let Some(v) = &value.ignore_leading_whitespace {
99+
builder = builder.with_ignore_leading_whitespace(*v)
100+
}
101+
if let Some(v) = &value.ignore_trailing_whitespace {
102+
builder = builder.with_ignore_trailing_whitespace(*v)
103+
}
97104
Ok(CsvWriterOptions {
98105
writer_options: builder,
99106
compression: value.compression,

datafusion/common/src/parsers.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,59 @@ impl CompressionTypeVariant {
7373
!matches!(self, &Self::UNCOMPRESSED)
7474
}
7575
}
76+
77+
/// Quoting policy applied to fields when writing CSV files.
///
/// Each variant converts to the variant of the same name in
/// [`arrow::csv::QuoteStyle`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum CsvQuoteStyle {
    /// Put quotes around every field.
    Always,
    /// Quote a field only when it is necessary (the default).
    #[default]
    Necessary,
    /// Put quotes around every non-numeric field.
    NonNumeric,
    /// Never put quotes around fields.
    Never,
}
93+
94+
impl FromStr for CsvQuoteStyle {
95+
type Err = DataFusionError;
96+
97+
fn from_str(s: &str) -> Result<Self, Self::Err> {
98+
match s.to_lowercase().as_str() {
99+
"always" => Ok(Self::Always),
100+
"necessary" => Ok(Self::Necessary),
101+
"non_numeric" | "nonnumeric" => Ok(Self::NonNumeric),
102+
"never" => Ok(Self::Never),
103+
_ => Err(DataFusionError::NotImplemented(format!(
104+
"Unsupported CSV quote style {s}"
105+
))),
106+
}
107+
}
108+
}
109+
110+
impl From<CsvQuoteStyle> for arrow::csv::QuoteStyle {
111+
fn from(style: CsvQuoteStyle) -> Self {
112+
match style {
113+
CsvQuoteStyle::Always => Self::Always,
114+
CsvQuoteStyle::NonNumeric => Self::NonNumeric,
115+
CsvQuoteStyle::Never => Self::Never,
116+
CsvQuoteStyle::Necessary => Self::Necessary,
117+
}
118+
}
119+
}
120+
121+
impl Display for CsvQuoteStyle {
122+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123+
let str = match self {
124+
Self::Always => "Always",
125+
Self::Necessary => "Necessary",
126+
Self::NonNumeric => "NonNumeric",
127+
Self::Never => "Never",
128+
};
129+
write!(f, "{str}")
130+
}
131+
}

datafusion/expr/src/type_coercion/functions.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,8 @@ fn coerced_from<'a>(
888888
(Utf8View, Utf8 | LargeUtf8 | Null) => Some(type_into.clone()),
889889
// Any type can be coerced into strings
890890
(Utf8 | LargeUtf8, _) => Some(type_into.clone()),
891+
// We can go into a BinaryView from a Binary or LargeBinary
892+
(BinaryView, Binary | LargeBinary | Null) => Some(type_into.clone()),
891893
(Null, _) if can_cast_types(type_from, type_into) => Some(type_into.clone()),
892894

893895
(List(_), FixedSizeList(_, _)) => Some(type_into.clone()),
@@ -956,6 +958,20 @@ mod tests {
956958
let cases = vec![
957959
(DataType::Utf8View, DataType::Utf8),
958960
(DataType::Utf8View, DataType::LargeUtf8),
961+
(DataType::Utf8View, DataType::Null),
962+
];
963+
964+
for case in cases {
965+
assert_eq!(coerced_from(&case.0, &case.1), Some(case.0));
966+
}
967+
}
968+
969+
#[test]
970+
fn test_binary_conversion() {
971+
let cases = vec![
972+
(DataType::BinaryView, DataType::Binary),
973+
(DataType::BinaryView, DataType::LargeBinary),
974+
(DataType::BinaryView, DataType::Null),
959975
];
960976

961977
for case in cases {

datafusion/functions-nested/benches/array_remove.rs

Lines changed: 100 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
// under the License.
1717

1818
use arrow::array::{
19-
Array, ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, ListArray,
20-
StringArray,
19+
Array, ArrayRef, BinaryArray, BooleanArray, Decimal128Array, FixedSizeBinaryArray,
20+
Float64Array, Int64Array, ListArray, StringArray,
2121
};
2222
use arrow::buffer::OffsetBuffer;
23-
use arrow::datatypes::{DataType, Decimal128Type, Field, Float64Type, Int64Type};
24-
use arrow::util::bench_util::create_primitive_list_array_with_seed;
23+
use arrow::datatypes::{DataType, Field};
2524
use criterion::{
2625
criterion_group, criterion_main, {BenchmarkId, Criterion},
2726
};
@@ -56,15 +55,7 @@ fn bench_array_remove_int64(c: &mut Criterion) {
5655
let mut group = c.benchmark_group("array_remove_int64");
5756

5857
for &array_size in ARRAY_SIZES {
59-
let list_array: ArrayRef =
60-
Arc::new(create_primitive_list_array_with_seed::<i32, Int64Type>(
61-
NUM_ROWS,
62-
0.0,
63-
NULL_DENSITY as f32,
64-
array_size,
65-
SEED,
66-
));
67-
58+
let list_array = create_int64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
6859
let element_to_remove = ScalarValue::Int64(Some(1));
6960
let args = create_args(list_array.clone(), element_to_remove.clone());
7061

@@ -105,14 +96,7 @@ fn bench_array_remove_f64(c: &mut Criterion) {
10596
let mut group = c.benchmark_group("array_remove_f64");
10697

10798
for &array_size in ARRAY_SIZES {
108-
let list_array: ArrayRef =
109-
Arc::new(create_primitive_list_array_with_seed::<i32, Float64Type>(
110-
NUM_ROWS,
111-
0.0,
112-
NULL_DENSITY as f32,
113-
array_size,
114-
SEED,
115-
));
99+
let list_array = create_f64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
116100
let element_to_remove = ScalarValue::Float64(Some(1.0));
117101
let args = create_args(list_array.clone(), element_to_remove.clone());
118102

@@ -276,17 +260,8 @@ fn bench_array_remove_decimal64(c: &mut Criterion) {
276260
let mut group = c.benchmark_group("array_remove_decimal64");
277261

278262
for &array_size in ARRAY_SIZES {
279-
let list_array: ArrayRef = Arc::new(create_primitive_list_array_with_seed::<
280-
i32,
281-
Decimal128Type,
282-
>(
283-
NUM_ROWS,
284-
0.0,
285-
NULL_DENSITY as f32,
286-
array_size,
287-
SEED,
288-
));
289-
let element_to_remove = ScalarValue::Decimal128(Some(100_i128), 38, 10);
263+
let list_array = create_decimal64_list_array(NUM_ROWS, array_size, NULL_DENSITY);
264+
let element_to_remove = ScalarValue::Decimal128(Some(100_i128), 10, 2);
290265
let args = create_args(list_array.clone(), element_to_remove.clone());
291266

292267
group.bench_with_input(
@@ -301,7 +276,7 @@ fn bench_array_remove_decimal64(c: &mut Criterion) {
301276
arg_fields: vec![
302277
Field::new("arr", list_array.data_type().clone(), false)
303278
.into(),
304-
Field::new("el", DataType::Decimal128(38, 10), false)
279+
Field::new("el", DataType::Decimal128(10, 2), false)
305280
.into(),
306281
],
307282
number_rows: NUM_ROWS,
@@ -373,6 +348,66 @@ fn create_args(list_array: ArrayRef, element: ScalarValue) -> Vec<ColumnarValue>
373348
]
374349
}
375350

351+
fn create_int64_list_array(
352+
num_rows: usize,
353+
array_size: usize,
354+
null_density: f64,
355+
) -> ArrayRef {
356+
let mut rng = StdRng::seed_from_u64(SEED);
357+
let values = (0..num_rows * array_size)
358+
.map(|_| {
359+
if rng.random::<f64>() < null_density {
360+
None
361+
} else {
362+
Some(rng.random_range(0..array_size as i64))
363+
}
364+
})
365+
.collect::<Int64Array>();
366+
let offsets = (0..=num_rows)
367+
.map(|i| (i * array_size) as i32)
368+
.collect::<Vec<i32>>();
369+
370+
Arc::new(
371+
ListArray::try_new(
372+
Arc::new(Field::new("item", DataType::Int64, true)),
373+
OffsetBuffer::new(offsets.into()),
374+
Arc::new(values),
375+
None,
376+
)
377+
.unwrap(),
378+
)
379+
}
380+
381+
fn create_f64_list_array(
382+
num_rows: usize,
383+
array_size: usize,
384+
null_density: f64,
385+
) -> ArrayRef {
386+
let mut rng = StdRng::seed_from_u64(SEED);
387+
let values = (0..num_rows * array_size)
388+
.map(|_| {
389+
if rng.random::<f64>() < null_density {
390+
None
391+
} else {
392+
Some(rng.random_range(0..array_size as i64) as f64)
393+
}
394+
})
395+
.collect::<Float64Array>();
396+
let offsets = (0..=num_rows)
397+
.map(|i| (i * array_size) as i32)
398+
.collect::<Vec<i32>>();
399+
400+
Arc::new(
401+
ListArray::try_new(
402+
Arc::new(Field::new("item", DataType::Float64, true)),
403+
OffsetBuffer::new(offsets.into()),
404+
Arc::new(values),
405+
None,
406+
)
407+
.unwrap(),
408+
)
409+
}
410+
376411
fn create_string_list_array(
377412
num_rows: usize,
378413
array_size: usize,
@@ -465,6 +500,38 @@ fn create_boolean_list_array(
465500
)
466501
}
467502

503+
fn create_decimal64_list_array(
504+
num_rows: usize,
505+
array_size: usize,
506+
null_density: f64,
507+
) -> ArrayRef {
508+
let mut rng = StdRng::seed_from_u64(SEED);
509+
let values = (0..num_rows * array_size)
510+
.map(|_| {
511+
if rng.random::<f64>() < null_density {
512+
None
513+
} else {
514+
Some(rng.random_range(0..array_size) as i128 * 100)
515+
}
516+
})
517+
.collect::<Decimal128Array>()
518+
.with_precision_and_scale(10, 2)
519+
.unwrap();
520+
let offsets = (0..=num_rows)
521+
.map(|i| (i * array_size) as i32)
522+
.collect::<Vec<i32>>();
523+
524+
Arc::new(
525+
ListArray::try_new(
526+
Arc::new(Field::new("item", DataType::Decimal128(10, 2), true)),
527+
OffsetBuffer::new(offsets.into()),
528+
Arc::new(values),
529+
None,
530+
)
531+
.unwrap(),
532+
)
533+
}
534+
468535
fn create_fixed_size_binary_list_array(
469536
num_rows: usize,
470537
array_size: usize,

0 commit comments

Comments
 (0)