Skip to content

Commit 04beab3

Browse files
authored
Performance improvement for nested cardinality aggregation
When a string cardinality aggregation is nested, it ends up being applied to different buckets. Dictionary encoding relies on a different dictionary for each segment. As a result, during segment collection, we only collect term ordinals in a HashSet, and decode them in the term dictionary at the end of collection. Before this PR, this decoding phase was done once for each bucket, causing the same work to be done over and over. This PR introduces a coupon cache. The HLL sketch relies on a hash of the string values. We populate the cache before bucket collection, and get our values from it. This PR also renames "caching" to "buffering" in aggregation (it was never caching), and does several cleanups.
1 parent 3cd9011 commit 04beab3

File tree

15 files changed

+583
-222
lines changed

15 files changed

+583
-222
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ regex = { version = "1.5.5", default-features = false, features = [
2424
"std",
2525
"unicode",
2626
] }
27+
murmurhash32 = "0.3"
2728
aho-corasick = "1.0"
2829
tantivy-fst = "0.5"
2930
memmap2 = { version = "0.9.0", optional = true }
@@ -65,7 +66,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
6566
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
6667
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
6768
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
68-
datasketches = "0.2.0"
69+
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "eb4ad64" }
6970
futures-util = { version = "0.3.28", optional = true }
7071
futures-channel = { version = "0.3.28", optional = true }
7172
fnv = "1.0.7"

benches/agg_bench.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ fn bench_agg(mut group: InputGroup<Index>) {
7878

7979
register!(group, cardinality_agg);
8080
register!(group, terms_status_with_cardinality_agg);
81+
register!(group, terms_100_buckets_with_cardinality_agg);
8182

8283
register!(group, range_agg);
8384
register!(group, range_agg_with_avg_sub_agg);
@@ -169,6 +170,22 @@ fn terms_status_with_cardinality_agg(index: &Index) {
169170
let agg_req = json!({
170171
"my_texts": {
171172
"terms": { "field": "text_few_terms_status" },
173+
"aggs": {
174+
"cardinality": {
175+
"cardinality": {
176+
"field": "text_few_terms_status"
177+
},
178+
}
179+
}
180+
},
181+
});
182+
execute_agg(index, agg_req);
183+
}
184+
185+
fn terms_100_buckets_with_cardinality_agg(index: &Index) {
186+
let agg_req = json!({
187+
"my_texts": {
188+
"terms": { "field": "text_1000_terms_zipf", "size": 100 },
172189
"aggs": {
173190
"cardinality": {
174191
"cardinality": {

columnar/src/block_accessor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
3333
&mut self,
3434
docs: &[u32],
3535
accessor: &Column<T>,
36-
missing: Option<T>,
36+
missing_opt: Option<T>,
3737
) {
3838
self.fetch_block(docs, accessor);
3939
// no missing values
4040
if accessor.index.get_cardinality().is_full() {
4141
return;
4242
}
43-
let Some(missing) = missing else {
43+
let Some(missing) = missing_opt else {
4444
return;
4545
};
4646

@@ -191,6 +191,7 @@ where F: FnMut(u32) {
191191
}
192192

193193
#[cfg(test)]
194+
#[allow(clippy::field_reassign_with_default)]
194195
mod tests {
195196
use super::*;
196197

src/aggregation/bucket/composite/collector.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
2121
use crate::aggregation::bucket::{
2222
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
2323
};
24-
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
24+
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer};
2525
use crate::aggregation::intermediate_agg_result::{
2626
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
2727
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
@@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector {
119119
/// One DynArrayHeapMap per parent bucket.
120120
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
121121
accessor_idx: usize,
122-
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
122+
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
123123
bucket_id_provider: BucketIdProvider,
124124
/// Number of sources, needed when creating new DynArrayHeapMaps.
125125
num_sources: usize,
@@ -218,7 +218,7 @@ impl SegmentCompositeCollector {
218218
let has_sub_aggregations = !node.children.is_empty();
219219
let sub_agg = if has_sub_aggregations {
220220
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
221-
Some(CachedSubAggs::new(sub_agg_collector))
221+
Some(BufferedSubAggs::new(sub_agg_collector))
222222
} else {
223223
None
224224
};
@@ -332,7 +332,7 @@ fn collect_bucket_with_limit(
332332
limit_num_buckets: usize,
333333
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
334334
key: &[InternalValueRepr],
335-
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
335+
sub_agg: &mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
336336
bucket_id_provider: &mut BucketIdProvider,
337337
) {
338338
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
@@ -488,7 +488,7 @@ struct CompositeKeyVisitor<'a> {
488488
doc_id: crate::DocId,
489489
composite_agg_data: &'a CompositeAggReqData,
490490
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
491-
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
491+
sub_agg: &'a mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
492492
bucket_id_provider: &'a mut BucketIdProvider,
493493
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
494494
}

src/aggregation/bucket/composite/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -511,14 +511,14 @@ mod tests {
511511

512512
fn datetime_from_iso_str(date_str: &str) -> common::DateTime {
513513
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
514-
.expect(&format!("Failed to parse date: {}", date_str));
514+
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
515515
let timestamp_secs = dt.unix_timestamp_nanos();
516516
common::DateTime::from_timestamp_nanos(timestamp_secs as i64)
517517
}
518518

519519
fn ms_timestamp_from_iso_str(date_str: &str) -> i64 {
520520
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
521-
.expect(&format!("Failed to parse date: {}", date_str));
521+
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
522522
(dt.unix_timestamp_nanos() / 1_000_000) as i64
523523
}
524524

@@ -548,7 +548,7 @@ mod tests {
548548
agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap();
549549
}
550550
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
551-
let res = exec_request(agg_req.clone(), &index).unwrap();
551+
let res = exec_request(agg_req.clone(), index).unwrap();
552552
let expected_page_buckets = &expected_buckets_vec[page_idx * page_size
553553
..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())];
554554
assert_eq!(
@@ -578,7 +578,7 @@ mod tests {
578578
}
579579
});
580580
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
581-
let res = exec_request(agg_req.clone(), &index).unwrap();
581+
let res = exec_request(agg_req.clone(), index).unwrap();
582582
assert_eq!(
583583
res["my_composite"]["buckets"],
584584
json!([]),

src/aggregation/bucket/filter.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
66
use crate::aggregation::agg_data::{
77
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
88
};
9-
use crate::aggregation::cached_sub_aggs::{
10-
CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
9+
use crate::aggregation::buffered_sub_aggs::{
10+
BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer,
1111
};
1212
use crate::aggregation::intermediate_agg_result::{
1313
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -503,17 +503,17 @@ struct DocCount {
503503
}
504504

505505
/// Segment collector for filter aggregation
506-
pub struct SegmentFilterCollector<C: SubAggCache> {
506+
pub struct SegmentFilterCollector<B: SubAggBuffer> {
507507
/// Document counts per parent bucket
508508
parent_buckets: Vec<DocCount>,
509509
/// Sub-aggregation collectors
510-
sub_aggregations: Option<CachedSubAggs<C>>,
510+
sub_aggregations: Option<BufferedSubAggs<B>>,
511511
bucket_id_provider: BucketIdProvider,
512512
/// Accessor index for this filter aggregation (to access FilterAggReqData)
513513
accessor_idx: usize,
514514
}
515515

516-
impl<C: SubAggCache> SegmentFilterCollector<C> {
516+
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
517517
/// Create a new filter segment collector following the new agg_data pattern
518518
pub(crate) fn from_req_and_validate(
519519
req: &mut AggregationsSegmentCtx,
@@ -525,7 +525,7 @@ impl<C: SubAggCache> SegmentFilterCollector<C> {
525525
} else {
526526
None
527527
};
528-
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
528+
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
529529

530530
Ok(SegmentFilterCollector {
531531
parent_buckets: Vec::new(),
@@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector(
547547

548548
if is_top_level {
549549
Ok(Box::new(
550-
SegmentFilterCollector::<LowCardSubAggCache>::from_req_and_validate(req, node)?,
550+
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
551551
))
552552
} else {
553553
Ok(Box::new(
554-
SegmentFilterCollector::<HighCardSubAggCache>::from_req_and_validate(req, node)?,
554+
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
555555
))
556556
}
557557
}
558558

559-
impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
559+
impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
560560
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561561
f.debug_struct("SegmentFilterCollector")
562562
.field("buckets", &self.parent_buckets)
@@ -566,7 +566,7 @@ impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
566566
}
567567
}
568568

569-
impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
569+
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B> {
570570
fn add_intermediate_aggregation_result(
571571
&mut self,
572572
agg_data: &AggregationsSegmentCtx,

src/aggregation/bucket/histogram/histogram.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
1010
};
1111
use crate::aggregation::agg_req::Aggregations;
1212
use crate::aggregation::agg_result::BucketEntry;
13-
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
13+
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
1414
use crate::aggregation::intermediate_agg_result::{
1515
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
1616
IntermediateHistogramBucketEntry,
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
258258
impl SegmentHistogramBucketEntry {
259259
pub(crate) fn into_intermediate_bucket_entry(
260260
self,
261-
sub_aggregation: &mut Option<HighCardCachedSubAggs>,
261+
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
262262
agg_data: &AggregationsSegmentCtx,
263263
) -> crate::Result<IntermediateHistogramBucketEntry> {
264264
let mut sub_aggregation_res = IntermediateAggregationResults::default();
@@ -291,7 +291,7 @@ pub struct SegmentHistogramCollector {
291291
/// The buckets containing the aggregation data.
292292
/// One Histogram bucket per parent bucket id.
293293
parent_buckets: Vec<HistogramBuckets>,
294-
sub_agg: Option<HighCardCachedSubAggs>,
294+
sub_agg: Option<HighCardBufferedSubAggs>,
295295
accessor_idx: usize,
296296
bucket_id_provider: BucketIdProvider,
297297
}
@@ -444,7 +444,7 @@ impl SegmentHistogramCollector {
444444
max: f64::MAX,
445445
});
446446
req_data.offset = req_data.req.offset.unwrap_or(0.0);
447-
let sub_agg = sub_agg.map(CachedSubAggs::new);
447+
let sub_agg = sub_agg.map(BufferedSubAggs::new);
448448

449449
Ok(Self {
450450
parent_buckets: Default::default(),

src/aggregation/bucket/range.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{
99
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
1010
};
1111
use crate::aggregation::agg_limits::AggregationLimitsGuard;
12-
use crate::aggregation::cached_sub_aggs::{
13-
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
12+
use crate::aggregation::buffered_sub_aggs::{
13+
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
14+
SubAggBuffer,
1415
};
1516
use crate::aggregation::intermediate_agg_result::{
1617
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
155156

156157
/// The collector puts values from the fast field into the correct buckets and does a conversion to
157158
/// the correct datatype.
158-
pub struct SegmentRangeCollector<C: SubAggCache> {
159+
pub struct SegmentRangeCollector<B: SubAggBuffer> {
159160
/// The buckets containing the aggregation data.
160161
/// One for each ParentBucketId
161162
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
162163
column_type: ColumnType,
163164
pub(crate) accessor_idx: usize,
164-
sub_agg: Option<CachedSubAggs<C>>,
165+
sub_agg: Option<BufferedSubAggs<B>>,
165166
/// Here things get a bit weird. We need to assign unique bucket ids across all
166167
/// parent buckets. So we keep track of the next available bucket id here.
167168
/// This allows a kind of flattening of the bucket ids across all parent buckets.
@@ -178,7 +179,7 @@ pub struct SegmentRangeCollector<C: SubAggCache> {
178179
limits: AggregationLimitsGuard,
179180
}
180181

181-
impl<C: SubAggCache> Debug for SegmentRangeCollector<C> {
182+
impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
182183
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183184
f.debug_struct("SegmentRangeCollector")
184185
.field("parent_buckets_len", &self.parent_buckets.len())
@@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry {
229230
}
230231
}
231232

232-
impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
233+
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
233234
fn add_intermediate_aggregation_result(
234235
&mut self,
235236
agg_data: &AggregationsSegmentCtx,
@@ -350,17 +351,17 @@ pub(crate) fn build_segment_range_collector(
350351
};
351352

352353
if is_low_card {
353-
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggCache> {
354-
sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
354+
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
355+
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
355356
column_type: field_type,
356357
accessor_idx,
357358
parent_buckets: Vec::new(),
358359
bucket_id_provider: BucketIdProvider::default(),
359360
limits: agg_data.context.limits.clone(),
360361
}))
361362
} else {
362-
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggCache> {
363-
sub_agg: sub_agg.map(CachedSubAggs::new),
363+
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
364+
sub_agg: sub_agg.map(BufferedSubAggs::new),
364365
column_type: field_type,
365366
accessor_idx,
366367
parent_buckets: Vec::new(),
@@ -370,7 +371,7 @@ pub(crate) fn build_segment_range_collector(
370371
}
371372
}
372373

373-
impl<C: SubAggCache> SegmentRangeCollector<C> {
374+
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
374375
pub(crate) fn create_new_buckets(
375376
&mut self,
376377
agg_data: &AggregationsSegmentCtx,
@@ -554,7 +555,7 @@ mod tests {
554555
pub fn get_collector_from_ranges(
555556
ranges: Vec<RangeAggregationRange>,
556557
field_type: ColumnType,
557-
) -> SegmentRangeCollector<HighCardSubAggCache> {
558+
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
558559
let req = RangeAggregation {
559560
field: "dummy".to_string(),
560561
ranges,

0 commit comments

Comments
 (0)