Skip to content

Commit 1cb4de4

Browse files
authored
perf: Optimize approx_distinct for inline Utf8View (#21064)
## Which issue does this PR close?

- Closes #21039.

## Rationale for this change

For short strings that are stored inline in a `Utf8View`, we can hash the string's value directly, without materializing a `&str`, and then add the hash value to HyperLogLog directly. This improves performance by ~40%.

## What changes are included in this PR?

* Add benchmark for `approx_distinct` on short strings
* Add `add_hashed` API to `HyperLogLog`
* Rename `SEED` to `HLL_HASH_STATE` and make it `pub(crate)`
* Optimize `approx_distinct` on short strings as described above.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.
1 parent f734ec5 commit 1cb4de4

3 files changed

Lines changed: 65 additions & 33 deletions

File tree

datafusion/functions-aggregate/benches/approx_distinct.rs

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ use rand::rngs::StdRng;
2828
use rand::{Rng, SeedableRng};
2929

3030
const BATCH_SIZE: usize = 8192;
31-
const STRING_LENGTH: usize = 20;
31+
const SHORT_STRING_LENGTH: usize = 8;
32+
const LONG_STRING_LENGTH: usize = 20;
3233

3334
fn prepare_accumulator(data_type: DataType) -> Box<dyn Accumulator> {
3435
let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)]));
@@ -55,12 +56,12 @@ fn create_i64_array(n_distinct: usize) -> Int64Array {
5556
.collect()
5657
}
5758

58-
/// Creates a pool of `n_distinct` random strings.
59-
fn create_string_pool(n_distinct: usize) -> Vec<String> {
59+
/// Creates a pool of `n_distinct` random strings of the given length.
60+
fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec<String> {
6061
let mut rng = StdRng::seed_from_u64(42);
6162
(0..n_distinct)
6263
.map(|_| {
63-
(0..STRING_LENGTH)
64+
(0..string_length)
6465
.map(|_| rng.random_range(b'a'..=b'z') as char)
6566
.collect()
6667
})
@@ -98,29 +99,39 @@ fn approx_distinct_benchmark(c: &mut Criterion) {
9899
})
99100
});
100101

101-
let string_pool = create_string_pool(n_distinct);
102+
for (label, str_len) in
103+
[("short", SHORT_STRING_LENGTH), ("long", LONG_STRING_LENGTH)]
104+
{
105+
let string_pool = create_string_pool(n_distinct, str_len);
102106

103-
// --- Utf8 benchmarks ---
104-
let values = Arc::new(create_string_array(&string_pool)) as ArrayRef;
105-
c.bench_function(&format!("approx_distinct utf8 {pct}% distinct"), |b| {
106-
b.iter(|| {
107-
let mut accumulator = prepare_accumulator(DataType::Utf8);
108-
accumulator
109-
.update_batch(std::slice::from_ref(&values))
110-
.unwrap()
111-
})
112-
});
107+
// --- Utf8 benchmarks ---
108+
let values = Arc::new(create_string_array(&string_pool)) as ArrayRef;
109+
c.bench_function(
110+
&format!("approx_distinct utf8 {label} {pct}% distinct"),
111+
|b| {
112+
b.iter(|| {
113+
let mut accumulator = prepare_accumulator(DataType::Utf8);
114+
accumulator
115+
.update_batch(std::slice::from_ref(&values))
116+
.unwrap()
117+
})
118+
},
119+
);
113120

114-
// --- Utf8View benchmarks ---
115-
let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef;
116-
c.bench_function(&format!("approx_distinct utf8view {pct}% distinct"), |b| {
117-
b.iter(|| {
118-
let mut accumulator = prepare_accumulator(DataType::Utf8View);
119-
accumulator
120-
.update_batch(std::slice::from_ref(&values))
121-
.unwrap()
122-
})
123-
});
121+
// --- Utf8View benchmarks ---
122+
let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef;
123+
c.bench_function(
124+
&format!("approx_distinct utf8view {label} {pct}% distinct"),
125+
|b| {
126+
b.iter(|| {
127+
let mut accumulator = prepare_accumulator(DataType::Utf8View);
128+
accumulator
129+
.update_batch(std::slice::from_ref(&values))
130+
.unwrap()
131+
})
132+
},
133+
);
134+
}
124135
}
125136
}
126137

datafusion/functions-aggregate/src/approx_distinct.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
//! Defines physical expressions that can be evaluated at runtime during query execution
1919
20-
use crate::hyperloglog::HyperLogLog;
21-
use arrow::array::{BinaryArray, StringViewArray};
20+
use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog};
21+
use arrow::array::{Array, BinaryArray, StringViewArray};
2222
use arrow::array::{
2323
GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
2424
};
@@ -44,7 +44,7 @@ use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator;
4444
use datafusion_macros::user_doc;
4545
use std::any::Any;
4646
use std::fmt::{Debug, Formatter};
47-
use std::hash::Hash;
47+
use std::hash::{BuildHasher, Hash};
4848
use std::marker::PhantomData;
4949

5050
make_udaf_expr_and_func!(
@@ -212,8 +212,19 @@ where
212212
impl Accumulator for StringViewHLLAccumulator {
213213
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
214214
let array: &StringViewArray = downcast_value!(values[0], StringViewArray);
215-
// flatten because we would skip nulls
216-
self.hll.extend(array.iter().flatten());
215+
216+
// When all strings are stored inline in the StringView (≤ 12 bytes),
217+
// hash the raw u128 view directly instead of materializing a &str.
218+
if array.data_buffers().is_empty() {
219+
for (i, &view) in array.views().iter().enumerate() {
220+
if !array.is_null(i) {
221+
self.hll.add_hashed(HLL_HASH_STATE.hash_one(view));
222+
}
223+
}
224+
} else {
225+
self.hll.extend(array.iter().flatten());
226+
}
227+
217228
Ok(())
218229
}
219230

datafusion/functions-aggregate/src/hyperloglog.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ where
5858
/// Fixed seed for the hashing so that values are consistent across runs
5959
///
6060
/// Note that when we later move on to have serialized HLL register binaries
61-
/// shared across cluster, this SEED will have to be consistent across all
61+
/// shared across cluster, this HLL_HASH_STATE will have to be consistent across all
6262
/// parties; otherwise we might have corruption. So ideally, in the future, this seed
6363
/// shall be part of the serialized form (or stay unchanged across versions).
64-
const SEED: foldhash::quality::FixedState = foldhash::quality::FixedState::with_seed(0);
64+
pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState =
65+
foldhash::quality::FixedState::with_seed(0);
6566

6667
impl<T> Default for HyperLogLog<T>
6768
where
@@ -97,12 +98,21 @@ where
9798
/// reasonable performance.
9899
#[inline]
99100
fn hash_value(&self, obj: &T) -> u64 {
100-
SEED.hash_one(obj)
101+
HLL_HASH_STATE.hash_one(obj)
101102
}
102103

103104
/// Adds an element to the HyperLogLog.
104105
pub fn add(&mut self, obj: &T) {
105106
let hash = self.hash_value(obj);
107+
self.add_hashed(hash);
108+
}
109+
110+
/// Adds a pre-computed hash value directly to the HyperLogLog.
111+
///
112+
/// The hash should be computed using [`HLL_HASH_STATE`], the same hasher used
113+
/// by [`Self::add`].
114+
#[inline]
115+
pub(crate) fn add_hashed(&mut self, hash: u64) {
106116
let index = (hash & HLL_P_MASK) as usize;
107117
let p = ((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1;
108118
self.registers[index] = self.registers[index].max(p as u8);

0 commit comments

Comments
 (0)