Enforce max_buckets limit only in the final reduction phase (#36152)

Given that we check the max buckets limit on each shard when collecting the buckets, and that non final reduction cannot add buckets (see #35921), there is no point in counting and checking the number of buckets as part of non final reduction phases.

Such check is still needed though in the final reduction phases to make sure that the number of returned buckets is not above the allowed threshold.

Relates somehow to #32125 as we will make use of non final reduction phases in CCS alternate execution mode and that increases the chance that this check trips for nothing when reducing aggs in each remote cluster.
This commit is contained in:
Luca Cavanna 2018-12-03 13:55:18 +01:00 committed by GitHub
parent 36ddca7d0c
commit b5cae0af58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 11 deletions

View File

@ -1090,7 +1090,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
return new InternalAggregation.ReduceContext(bigArrays, scriptService,
finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce);
}
public static final class CanMatchResponse extends SearchPhaseResult {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
@ -59,6 +58,8 @@ import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
@ -155,7 +156,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
return Settings.builder().put("search.default_search_timeout", "5s").build();
}
public void testClearOnClose() throws ExecutionException, InterruptedException {
public void testClearOnClose() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@ -167,7 +168,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertEquals(0, service.getActiveContexts());
}
public void testClearOnStop() throws ExecutionException, InterruptedException {
public void testClearOnStop() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@ -179,7 +180,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertEquals(0, service.getActiveContexts());
}
public void testClearIndexDelete() throws ExecutionException, InterruptedException {
public void testClearIndexDelete() {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();
@ -208,7 +209,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertEquals(activeRefs, indexShard.store().refCount());
}
public void testSearchWhileIndexDeleted() throws IOException, InterruptedException {
public void testSearchWhileIndexDeleted() throws InterruptedException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
@ -443,15 +444,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
protected void doWriteTo(StreamOutput out) {
}
@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
protected void doXContent(XContentBuilder builder, Params params) {
}
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
protected Query doToQuery(QueryShardContext context) {
return null;
}
@ -501,7 +502,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
}
public void testCanRewriteToMatchNone() {
@ -519,7 +519,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
.suggest(new SuggestBuilder())));
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar"))
.suggest(new SuggestBuilder())));
}
public void testSetSearchThrottled() {
@ -568,4 +567,17 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertHitCount(client().prepareSearch().get(), 0L);
assertHitCount(client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED).get(), 1L);
}
public void testCreateReduceContext() {
final SearchService service = getInstanceFromNode(SearchService.class);
{
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(true);
expectThrows(MultiBucketConsumerService.TooManyBucketsException.class,
() -> reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1));
}
{
InternalAggregation.ReduceContext reduceContext = service.createReduceContext(false);
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
}
}
}