Skip to content

Commit be1d376

Browse files
authored
feat: ResourceExhausted for memory limit in GroupedHashAggregateStream (#4371)
* feat: `ResourceExhausted` for memory limit in `GroupedHashAggregateStream`. Closes #3940.
* fix: `ScalarValue` size calculations
* refactor: de-dup code
1 parent fe8aee6 commit be1d376

3 files changed

Lines changed: 303 additions & 130 deletions

File tree

datafusion/common/src/scalar.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2297,7 +2297,7 @@ impl ScalarValue {
22972297
/// Estimate size in bytes including `Self`. For values with internal containers such as `String`
22982298
/// includes the allocated size (`capacity`) rather than the current length (`len`)
22992299
pub fn size(&self) -> usize {
2300-
std::mem::size_of_val(&self)
2300+
std::mem::size_of_val(self)
23012301
+ match self {
23022302
ScalarValue::Null
23032303
| ScalarValue::Boolean(_)
@@ -2364,7 +2364,8 @@ impl ScalarValue {
23642364
///
23652365
/// Includes the size of the [`Vec`] container itself.
23662366
pub fn size_of_vec(vec: &Vec<Self>) -> usize {
2367-
(std::mem::size_of::<ScalarValue>() * vec.capacity())
2367+
std::mem::size_of_val(vec)
2368+
+ (std::mem::size_of::<ScalarValue>() * vec.capacity())
23682369
+ vec
23692370
.iter()
23702371
.map(|sv| sv.size() - std::mem::size_of_val(sv))
@@ -2375,7 +2376,8 @@ impl ScalarValue {
23752376
///
23762377
/// Includes the size of the [`HashSet`] container itself.
23772378
pub fn size_of_hashset<S>(set: &HashSet<Self, S>) -> usize {
2378-
(std::mem::size_of::<ScalarValue>() * set.capacity())
2379+
std::mem::size_of_val(set)
2380+
+ (std::mem::size_of::<ScalarValue>() * set.capacity())
23792381
+ set
23802382
.iter()
23812383
.map(|sv| sv.size() - std::mem::size_of_val(sv))
@@ -3281,6 +3283,36 @@ mod tests {
32813283
assert_eq!(std::mem::size_of::<ScalarValue>(), 48);
32823284
}
32833285

3286+
#[test]
3287+
fn memory_size() {
3288+
let sv = ScalarValue::Binary(Some(Vec::with_capacity(10)));
3289+
assert_eq!(sv.size(), std::mem::size_of::<ScalarValue>() + 10,);
3290+
let sv_size = sv.size();
3291+
3292+
let mut v = Vec::with_capacity(10);
3293+
// do NOT clone `sv` here because this may shrink the vector capacity
3294+
v.push(sv);
3295+
assert_eq!(v.capacity(), 10);
3296+
assert_eq!(
3297+
ScalarValue::size_of_vec(&v),
3298+
std::mem::size_of::<Vec<ScalarValue>>()
3299+
+ (9 * std::mem::size_of::<ScalarValue>())
3300+
+ sv_size,
3301+
);
3302+
3303+
let mut s = HashSet::with_capacity(0);
3304+
// do NOT clone `sv` here because this may shrink the capacity of the byte vector inside `sv`
3305+
s.insert(v.pop().unwrap());
3306+
// hashsets may easily grow during insert, so capacity is dynamic
3307+
let s_capacity = s.capacity();
3308+
assert_eq!(
3309+
ScalarValue::size_of_hashset(&s),
3310+
std::mem::size_of::<HashSet<ScalarValue>>()
3311+
+ ((s_capacity - 1) * std::mem::size_of::<ScalarValue>())
3312+
+ sv_size,
3313+
);
3314+
}
3315+
32843316
#[test]
32853317
fn scalar_eq_array() {
32863318
// Validate that eq_array has the same semantics as ScalarValue::eq

0 commit comments

Comments
 (0)