Skip to content

Commit c2be660

Browse files
PSeitzfulmicoton
andauthored
alternative mixed field aggregation collection (#2135)
* alternative mixed field aggregation collection instead of having multiple accessor in one AggregationWithAccessor split it into multiple independent AggregationWithAccessor * Update src/aggregation/agg_req_with_accessor.rs Co-authored-by: Paul Masurel <paul@quickwit.io> --------- Co-authored-by: Paul Masurel <paul@quickwit.io>
1 parent c805f08 commit c2be660

File tree

4 files changed

+55
-166
lines changed

4 files changed

+55
-166
lines changed

src/aggregation/agg_req_with_accessor.rs

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,6 @@ pub struct AggregationWithAccessor {
3737
pub(crate) accessor: Column<u64>,
3838
pub(crate) str_dict_column: Option<StrColumn>,
3939
pub(crate) field_type: ColumnType,
40-
/// In case there are multiple types of fast fields, e.g. string and numeric.
41-
/// Only used for term aggregations currently.
42-
pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
4340
pub(crate) sub_aggregation: AggregationsWithAccessor,
4441
pub(crate) limits: ResourceLimitGuard,
4542
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
@@ -52,20 +49,31 @@ impl AggregationWithAccessor {
5249
sub_aggregation: &Aggregations,
5350
reader: &SegmentReader,
5451
limits: AggregationLimits,
55-
) -> crate::Result<AggregationWithAccessor> {
52+
) -> crate::Result<Vec<AggregationWithAccessor>> {
5653
let mut str_dict_column = None;
57-
let mut accessor2 = None;
5854
use AggregationVariants::*;
59-
let (accessor, field_type) = match &agg.agg {
55+
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
6056
Range(RangeAggregation {
6157
field: field_name, ..
62-
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
58+
}) => vec![get_ff_reader(
59+
reader,
60+
field_name,
61+
Some(get_numeric_or_date_column_types()),
62+
)?],
6363
Histogram(HistogramAggregation {
6464
field: field_name, ..
65-
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
65+
}) => vec![get_ff_reader(
66+
reader,
67+
field_name,
68+
Some(get_numeric_or_date_column_types()),
69+
)?],
6670
DateHistogram(DateHistogramAggregationReq {
6771
field: field_name, ..
68-
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
72+
}) => vec![get_ff_reader(
73+
reader,
74+
field_name,
75+
Some(get_numeric_or_date_column_types()),
76+
)?],
6977
Terms(TermsAggregation {
7078
field: field_name, ..
7179
}) => {
@@ -80,11 +88,7 @@ impl AggregationWithAccessor {
8088
// ColumnType::IpAddr Unsupported
8189
// ColumnType::DateTime Unsupported
8290
];
83-
let mut columns =
84-
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?;
85-
let first = columns.pop().unwrap();
86-
accessor2 = columns.pop();
87-
first
91+
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?
8892
}
8993
Average(AverageAggregation { field: field_name })
9094
| Count(CountAggregation { field: field_name })
@@ -95,33 +99,37 @@ impl AggregationWithAccessor {
9599
let (accessor, field_type) =
96100
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
97101

98-
(accessor, field_type)
102+
vec![(accessor, field_type)]
99103
}
100104
Percentiles(percentiles) => {
101105
let (accessor, field_type) = get_ff_reader(
102106
reader,
103107
percentiles.field_name(),
104108
Some(get_numeric_or_date_column_types()),
105109
)?;
106-
(accessor, field_type)
110+
vec![(accessor, field_type)]
107111
}
108112
};
109113

110-
let sub_aggregation = sub_aggregation.clone();
111-
Ok(AggregationWithAccessor {
112-
accessor,
113-
accessor2,
114-
field_type,
115-
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
116-
&sub_aggregation,
117-
reader,
118-
&limits,
119-
)?,
120-
agg: agg.clone(),
121-
str_dict_column,
122-
limits: limits.new_guard(),
123-
column_block_accessor: Default::default(),
124-
})
114+
let aggs: Vec<AggregationWithAccessor> = acc_field_types
115+
.into_iter()
116+
.map(|(accessor, field_type)| {
117+
Ok(AggregationWithAccessor {
118+
accessor,
119+
field_type,
120+
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
121+
sub_aggregation,
122+
reader,
123+
&limits,
124+
)?,
125+
agg: agg.clone(),
126+
str_dict_column: str_dict_column.clone(),
127+
limits: limits.new_guard(),
128+
column_block_accessor: Default::default(),
129+
})
130+
})
131+
.collect::<crate::Result<_>>()?;
132+
Ok(aggs)
125133
}
126134
}
127135

@@ -141,15 +149,15 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
141149
) -> crate::Result<AggregationsWithAccessor> {
142150
let mut aggss = Vec::new();
143151
for (key, agg) in aggs.iter() {
144-
aggss.push((
145-
key.to_string(),
146-
AggregationWithAccessor::try_from_agg(
147-
agg,
148-
agg.sub_aggregation(),
149-
reader,
150-
limits.clone(),
151-
)?,
152-
));
152+
let aggs = AggregationWithAccessor::try_from_agg(
153+
agg,
154+
agg.sub_aggregation(),
155+
reader,
156+
limits.clone(),
157+
)?;
158+
for agg in aggs {
159+
aggss.push((key.to_string(), agg));
160+
}
153161
}
154162
Ok(AggregationsWithAccessor::from_data(
155163
VecWithNames::from_entries(aggss),

src/aggregation/bucket/range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ mod tests {
465465
SegmentRangeCollector::from_req_and_validate(
466466
&req,
467467
&mut Default::default(),
468-
&mut AggregationLimits::default().new_guard(),
468+
&AggregationLimits::default().new_guard(),
469469
field_type,
470470
0,
471471
)

src/aggregation/bucket/term_agg.rs

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -224,110 +224,6 @@ impl TermBuckets {
224224
}
225225
}
226226

227-
/// The composite collector is used, when we have different types under one field, to support a term
228-
/// aggregation on both.
229-
#[derive(Clone, Debug)]
230-
pub struct SegmentTermCollectorComposite {
231-
term_agg1: SegmentTermCollector, // field type 1, e.g. strings
232-
term_agg2: SegmentTermCollector, // field type 2, e.g. u64
233-
accessor_idx: usize,
234-
}
235-
impl SegmentAggregationCollector for SegmentTermCollectorComposite {
236-
fn add_intermediate_aggregation_result(
237-
self: Box<Self>,
238-
agg_with_accessor: &AggregationsWithAccessor,
239-
results: &mut IntermediateAggregationResults,
240-
) -> crate::Result<()> {
241-
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
242-
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];
243-
244-
let bucket = self
245-
.term_agg1
246-
.into_intermediate_bucket_result(agg_with_accessor)?;
247-
results.push(
248-
name.to_string(),
249-
IntermediateAggregationResult::Bucket(bucket),
250-
)?;
251-
let bucket = self
252-
.term_agg2
253-
.into_intermediate_bucket_result(agg_with_accessor)?;
254-
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
255-
256-
Ok(())
257-
}
258-
259-
#[inline]
260-
fn collect(
261-
&mut self,
262-
doc: crate::DocId,
263-
agg_with_accessor: &mut AggregationsWithAccessor,
264-
) -> crate::Result<()> {
265-
self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
266-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
267-
self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
268-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
269-
Ok(())
270-
}
271-
272-
#[inline]
273-
fn collect_block(
274-
&mut self,
275-
docs: &[crate::DocId],
276-
agg_with_accessor: &mut AggregationsWithAccessor,
277-
) -> crate::Result<()> {
278-
self.term_agg1.collect_block(docs, agg_with_accessor)?;
279-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
280-
self.term_agg2.collect_block(docs, agg_with_accessor)?;
281-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
282-
283-
Ok(())
284-
}
285-
286-
fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
287-
self.term_agg1.flush(agg_with_accessor)?;
288-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
289-
self.term_agg2.flush(agg_with_accessor)?;
290-
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
291-
292-
Ok(())
293-
}
294-
}
295-
296-
impl SegmentTermCollectorComposite {
297-
/// Swaps the accessor and field type with the second accessor and field type.
298-
/// This way we can use the same code for both aggregations.
299-
fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
300-
if let Some(accessor) = aggregations.accessor2.as_mut() {
301-
std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
302-
std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
303-
}
304-
}
305-
306-
pub(crate) fn from_req_and_validate(
307-
req: &TermsAggregation,
308-
sub_aggregations: &mut AggregationsWithAccessor,
309-
field_type: ColumnType,
310-
field_type2: ColumnType,
311-
accessor_idx: usize,
312-
) -> crate::Result<Self> {
313-
Ok(Self {
314-
term_agg1: SegmentTermCollector::from_req_and_validate(
315-
req,
316-
sub_aggregations,
317-
field_type,
318-
accessor_idx,
319-
)?,
320-
term_agg2: SegmentTermCollector::from_req_and_validate(
321-
req,
322-
sub_aggregations,
323-
field_type2,
324-
accessor_idx,
325-
)?,
326-
accessor_idx,
327-
})
328-
}
329-
}
330-
331227
/// The collector puts values from the fast field into the correct buckets and does a conversion to
332228
/// the correct datatype.
333229
#[derive(Clone, Debug)]

src/aggregation/segment_agg_result.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use super::metric::{
1515
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
1616
SumAggregation,
1717
};
18-
use crate::aggregation::bucket::SegmentTermCollectorComposite;
1918

2019
pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
2120
fn add_intermediate_aggregation_result(
@@ -81,26 +80,12 @@ pub(crate) fn build_single_agg_segment_collector(
8180
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
8281
use AggregationVariants::*;
8382
match &req.agg.agg {
84-
Terms(terms_req) => {
85-
if let Some(acc2) = req.accessor2.as_ref() {
86-
Ok(Box::new(
87-
SegmentTermCollectorComposite::from_req_and_validate(
88-
terms_req,
89-
&mut req.sub_aggregation,
90-
req.field_type,
91-
acc2.1,
92-
accessor_idx,
93-
)?,
94-
))
95-
} else {
96-
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
97-
terms_req,
98-
&mut req.sub_aggregation,
99-
req.field_type,
100-
accessor_idx,
101-
)?))
102-
}
103-
}
83+
Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
84+
terms_req,
85+
&mut req.sub_aggregation,
86+
req.field_type,
87+
accessor_idx,
88+
)?)),
10489
Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
10590
range_req,
10691
&mut req.sub_aggregation,

0 commit comments

Comments
 (0)