Fixes the per-term error in the terms aggregation

When multiple reduce phases were needed, the per-term error could be lost in subsequent reduces in some situations:

When a previous reduce phase had calculated a non-zero error for a particular bucket, we were not accounting for this error in subsequent reduce phases; instead we relied on the overall error for the agg, implicitly assuming that all shards that made up that aggregation had returned the term. This is plainly not true, so we need to make sure the per-term error for the aggregation is used when calculating the error for that term in the new reduced aggregation.
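
In effect, each reduce now computes, for each term, the error carried over from earlier reduce phases plus the sum of the errors of the aggregations that did not return that term: the error of each agg that did return the term is subtracted from the bucket's running error, and the total error across all aggs is added back at the end. A minimal standalone sketch of that arithmetic (not Elasticsearch code; all names and numbers are illustrative):

    // Three partial aggs being reduced; only the first two returned the
    // term, and the bucket carries a per-term error of 2 computed in a
    // previous reduce phase.
    public class PerTermErrorSketch {
        public static void main(String[] args) {
            long[] aggDocCountErrors = {3, 5, 7}; // worst-case error of each agg
            boolean[] hasTerm = {true, true, false};
            long termError = 2;                   // carried over from the earlier reduce

            long sumDocCountError = 0;            // error summed over all aggs
            for (int i = 0; i < aggDocCountErrors.length; i++) {
                sumDocCountError += aggDocCountErrors[i];
                if (hasTerm[i]) {
                    // mirrors `bucket.docCountError -= thisAggDocCountError`:
                    // subtract the error of every agg that did return the term...
                    termError -= aggDocCountErrors[i];
                }
            }
            // ...then add the total back (`b.docCountError += sumDocCountError`),
            // leaving the carried-over error plus the errors of the aggs
            // that did not return the term.
            termError += sumDocCountError;
            System.out.println(termError); // 2 - 3 - 5 + 15 = 9
        }
    }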
Colin Goodheart-Smithe 2017-02-28 09:42:14 +00:00
parent 17a0b4e69c
commit 406d2f7a64
1 changed file with 16 additions and 7 deletions

@@ -120,6 +120,10 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
         public B reduce(List<B> buckets, ReduceContext context) {
             long docCount = 0;
+            // For the per term doc count error we add up the errors from the
+            // shards that did not respond with the term. To do this we add up
+            // the errors from the shards that did respond with the term and
+            // subtract that from the sum of the error from all shards
             long docCountError = 0;
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
             for (B bucket : buckets) {
@@ -246,7 +250,14 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
             }
             setDocCountError(thisAggDocCountError);
             for (B bucket : terms.getBucketsInternal()) {
-                bucket.docCountError = thisAggDocCountError;
+                // If there is already a doc count error for this bucket
+                // subtract this agg's doc count error from it to make the
+                // new value for the bucket. This then means that when the
+                // final error for the bucket is calculated below we account
+                // for the existing error calculated in a previous reduce.
+                // Note that if the error is unbounded (-1) this will be fixed
+                // later in this method.
+                bucket.docCountError -= thisAggDocCountError;
                 List<B> bucketList = buckets.get(bucket.getKey());
                 if (bucketList == null) {
                     bucketList = new ArrayList<>();
@@ -260,12 +271,10 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
         final BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator(null));
         for (List<B> sameTermBuckets : buckets.values()) {
             final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
-            if (b.docCountError != -1) {
-                if (sumDocCountError == -1) {
-                    b.docCountError = -1;
-                } else {
-                    b.docCountError = sumDocCountError - b.docCountError;
-                }
-            }
+            if (sumDocCountError == -1) {
+                b.docCountError = -1;
+            } else {
+                b.docCountError += sumDocCountError;
+            }
             if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) {
                 B removed = ordered.insertWithOverflow(b);
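
Worked through with the numbers from the sketch above: the old code reset each bucket's error to thisAggDocCountError and then computed sumDocCountError minus the combined result, giving 15 - (3 + 5) = 7 and silently dropping the error of 2 carried over from the earlier reduce; the subtract-then-add arithmetic reports 9, the carried-over error plus the error of the one agg that did not return the term.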