Skip to content

Commit a065f1c

Browse files
alambde-bgunter
authored and committed
Simplify logic for memory pressure partial emit from ordered group by (apache#20559)
## Which issue does this PR close? - related to apache#20445 - Follow on to apache#20446 ## Rationale for this change I found the formulation of the fix in apache#20446 hard to follow (see apache#20446 (review) for details). Basically the meaning of emit_to and 0 are inverted in this case. ## What changes are included in this PR? Pull the logic of what to emit into its own function with more comments that I think make it clearer what is going on ## Are these changes tested? Yes by existing tests ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 566611e commit a065f1c

2 files changed

Lines changed: 91 additions & 17 deletions

File tree

datafusion/physical-plan/src/aggregates/order/mod.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ impl GroupOrdering {
5252
}
5353
}
5454

55-
// How many groups be emitted, or None if no data can be emitted
55+
/// Returns how many groups can be emitted while respecting the current ordering
56+
/// guarantees, or `None` if no data can be emitted
5657
pub fn emit_to(&self) -> Option<EmitTo> {
5758
match self {
5859
GroupOrdering::None => None,
@@ -61,6 +62,28 @@ impl GroupOrdering {
6162
}
6263
}
6364

65+
/// Returns the emit strategy to use under memory pressure (OOM).
66+
///
67+
/// Returns the strategy that must be used when emitting up to `n` groups
68+
/// while respecting the current ordering guarantees.
69+
///
70+
/// Returns `None` if no data can be emitted.
71+
pub fn oom_emit_to(&self, n: usize) -> Option<EmitTo> {
72+
if n == 0 {
73+
return None;
74+
}
75+
76+
match self {
77+
GroupOrdering::None => Some(EmitTo::First(n)),
78+
GroupOrdering::Partial(_) | GroupOrdering::Full(_) => {
79+
self.emit_to().map(|emit_to| match emit_to {
80+
EmitTo::First(max) => EmitTo::First(n.min(max)),
81+
EmitTo::All => EmitTo::First(n),
82+
})
83+
}
84+
}
85+
}
86+
6487
/// Updates the state the input is done
6588
pub fn input_done(&mut self) {
6689
match self {
@@ -122,3 +145,63 @@ impl GroupOrdering {
122145
}
123146
}
124147
}
148+
149+
#[cfg(test)]
150+
mod tests {
151+
use super::*;
152+
153+
use std::sync::Arc;
154+
155+
use arrow::array::{ArrayRef, Int32Array};
156+
157+
#[test]
158+
fn test_oom_emit_to_none_ordering() {
159+
let group_ordering = GroupOrdering::None;
160+
161+
assert_eq!(group_ordering.oom_emit_to(0), None);
162+
assert_eq!(group_ordering.oom_emit_to(5), Some(EmitTo::First(5)));
163+
}
164+
165+
/// Creates a partially ordered grouping state with three groups.
166+
///
167+
/// `sort_key_values` controls whether a sort boundary exists in the batch:
168+
/// distinct values such as `[1, 2, 3]` create boundaries, while repeated
169+
/// values such as `[1, 1, 1]` do not.
170+
fn partial_ordering(sort_key_values: Vec<i32>) -> Result<GroupOrdering> {
171+
let mut group_ordering =
172+
GroupOrdering::Partial(GroupOrderingPartial::try_new(vec![0])?);
173+
174+
let batch_group_values: Vec<ArrayRef> = vec![
175+
Arc::new(Int32Array::from(sort_key_values)),
176+
Arc::new(Int32Array::from(vec![10, 20, 30])),
177+
];
178+
let group_indices = vec![0, 1, 2];
179+
180+
group_ordering.new_groups(&batch_group_values, &group_indices, 3)?;
181+
182+
Ok(group_ordering)
183+
}
184+
185+
#[test]
186+
fn test_oom_emit_to_partial_clamps_to_boundary() -> Result<()> {
187+
let group_ordering = partial_ordering(vec![1, 2, 3])?;
188+
189+
// Can emit both `1` and `2` groups because we have seen `3`
190+
assert_eq!(group_ordering.emit_to(), Some(EmitTo::First(2)));
191+
assert_eq!(group_ordering.oom_emit_to(1), Some(EmitTo::First(1)));
192+
assert_eq!(group_ordering.oom_emit_to(3), Some(EmitTo::First(2)));
193+
194+
Ok(())
195+
}
196+
197+
#[test]
198+
fn test_oom_emit_to_partial_without_boundary() -> Result<()> {
199+
let group_ordering = partial_ordering(vec![1, 1, 1])?;
200+
201+
// Can't emit the last `1` group as it may have more values
202+
assert_eq!(group_ordering.emit_to(), None);
203+
assert_eq!(group_ordering.oom_emit_to(3), None);
204+
205+
Ok(())
206+
}
207+
}

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,25 +1037,16 @@ impl GroupedHashAggregateStream {
10371037
self.group_values.len()
10381038
};
10391039

1040-
// Clamp to the sort boundary when using partial group ordering,
1041-
// otherwise remove_groups panics (#20445).
1042-
let n = match &self.group_ordering {
1043-
GroupOrdering::None => n,
1044-
_ => match self.group_ordering.emit_to() {
1045-
Some(EmitTo::First(max)) => n.min(max),
1046-
_ => 0,
1047-
},
1048-
};
1049-
1050-
if n > 0
1051-
&& let Some(batch) = self.emit(EmitTo::First(n), false)?
1040+
if let Some(emit_to) = self.group_ordering.oom_emit_to(n)
1041+
&& let Some(batch) = self.emit(emit_to, false)?
10521042
{
1053-
Ok(Some(ExecutionState::ProducingOutput(batch)))
1054-
} else {
1055-
Err(oom)
1043+
return Ok(Some(ExecutionState::ProducingOutput(batch)));
10561044
}
1045+
Err(oom)
10571046
}
1058-
_ => Err(oom),
1047+
OutOfMemoryMode::EmitEarly
1048+
| OutOfMemoryMode::Spill
1049+
| OutOfMemoryMode::ReportError => Err(oom),
10591050
}
10601051
}
10611052

0 commit comments

Comments
 (0)