Skip to content

Commit ab55e11

Browse files
alambjonathanc-n
authored andcommitted
[branch-52] fix: Return probe_side.len() for RightMark/Anti count(*) queries (apache#20710) (apache#20881)
- Part of apache#20855 - Closes apache#20669 on branch-52 This PR: - Backports apache#20710 from @jonathanc-n to the branch-52 line Co-authored-by: Jonathan Chen <chenleejonathan@gmail.com>
1 parent d41bfb2 commit ab55e11

4 files changed

Lines changed: 63 additions & 1 deletion

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ impl HashJoinStream {
639639
filter,
640640
JoinSide::Left,
641641
None,
642+
self.join_type,
642643
)?
643644
} else {
644645
(left_indices, right_indices)
@@ -707,6 +708,7 @@ impl HashJoinStream {
707708
&right_indices,
708709
&self.column_indices,
709710
join_side,
711+
self.join_type,
710712
)?;
711713

712714
self.output_buffer.push_batch(batch)?;
@@ -770,6 +772,7 @@ impl HashJoinStream {
770772
&right_side,
771773
&self.column_indices,
772774
JoinSide::Left,
775+
self.join_type,
773776
)?;
774777
self.output_buffer.push_batch(batch)?;
775778
}

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results(
930930
&probe_indices,
931931
column_indices,
932932
build_hash_joiner.build_side,
933+
join_type,
933934
)
934935
.map(|batch| (batch.num_rows() > 0).then_some(batch))
935936
} else {
@@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch(
993994
filter,
994995
build_hash_joiner.build_side,
995996
None,
997+
join_type,
996998
)?
997999
} else {
9981000
(build_indices, probe_indices)
@@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch(
10311033
&probe_indices,
10321034
column_indices,
10331035
build_hash_joiner.build_side,
1036+
join_type,
10341037
)
10351038
.map(|batch| (batch.num_rows() > 0).then_some(batch))
10361039
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,7 @@ pub(crate) fn get_final_indices_from_bit_map(
918918
(left_indices, right_indices)
919919
}
920920

921+
#[expect(clippy::too_many_arguments)]
921922
pub(crate) fn apply_join_filter_to_indices(
922923
build_input_buffer: &RecordBatch,
923924
probe_batch: &RecordBatch,
@@ -926,6 +927,7 @@ pub(crate) fn apply_join_filter_to_indices(
926927
filter: &JoinFilter,
927928
build_side: JoinSide,
928929
max_intermediate_size: Option<usize>,
930+
join_type: JoinType,
929931
) -> Result<(UInt64Array, UInt32Array)> {
930932
if build_indices.is_empty() && probe_indices.is_empty() {
931933
return Ok((build_indices, probe_indices));
@@ -946,6 +948,7 @@ pub(crate) fn apply_join_filter_to_indices(
946948
&probe_indices.slice(i, len),
947949
filter.column_indices(),
948950
build_side,
951+
join_type,
949952
)?;
950953
let filter_result = filter
951954
.expression()
@@ -967,6 +970,7 @@ pub(crate) fn apply_join_filter_to_indices(
967970
&probe_indices,
968971
filter.column_indices(),
969972
build_side,
973+
join_type,
970974
)?;
971975

972976
filter
@@ -987,6 +991,7 @@ pub(crate) fn apply_join_filter_to_indices(
987991

988992
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
989993
/// The resulting batch has [Schema] `schema`.
994+
#[expect(clippy::too_many_arguments)]
990995
pub(crate) fn build_batch_from_indices(
991996
schema: &Schema,
992997
build_input_buffer: &RecordBatch,
@@ -995,11 +1000,19 @@ pub(crate) fn build_batch_from_indices(
9951000
probe_indices: &UInt32Array,
9961001
column_indices: &[ColumnIndex],
9971002
build_side: JoinSide,
1003+
join_type: JoinType,
9981004
) -> Result<RecordBatch> {
9991005
if schema.fields().is_empty() {
1006+
// For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
1007+
// the build_indices were untouched so only probe_indices hold the actual
1008+
// row count.
1009+
let row_count = match join_type {
1010+
JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
1011+
_ => build_indices.len(),
1012+
};
10001013
let options = RecordBatchOptions::new()
10011014
.with_match_field_names(true)
1002-
.with_row_count(Some(build_indices.len()));
1015+
.with_row_count(Some(row_count));
10031016

10041017
return Ok(RecordBatch::try_new_with_options(
10051018
Arc::new(schema.clone()),

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small;
52265226

52275227
statement count 0
52285228
DROP TABLE issue_20437_large;
5229+
5230+
# Test count(*) with right semi/anti joins returns correct row counts
5231+
# issue: https://github.com/apache/datafusion/issues/20669
5232+
5233+
statement ok
5234+
CREATE TABLE t1 (k INT, v INT);
5235+
5236+
statement ok
5237+
CREATE TABLE t2 (k INT, v INT);
5238+
5239+
statement ok
5240+
INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);
5241+
5242+
statement ok
5243+
INSERT INTO t2 VALUES (1, 1);
5244+
5245+
query I
5246+
WITH t AS (
5247+
SELECT *
5248+
FROM t1
5249+
LEFT ANTI JOIN t2 ON t1.k = t2.k
5250+
)
5251+
SELECT count(*)
5252+
FROM t;
5253+
----
5254+
99
5255+
5256+
query I
5257+
WITH t AS (
5258+
SELECT *
5259+
FROM t1
5260+
LEFT SEMI JOIN t2 ON t1.k = t2.k
5261+
)
5262+
SELECT count(*)
5263+
FROM t;
5264+
----
5265+
1
5266+
5267+
statement count 0
5268+
DROP TABLE t1;
5269+
5270+
statement count 0
5271+
DROP TABLE t2;

0 commit comments

Comments
 (0)