Skip to content

Commit 3565cad

Browse files
committed
Implement bounded DP join reordering
1 parent 067ba4b commit 3565cad

2 files changed

Lines changed: 592 additions & 5 deletions

File tree

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,55 @@ fn create_nested_with_min_max() -> (
205205
(big, medium, small)
206206
}
207207

208+
fn statistics_with_distinct_counts(
209+
num_rows: usize,
210+
distinct_counts: &[usize],
211+
) -> Statistics {
212+
Statistics {
213+
num_rows: Precision::Inexact(num_rows),
214+
total_byte_size: Precision::Absent,
215+
column_statistics: distinct_counts
216+
.iter()
217+
.map(|distinct_count| ColumnStatistics {
218+
distinct_count: Precision::Inexact(*distinct_count),
219+
..Default::default()
220+
})
221+
.collect(),
222+
}
223+
}
224+
225+
/// Returns three plans connected as a chain:
226+
/// * big.b_m_key = medium.m_b_key
227+
/// * medium.m_s_key = small.s_m_key
228+
fn create_reorderable_join_chain() -> (
229+
Arc<dyn ExecutionPlan>,
230+
Arc<dyn ExecutionPlan>,
231+
Arc<dyn ExecutionPlan>,
232+
) {
233+
let big = Arc::new(StatisticsExec::new(
234+
statistics_with_distinct_counts(100_000, &[100_000, 100_000]),
235+
Schema::new(vec![
236+
Field::new("b_m_key", DataType::Int32, false),
237+
Field::new("b_payload", DataType::Int32, false),
238+
]),
239+
));
240+
241+
let medium = Arc::new(StatisticsExec::new(
242+
statistics_with_distinct_counts(10_000, &[10_000, 10_000]),
243+
Schema::new(vec![
244+
Field::new("m_b_key", DataType::Int32, false),
245+
Field::new("m_s_key", DataType::Int32, false),
246+
]),
247+
));
248+
249+
let small = Arc::new(StatisticsExec::new(
250+
statistics_with_distinct_counts(100, &[100]),
251+
Schema::new(vec![Field::new("s_m_key", DataType::Int32, false)]),
252+
));
253+
254+
(big, medium, small)
255+
}
256+
208257
#[tokio::test]
209258
async fn test_join_with_swap() {
210259
let (big, small) = create_big_and_small();
@@ -499,6 +548,56 @@ async fn test_nested_join_swap() {
499548
);
500549
}
501550

551+
/// Checks that a chain of two inner hash joins is reordered.
///
/// Input plan (left-deep): `(big JOIN medium ON b_m_key = m_b_key) JOIN small
/// ON m_s_key = s_m_key`.
///
/// Expected plan (per the snapshot below): `small` is joined with `medium`
/// first, that result is joined with `big`, and a `ProjectionExec` on top
/// emits the columns in the input plan's order
/// (`b_m_key, b_payload, m_b_key, m_s_key, s_m_key`).
#[tokio::test]
552+
async fn test_inner_hash_join_dp_reorders_chain() {
553+
let (big, medium, small) = create_reorderable_join_chain();
554+
555+
// Inner-most join of the input plan: big (left) with medium (right).
let child_join = HashJoinExec::try_new(
556+
Arc::clone(&big),
557+
Arc::clone(&medium),
558+
vec![(
559+
col("b_m_key", &big.schema()).unwrap(),
560+
col("m_b_key", &medium.schema()).unwrap(),
561+
)],
562+
None,
563+
&JoinType::Inner,
564+
None,
565+
PartitionMode::Auto,
566+
NullEquality::NullEqualsNothing,
567+
false,
568+
)
569+
.unwrap();
570+
// Captured before `child_join` is moved into the outer join, so that
// `m_s_key` can be resolved against the child join's output schema.
let child_schema = child_join.schema();
571+
572+
// Outer join of the input plan: (big JOIN medium) on the left, small on the right.
let join = HashJoinExec::try_new(
573+
Arc::new(child_join),
574+
Arc::clone(&small),
575+
vec![(
576+
col("m_s_key", &child_schema).unwrap(),
577+
col("s_m_key", &small.schema()).unwrap(),
578+
)],
579+
None,
580+
&JoinType::Inner,
581+
None,
582+
PartitionMode::Auto,
583+
NullEquality::NullEqualsNothing,
584+
false,
585+
)
586+
.unwrap();
587+
588+
// NOTE(review): `assert_optimized!` is defined outside this view; presumably
// it runs the join-selection optimizer on `join` and compares the displayed
// plan with the inline snapshot. Confirm against the macro definition.
assert_optimized!(
589+
join,
590+
@r"
591+
ProjectionExec: expr=[b_m_key@3 as b_m_key, b_payload@4 as b_payload, m_b_key@1 as m_b_key, m_s_key@2 as m_s_key, s_m_key@0 as s_m_key]
592+
HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(m_b_key@1, b_m_key@0)]
593+
HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_m_key@0, m_s_key@1)]
594+
StatisticsExec: col_count=1, row_count=Inexact(100)
595+
StatisticsExec: col_count=2, row_count=Inexact(10000)
596+
StatisticsExec: col_count=2, row_count=Inexact(100000)
597+
"
598+
);
599+
}
600+
502601
#[tokio::test]
503602
async fn test_join_no_swap() {
504603
let (big, small) = create_big_and_small();

0 commit comments

Comments
 (0)