Fixes terms error count for multiple reduce phases

Previously when multiple reduces occured for the terms aggregation we would add up the errors for the aggregations but would not take into account the errors that had already been calculated for the previous reduce phases.

This change corrects that by adding the previously created errors to the new error value.

Closes #23286
This commit is contained in:
Colin Goodheart-Smithe 2017-02-21 15:20:04 +00:00
parent 756e26cb33
commit 1ceaef0de6
No known key found for this signature in database
GPG Key ID: 876C3A617D8EA73A
2 changed files with 128 additions and 36 deletions

View File

@ -33,7 +33,6 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -226,7 +225,15 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
if (terms.getBucketsInternal().size() < getShardSize() || InternalOrder.isTermOrder(order)) {
thisAggDocCountError = 0;
} else if (InternalOrder.isCountDesc(this.order)) {
thisAggDocCountError = terms.getBucketsInternal().get(terms.getBucketsInternal().size() - 1).docCount;
if (terms.getDocCountError() > 0) {
// If there is an existing docCountError for this agg then
// use this as the error for this aggregation
thisAggDocCountError = terms.getDocCountError();
} else {
// otherwise use the doc count of the last term in the
// aggregation
thisAggDocCountError = terms.getBucketsInternal().get(terms.getBucketsInternal().size() - 1).docCount;
}
} else {
thisAggDocCountError = -1;
}

View File

@ -20,10 +20,7 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
@ -33,11 +30,12 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.client.RandomizingClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
@ -64,36 +62,6 @@ public class TermsDocCountErrorIT extends ESIntegTestCase {
private static int numRoutingValues;
public static Client client() {
Client client = ESIntegTestCase.client();
if (client instanceof RandomizingClient) {
return new FilterClient(client) {
/* this test doesn't work with multiple reduce phases since:
* the error for a term is the sum of the errors across all aggs that need to be reduced.
* if the term is in the aggregation, then we just use the associated error, but if it is not we need to use the
* aggregation-level error, ie. the maximum count that a doc that is not in the top list could have.
*
* the problem is that the logic we have today assumes there is a single reduce. So for instance for the agg-level error
* it takes the count of the last term. This is correct if the agg was produced on a shard: if it had a greater count
* then it would be in the top list. However if we are on an intermediate reduce, this does not work anymore.
*
* Another assumption that does not hold is that right now if a term is present in an agg, we assume its count is accurate.
* Again this is true if the agg was produced on a shard, but not if this is the result of an intermediate reduce.
*
* try with this seed and remove the setReduceUpTo below
* -Dtests.seed=B32081B1E8589AE5 -Dtests.class=org.elasticsearch.search.aggregations.bucket.TermsDocCountErrorIT
* -Dtests.method="testDoubleValueField" -Dtests.locale=lv -Dtests.timezone=WET
* This must will be addressed in a followup to #23253
*/
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
return this.in.prepareSearch(indices).setBatchedReduceSize(512);
}
};
}
return client;
}
@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(client().admin().indices().prepareCreate("idx")
@ -133,6 +101,68 @@ public class TermsDocCountErrorIT extends ESIntegTestCase {
.field(DOUBLE_FIELD_NAME, 1.0 * randomInt(numUniqueTerms))
.endObject()));
}
assertAcked(prepareCreate("idx_fixed_docs_0").addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard0DocsPerTerm = new HashMap<>();
shard0DocsPerTerm.put("A", 25);
shard0DocsPerTerm.put("B", 18);
shard0DocsPerTerm.put("C", 6);
shard0DocsPerTerm.put("D", 3);
shard0DocsPerTerm.put("E", 2);
shard0DocsPerTerm.put("F", 2);
shard0DocsPerTerm.put("G", 2);
shard0DocsPerTerm.put("H", 2);
shard0DocsPerTerm.put("I", 1);
shard0DocsPerTerm.put("J", 1);
for (Map.Entry<String, Integer> entry : shard0DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_0", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).endObject()));
}
}
assertAcked(prepareCreate("idx_fixed_docs_1").addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard1DocsPerTerm = new HashMap<>();
shard1DocsPerTerm.put("A", 30);
shard1DocsPerTerm.put("B", 25);
shard1DocsPerTerm.put("F", 17);
shard1DocsPerTerm.put("Z", 16);
shard1DocsPerTerm.put("G", 15);
shard1DocsPerTerm.put("H", 14);
shard1DocsPerTerm.put("I", 10);
shard1DocsPerTerm.put("Q", 6);
shard1DocsPerTerm.put("J", 8);
shard1DocsPerTerm.put("C", 4);
for (Map.Entry<String, Integer> entry : shard1DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_1", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 1).endObject()));
}
}
assertAcked(prepareCreate("idx_fixed_docs_2")
.addMapping("type", STRING_FIELD_NAME, "type=keyword")
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
Map<String, Integer> shard2DocsPerTerm = new HashMap<>();
shard2DocsPerTerm.put("A", 45);
shard2DocsPerTerm.put("C", 44);
shard2DocsPerTerm.put("Z", 36);
shard2DocsPerTerm.put("G", 30);
shard2DocsPerTerm.put("E", 29);
shard2DocsPerTerm.put("H", 28);
shard2DocsPerTerm.put("Q", 2);
shard2DocsPerTerm.put("D", 1);
for (Map.Entry<String, Integer> entry : shard2DocsPerTerm.entrySet()) {
for (int i = 0; i < entry.getValue(); i++) {
String term = entry.getKey();
builders.add(client().prepareIndex("idx_fixed_docs_2", "type", term + "-" + i)
.setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, term).field("shard", 2).endObject()));
}
}
indexRandom(true, builders);
ensureSearchable();
}
@ -938,5 +968,60 @@ public class TermsDocCountErrorIT extends ESIntegTestCase {
assertUnboundedDocCountError(size, accurateResponse, testResponse);
}
/**
* Test a case where we know exactly how many of each term is on each shard
* so we know the exact error value for each term. To do this we search over
* 3 one-shard indices.
*/
public void testFixedDocs() throws Exception {
SearchResponse response = client().prepareSearch("idx_fixed_docs_0", "idx_fixed_docs_1", "idx_fixed_docs_2").setTypes("type")
.addAggregation(terms("terms")
.executionHint(randomExecutionHint())
.field(STRING_FIELD_NAME)
.showTermDocCountError(true)
.size(5).shardSize(5)
.collectMode(randomFrom(SubAggCollectionMode.values())))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getDocCountError(), equalTo(46L));
List<Bucket> buckets = terms.getBuckets();
assertThat(buckets, notNullValue());
assertThat(buckets.size(), equalTo(5));
Bucket bucket = buckets.get(0);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("A"));
assertThat(bucket.getDocCount(), equalTo(100L));
assertThat(bucket.getDocCountError(), equalTo(0L));
bucket = buckets.get(1);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("Z"));
assertThat(bucket.getDocCount(), equalTo(52L));
assertThat(bucket.getDocCountError(), equalTo(2L));
bucket = buckets.get(2);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("C"));
assertThat(bucket.getDocCount(), equalTo(50L));
assertThat(bucket.getDocCountError(), equalTo(15L));
bucket = buckets.get(3);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("G"));
assertThat(bucket.getDocCount(), equalTo(45L));
assertThat(bucket.getDocCountError(), equalTo(2L));
bucket = buckets.get(4);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo("B"));
assertThat(bucket.getDocCount(), equalTo(43L));
assertThat(bucket.getDocCountError(), equalTo(29L));
}
}