Skip to content

Commit 3859cc8

Browse files
nuri-yoonryoo
and authored
fix: deduplicate doc counts in term aggregation for multi-valued fields (#2854)
* fix: deduplicate doc counts in term aggregation for multi-valued fields Term aggregation was counting term occurrences instead of documents for multi-valued fields. A document with the same value appearing multiple times would inflate doc_count. Add `fetch_block_with_missing_unique_per_doc` to ColumnBlockAccessor that deduplicates (doc_id, value) pairs, and use it in term aggregation. Fixes #2721 * refactor: only deduplicate for multivalue cardinality Duplicates can only occur with multivalue columns, so narrow the check from !is_full() to is_multivalue(). * fix: handle non-consecutive duplicate values in dedup Sort values within each doc_id group before deduplicating, so that non-adjacent duplicates are correctly handled. Add unit tests for dedup_docid_val_pairs: consecutive duplicates, non-consecutive duplicates, multi-doc groups, no duplicates, and single element. * perf: skip dedup when block has no multivalue entries Add early return when no consecutive doc_ids are equal, avoiding unnecessary sort and dedup passes. Remove the 2-element swap optimization as it is not needed by the dedup algorithm. --------- Co-authored-by: nryoo <nryoo@nryooui-MacBookPro.local>
1 parent 545169c commit 3859cc8

File tree

2 files changed

+134
-8
lines changed

2 files changed

+134
-8
lines changed

columnar/src/block_accessor.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,78 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
5858
}
5959
}
6060

61+
/// Like `fetch_block_with_missing`, but deduplicates (doc_id, value) pairs
62+
/// so that each unique value per document is returned only once.
63+
///
64+
/// This is necessary for correct document counting in aggregations,
65+
/// where multi-valued fields can produce duplicate entries that inflate counts.
66+
#[inline]
67+
pub fn fetch_block_with_missing_unique_per_doc(
68+
&mut self,
69+
docs: &[u32],
70+
accessor: &Column<T>,
71+
missing: Option<T>,
72+
) where
73+
T: Ord,
74+
{
75+
self.fetch_block_with_missing(docs, accessor, missing);
76+
if accessor.index.get_cardinality().is_multivalue() {
77+
self.dedup_docid_val_pairs();
78+
}
79+
}
80+
81+
/// Removes duplicate (doc_id, value) pairs from the caches.
82+
///
83+
/// After `fetch_block`, entries are sorted by doc_id, but values within
84+
/// the same doc may not be sorted (e.g. `(0,1), (0,2), (0,1)`).
85+
/// We group consecutive entries by doc_id, sort values within each group
86+
/// if it has more than 2 elements, then deduplicate adjacent pairs.
87+
///
88+
/// Skips entirely if no doc_id appears more than once in the block.
89+
fn dedup_docid_val_pairs(&mut self)
90+
where T: Ord {
91+
if self.docid_cache.len() <= 1 {
92+
return;
93+
}
94+
95+
// Quick check: if no consecutive doc_ids are equal, no dedup needed.
96+
let has_multivalue = self.docid_cache.windows(2).any(|w| w[0] == w[1]);
97+
if !has_multivalue {
98+
return;
99+
}
100+
101+
// Sort values within each doc_id group so duplicates become adjacent.
102+
let mut start = 0;
103+
while start < self.docid_cache.len() {
104+
let doc = self.docid_cache[start];
105+
let mut end = start + 1;
106+
while end < self.docid_cache.len() && self.docid_cache[end] == doc {
107+
end += 1;
108+
}
109+
if end - start > 2 {
110+
self.val_cache[start..end].sort();
111+
}
112+
start = end;
113+
}
114+
115+
// Now duplicates are adjacent — deduplicate in place.
116+
let mut write = 0;
117+
for read in 1..self.docid_cache.len() {
118+
if self.docid_cache[read] != self.docid_cache[write]
119+
|| self.val_cache[read] != self.val_cache[write]
120+
{
121+
write += 1;
122+
if write != read {
123+
self.docid_cache[write] = self.docid_cache[read];
124+
self.val_cache[write] = self.val_cache[read];
125+
}
126+
}
127+
}
128+
let new_len = write + 1;
129+
self.docid_cache.truncate(new_len);
130+
self.val_cache.truncate(new_len);
131+
}
132+
61133
#[inline]
62134
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
63135
self.val_cache.iter().cloned()
@@ -163,4 +235,56 @@ mod tests {
163235

164236
assert_eq!(missing_docs, vec![1, 2, 3, 4, 5]);
165237
}
238+
239+
#[test]
fn test_dedup_docid_val_pairs_consecutive() {
    // Adjacent duplicate pair (0, 10) must collapse to a single entry;
    // the same value on different docs is kept.
    let mut block = ColumnBlockAccessor::<u64>::default();
    block.docid_cache = vec![0, 0, 2, 3];
    block.val_cache = vec![10, 10, 10, 10];
    block.dedup_docid_val_pairs();
    assert_eq!(block.docid_cache, vec![0, 2, 3]);
    assert_eq!(block.val_cache, vec![10, 10, 10]);
}
248+
249+
#[test]
fn test_dedup_docid_val_pairs_non_consecutive() {
    // (0,1), (0,2), (0,1): the duplicate value is not adjacent, so the
    // per-run sort must bring it next to its twin before dedup.
    let mut block = ColumnBlockAccessor::<u64>::default();
    block.docid_cache = vec![0, 0, 0];
    block.val_cache = vec![1, 2, 1];
    block.dedup_docid_val_pairs();
    assert_eq!(block.docid_cache, vec![0, 0]);
    assert_eq!(block.val_cache, vec![1, 2]);
}
259+
260+
#[test]
fn test_dedup_docid_val_pairs_multi_doc() {
    // doc 0 carries [3, 1, 3], doc 1 carries [5, 5]: dedup happens
    // independently per doc_id run, and values end up sorted per run.
    let mut block = ColumnBlockAccessor::<u64>::default();
    block.docid_cache = vec![0, 0, 0, 1, 1];
    block.val_cache = vec![3, 1, 3, 5, 5];
    block.dedup_docid_val_pairs();
    assert_eq!(block.docid_cache, vec![0, 0, 1]);
    assert_eq!(block.val_cache, vec![1, 3, 5]);
}
270+
271+
#[test]
fn test_dedup_docid_val_pairs_no_duplicates() {
    // A repeated doc_id with distinct values must be left untouched.
    let mut block = ColumnBlockAccessor::<u64>::default();
    block.docid_cache = vec![0, 0, 1];
    block.val_cache = vec![1, 2, 3];
    block.dedup_docid_val_pairs();
    assert_eq!(block.docid_cache, vec![0, 0, 1]);
    assert_eq!(block.val_cache, vec![1, 2, 3]);
}
280+
281+
#[test]
fn test_dedup_docid_val_pairs_single_element() {
    // Exercises the `len <= 1` early return: nothing to dedup.
    let mut block = ColumnBlockAccessor::<u64>::default();
    block.docid_cache = vec![0];
    block.val_cache = vec![1];
    block.dedup_docid_val_pairs();
    assert_eq!(block.docid_cache, vec![0]);
    assert_eq!(block.val_cache, vec![1]);
}
166290
}

src/aggregation/bucket/term_agg.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -807,11 +807,13 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
807807

808808
let req_data = &mut self.terms_req_data;
809809

810-
agg_data.column_block_accessor.fetch_block_with_missing(
811-
docs,
812-
&req_data.accessor,
813-
req_data.missing_value_for_accessor,
814-
);
810+
agg_data
811+
.column_block_accessor
812+
.fetch_block_with_missing_unique_per_doc(
813+
docs,
814+
&req_data.accessor,
815+
req_data.missing_value_for_accessor,
816+
);
815817

816818
if let Some(sub_agg) = &mut self.sub_agg {
817819
let term_buckets = &mut self.parent_buckets[parent_bucket_id as usize];
@@ -2347,7 +2349,7 @@ mod tests {
23472349

23482350
// text field
23492351
assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello");
2350-
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 5);
2352+
assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4);
23512353
assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty");
23522354
assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2);
23532355
assert_eq!(
@@ -2356,7 +2358,7 @@ mod tests {
23562358
);
23572359
// text field with number as missing fallback
23582360
assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello");
2359-
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 5);
2361+
assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 4);
23602362
assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0);
23612363
assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2);
23622364
assert_eq!(
@@ -2370,7 +2372,7 @@ mod tests {
23702372
assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0);
23712373
assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4);
23722374
assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0);
2373-
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 3);
2375+
assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 2);
23742376
assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null);
23752377

23762378
Ok(())

0 commit comments

Comments
 (0)