Skip to content

Commit 34f250a

Browse files
Dandandanctsk
andauthored
Fast path for joins with distinct values in build side (apache#16153)
* Specialize unique join * handle splitting * rename a bit * fix * fix * fix * fix * Fix the test, add explanation * Simplify * Update datafusion/physical-plan/src/joins/join_hash_map.rs Co-authored-by: Christian <9384305+ctsk@users.noreply.github.com> * Update datafusion/physical-plan/src/joins/join_hash_map.rs Co-authored-by: Christian <9384305+ctsk@users.noreply.github.com> * Simplify * Simplify * Simplify --------- Co-authored-by: Christian <9384305+ctsk@users.noreply.github.com>
1 parent dacdda2 commit 34f250a

2 files changed

Lines changed: 47 additions & 24 deletions

File tree

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,8 +1307,8 @@ fn lookup_join_hashmap(
13071307
limit: usize,
13081308
offset: JoinHashMapOffset,
13091309
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
1310-
let (probe_indices, build_indices, next_offset) = build_hashmap
1311-
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);
1310+
let (probe_indices, build_indices, next_offset) =
1311+
build_hashmap.get_matched_indices_with_limit_offset(hashes_buffer, limit, offset);
13121312

13131313
let build_indices: UInt64Array = build_indices.into();
13141314
let probe_indices: UInt32Array = probe_indices.into();
@@ -3333,7 +3333,7 @@ mod tests {
33333333

33343334
#[test]
33353335
fn join_with_hash_collision() -> Result<()> {
3336-
let mut hashmap_left = HashTable::with_capacity(2);
3336+
let mut hashmap_left = HashTable::with_capacity(4);
33373337
let left = build_table_i32(
33383338
("a", &vec![10, 20]),
33393339
("x", &vec![100, 200]),
@@ -3348,9 +3348,15 @@ mod tests {
33483348
hashes_buff,
33493349
)?;
33503350

3351-
// Create hash collisions (same hashes)
3351+
// Maps both values to both indices (1 and 2, representing input 0 and 1)
3352+
// 0 -> (0, 1)
3353+
// 1 -> (0, 2)
3354+
// The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
33523355
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3356+
hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
3357+
33533358
hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
3359+
hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
33543360

33553361
let next = vec![2, 0];
33563362

datafusion/physical-plan/src/joins/join_hash_map.rs

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -116,20 +116,10 @@ pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
116116
macro_rules! chain_traverse {
117117
(
118118
$input_indices:ident, $match_indices:ident, $hash_values:ident, $next_chain:ident,
119-
$input_idx:ident, $chain_idx:ident, $deleted_offset:ident, $remaining_output:ident
119+
$input_idx:ident, $chain_idx:ident, $remaining_output:ident
120120
) => {
121-
let mut i = $chain_idx - 1;
121+
let mut match_row_idx = $chain_idx - 1;
122122
loop {
123-
let match_row_idx = if let Some(offset) = $deleted_offset {
124-
// This arguments means that we prune the next index way before here.
125-
if i < offset as u64 {
126-
// End of the list due to pruning
127-
break;
128-
}
129-
i - offset as u64
130-
} else {
131-
i
132-
};
133123
$match_indices.push(match_row_idx);
134124
$input_indices.push($input_idx as u32);
135125
$remaining_output -= 1;
@@ -150,7 +140,7 @@ macro_rules! chain_traverse {
150140
// end of list
151141
break;
152142
}
153-
i = next - 1;
143+
match_row_idx = next - 1;
154144
}
155145
};
156146
}
@@ -168,6 +158,11 @@ pub trait JoinHashMapType {
168158
/// Returns a reference to the next.
169159
fn get_list(&self) -> &Self::NextType;
170160

161+
// Whether values in the hashmap are distinct (no duplicate keys)
162+
fn is_distinct(&self) -> bool {
163+
false
164+
}
165+
171166
/// Updates hashmap from iterator of row indices & row hashes pairs.
172167
fn update_from_iter<'a>(
173168
&mut self,
@@ -257,17 +252,35 @@ pub trait JoinHashMapType {
257252
fn get_matched_indices_with_limit_offset(
258253
&self,
259254
hash_values: &[u64],
260-
deleted_offset: Option<usize>,
261255
limit: usize,
262256
offset: JoinHashMapOffset,
263257
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
264-
let mut input_indices = vec![];
265-
let mut match_indices = vec![];
266-
267-
let mut remaining_output = limit;
258+
let mut input_indices = Vec::with_capacity(limit);
259+
let mut match_indices = Vec::with_capacity(limit);
268260

269261
let hash_map: &HashTable<(u64, u64)> = self.get_map();
270262
let next_chain = self.get_list();
263+
// Check if hashmap consists of unique values
264+
// If so, we can skip the chain traversal
265+
if self.is_distinct() {
266+
let start = offset.0;
267+
let end = (start + limit).min(hash_values.len());
268+
for (row_idx, &hash_value) in hash_values[start..end].iter().enumerate() {
269+
if let Some((_, index)) =
270+
hash_map.find(hash_value, |(hash, _)| hash_value == *hash)
271+
{
272+
input_indices.push(start as u32 + row_idx as u32);
273+
match_indices.push(*index - 1);
274+
}
275+
}
276+
if end == hash_values.len() {
277+
// No more values to process
278+
return (input_indices, match_indices, None);
279+
}
280+
return (input_indices, match_indices, Some((end, None)));
281+
}
282+
283+
let mut remaining_output = limit;
271284

272285
// Calculate initial `hash_values` index before iterating
273286
let to_skip = match offset {
@@ -286,7 +299,6 @@ pub trait JoinHashMapType {
286299
next_chain,
287300
initial_idx,
288301
initial_next_idx,
289-
deleted_offset,
290302
remaining_output
291303
);
292304

@@ -295,6 +307,7 @@ pub trait JoinHashMapType {
295307
};
296308

297309
let mut row_idx = to_skip;
310+
298311
for hash_value in &hash_values[to_skip..] {
299312
if let Some((_, index)) =
300313
hash_map.find(*hash_value, |(hash, _)| *hash_value == *hash)
@@ -306,7 +319,6 @@ pub trait JoinHashMapType {
306319
next_chain,
307320
row_idx,
308321
index,
309-
deleted_offset,
310322
remaining_output
311323
);
312324
}
@@ -338,6 +350,11 @@ impl JoinHashMapType for JoinHashMap {
338350
fn get_list(&self) -> &Self::NextType {
339351
&self.next
340352
}
353+
354+
/// Check if the values in the hashmap are distinct.
355+
fn is_distinct(&self) -> bool {
356+
self.map.len() == self.next.len()
357+
}
341358
}
342359

343360
impl Debug for JoinHashMap {

0 commit comments

Comments
 (0)