diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 54f148894da..309fd865a22 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -157,7 +157,6 @@
-
diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
index 1e09e890a0b..77e7cdab937 100644
--- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
+++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java
@@ -53,6 +53,6 @@ public class TransportNoopSearchAction extends HandledTransportAction> queryResults) {
- return reducedQueryPhase(queryResults, null);
+ return reducedQueryPhase(queryResults, null, 0);
}
/**
@@ -473,18 +473,22 @@ public class SearchPhaseController extends AbstractComponent {
* @param queryResults a list of non-null query shard results
* @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
* from all non-null query results.
+ * @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(List extends AtomicArray.Entry extends QuerySearchResultProvider>> queryResults,
- List bufferdAggs) {
+ List bufferdAggs, int numReducePhases) {
+ assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
+ numReducePhases++; // increment for this phase
long totalHits = 0;
long fetchHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
- return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
+ return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
+ numReducePhases);
}
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
@@ -493,6 +497,7 @@ public class SearchPhaseController extends AbstractComponent {
final List aggregationsList;
if (bufferdAggs != null) {
consumeAggs = false;
+ assert numReducePhases > 1 : "num reduce phases must be > 1 but was: " + numReducePhases;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
aggregationsList = bufferdAggs;
@@ -548,7 +553,7 @@ public class SearchPhaseController extends AbstractComponent {
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
- shardResults);
+ shardResults, numReducePhases);
}
@@ -597,10 +602,15 @@ public class SearchPhaseController extends AbstractComponent {
final InternalAggregations aggregations;
// the reduced profile results
final SearchProfileShardResults shardResults;
+ // the number of reduce phases
+ final int numReducePhases;
ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
- SearchProfileShardResults shardResults) {
+ SearchProfileShardResults shardResults, int numReducePhases) {
+ if (numReducePhases <= 0) {
+ throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
+ }
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
@@ -614,6 +624,7 @@ public class SearchPhaseController extends AbstractComponent {
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
+ this.numReducePhases = numReducePhases;
}
/**
@@ -621,7 +632,7 @@ public class SearchPhaseController extends AbstractComponent {
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
- return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
+ return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
/**
@@ -643,6 +654,7 @@ public class SearchPhaseController extends AbstractComponent {
private final InternalAggregations[] buffer;
private int index;
private final SearchPhaseController controller;
+ private int numReducePhases = 0;
/**
* Creates a new {@link QueryPhaseResultConsumer}
@@ -677,6 +689,7 @@ public class SearchPhaseController extends AbstractComponent {
if (index == buffer.length) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
Arrays.fill(buffer, null);
+ numReducePhases++;
buffer[0] = reducedAggs;
index = 1;
}
@@ -690,7 +703,7 @@ public class SearchPhaseController extends AbstractComponent {
@Override
public ReducedQueryPhase reduce() {
- return controller.reducedQueryPhase(results.asList(), getRemaining());
+ return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
}
/**
@@ -699,6 +712,8 @@ public class SearchPhaseController extends AbstractComponent {
int getNumBuffered() {
return index;
}
+
+ int getNumReducePhases() { return numReducePhases; }
}
/**
@@ -707,9 +722,9 @@ public class SearchPhaseController extends AbstractComponent {
InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
if (source != null && source.aggregations() != null) {
- if (request.getReduceUpTo() < numShards) {
+ if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
- return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo());
+ return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
index 0c1189d1d69..6e2701ad417 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
@@ -71,7 +71,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Scroll scroll;
- private int reduceUpTo = 512;
+ private int batchedReduceSize = 512;
private String[] types = Strings.EMPTY_ARRAY;
@@ -281,19 +281,19 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
- public void setReduceUpTo(int reduceUpTo) {
- if (reduceUpTo <= 1) {
- throw new IllegalArgumentException("reduceUpTo must be >= 2");
+ public void setBatchedReduceSize(int batchedReduceSize) {
+ if (batchedReduceSize <= 1) {
+ throw new IllegalArgumentException("batchedReduceSize must be >= 2");
}
- this.reduceUpTo = reduceUpTo;
+ this.batchedReduceSize = batchedReduceSize;
}
/**
* Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a
* protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
- public int getReduceUpTo() {
- return reduceUpTo;
+ public int getBatchedReduceSize() {
+ return batchedReduceSize;
}
/**
@@ -343,7 +343,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
- reduceUpTo = in.readVInt();
+ batchedReduceSize = in.readVInt();
}
}
@@ -363,7 +363,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
- out.writeVInt(reduceUpTo);
+ out.writeVInt(batchedReduceSize);
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
index c6c5e0fbf3d..ffe2c1b20c5 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
@@ -524,8 +524,12 @@ public class SearchRequestBuilder extends ActionRequestBuilder reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@@ -132,7 +132,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) {
assertTrue(executedMultiSearch.compareAndSet(false, true));
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
- null, null, null, false, null);
+ null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
@@ -146,7 +146,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
1.0F);
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@@ -180,7 +180,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null)))),
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 21d5b6aee90..5270fd59ce9 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -29,10 +29,8 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
-import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.SearchHit;
@@ -245,7 +243,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3);
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
@@ -267,15 +265,18 @@ public class SearchPhaseControllerTests extends ESTestCase {
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
consumer.consumeResult(1, result);
-
+ int numTotalReducePhases = 1;
if (bufferSize == 2) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
+ assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
+ numTotalReducePhases++;
} else {
assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+ assertEquals(numTotalReducePhases, reduce.numReducePhases);
InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(3.0D, max.getValue(), 0.0D);
}
@@ -286,7 +287,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
@@ -322,7 +323,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
- request.setReduceUpTo(bufferSize);
+ request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
if (hasAggs && expectedNumResults > bufferSize) {
diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
index a7173dc4c22..3f2163f25d9 100644
--- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
+++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java
@@ -38,7 +38,6 @@ import org.elasticsearch.test.client.RandomizingClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
@@ -88,7 +87,7 @@ public class TermsDocCountErrorIT extends ESIntegTestCase {
*/
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
- return this.in.prepareSearch(indices).setReduceUpTo(512);
+ return this.in.prepareSearch(indices).setBatchedReduceSize(512);
}
};
}
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
index 328383e1639..dc5fda57439 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
@@ -158,6 +158,11 @@
"request_cache": {
"type" : "boolean",
"description" : "Specify if request cache should be used for this request or not, defaults to index level setting"
+ },
+ "batched_reduce_size" : {
+ "type" : "number",
+ "description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
+ "default" : 512
}
}
},
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml
new file mode 100644
index 00000000000..4c2054c2964
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/120_batch_reduce_size.yaml
@@ -0,0 +1,70 @@
+setup:
+ - do:
+ indices.create:
+ index: test_1
+ body:
+ settings:
+ number_of_shards: 5
+ number_of_replicas: 0
+ mappings:
+ test:
+ properties:
+ str:
+ type: keyword
+
+---
+"batched_reduce_size lower limit":
+ - skip:
+ version: " - 5.99.99"
+ reason: this was added in 6.0.0
+ - do:
+ catch: /batchedReduceSize must be >= 2/
+ search:
+ index: test_1
+ batched_reduce_size: 1
+
+
+---
+"batched_reduce_size 2 with 5 shards":
+ - skip:
+ version: " - 5.99.99"
+ reason: this was added in 6.0.0
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 1
+ body: { "str" : "abc" }
+
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 2
+ body: { "str": "abc" }
+
+ - do:
+ index:
+ index: test_1
+ type: test
+ id: 3
+ body: { "str": "bcd" }
+ - do:
+ indices.refresh: {}
+
+ - do:
+ search:
+ batched_reduce_size: 2
+ body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
+
+ - match: { num_reduce_phases: 4 }
+ - match: { hits.total: 3 }
+ - length: { aggregations.str_terms.buckets: 2 }
+ - match: { aggregations.str_terms.buckets.0.key: "abc" }
+ - is_false: aggregations.str_terms.buckets.0.key_as_string
+ - match: { aggregations.str_terms.buckets.0.doc_count: 2 }
+ - match: { aggregations.str_terms.buckets.1.key: "bcd" }
+ - is_false: aggregations.str_terms.buckets.1.key_as_string
+ - match: { aggregations.str_terms.buckets.1.doc_count: 1 }
+
+
diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
index 77fb115c5f7..a7d9a72e6b7 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
@@ -36,7 +36,7 @@ public class RandomizingClient extends FilterClient {
private final SearchType defaultSearchType;
private final String defaultPreference;
- private final int reduceUpTo;
+ private final int batchedReduceSize;
public RandomizingClient(Client client, Random random) {
@@ -54,13 +54,14 @@ public class RandomizingClient extends FilterClient {
} else {
defaultPreference = null;
}
- this.reduceUpTo = 2 + random.nextInt(10);
+ this.batchedReduceSize = 2 + random.nextInt(10);
}
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
- return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference).setReduceUpTo(reduceUpTo);
+ return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference)
+ .setBatchedReduceSize(batchedReduceSize);
}
@Override