Expose `batched_reduce_size` via `_search` (#23288)

In #23253 we added an the ability to incrementally reduce search results.
This change exposes the parameter to control the batch since and therefore
the memory consumption of a large search request.
This commit is contained in:
Simon Willnauer 2017-02-21 18:36:59 +01:00 committed by GitHub
parent 1ba9770037
commit ce625ebdcc
15 changed files with 176 additions and 55 deletions

View File

@ -157,7 +157,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineTransportAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]MultiSearchRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportMultiSearchAction.java" checks="LineLength" />

View File

@ -53,6 +53,6 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false), "", 1, 1, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
}
}

View File

@ -465,7 +465,7 @@ public class SearchPhaseController extends AbstractComponent {
* @param queryResults a list of non-null query shard results
*/
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
return reducedQueryPhase(queryResults, null);
return reducedQueryPhase(queryResults, null, 0);
}
/**
@ -473,18 +473,22 @@ public class SearchPhaseController extends AbstractComponent {
* @param queryResults a list of non-null query shard results
* @param bufferdAggs a list of pre-collected / buffered aggregations. if this list is non-null all aggregations have been consumed
* from all non-null query results.
* @param numReducePhases the number of non-final reduce phases applied to the query results.
* @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
List<InternalAggregations> bufferdAggs) {
List<InternalAggregations> bufferdAggs, int numReducePhases) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
long totalHits = 0;
long fetchHits = 0;
float maxScore = Float.NEGATIVE_INFINITY;
boolean timedOut = false;
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
numReducePhases);
}
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
@ -493,6 +497,7 @@ public class SearchPhaseController extends AbstractComponent {
final List<InternalAggregations> aggregationsList;
if (bufferdAggs != null) {
consumeAggs = false;
assert numReducePhases > 1 : "num reduce phases must be > 1 but was: " + numReducePhases;
// we already have results from intermediate reduces and just need to perform the final reduce
assert firstResult.hasAggs() : "firstResult has no aggs but we got non null buffered aggs?";
aggregationsList = bufferdAggs;
@ -548,7 +553,7 @@ public class SearchPhaseController extends AbstractComponent {
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, firstResult, suggest, aggregations,
shardResults);
shardResults, numReducePhases);
}
@ -597,10 +602,15 @@ public class SearchPhaseController extends AbstractComponent {
final InternalAggregations aggregations;
// the reduced profile results
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly,
QuerySearchResult oneResult, Suggest suggest, InternalAggregations aggregations,
SearchProfileShardResults shardResults) {
SearchProfileShardResults shardResults, int numReducePhases) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
this.totalHits = totalHits;
this.fetchHits = fetchHits;
if (Float.isInfinite(maxScore)) {
@ -614,6 +624,7 @@ public class SearchPhaseController extends AbstractComponent {
this.suggest = suggest;
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
}
/**
@ -621,7 +632,7 @@ public class SearchPhaseController extends AbstractComponent {
* @see #merge(boolean, ScoreDoc[], ReducedQueryPhase, AtomicArray)
*/
public InternalSearchResponse buildResponse(SearchHits hits) {
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly);
return new InternalSearchResponse(hits, aggregations, suggest, shardResults, timedOut, terminatedEarly, numReducePhases);
}
/**
@ -643,6 +654,7 @@ public class SearchPhaseController extends AbstractComponent {
private final InternalAggregations[] buffer;
private int index;
private final SearchPhaseController controller;
private int numReducePhases = 0;
/**
* Creates a new {@link QueryPhaseResultConsumer}
@ -677,6 +689,7 @@ public class SearchPhaseController extends AbstractComponent {
if (index == buffer.length) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer));
Arrays.fill(buffer, null);
numReducePhases++;
buffer[0] = reducedAggs;
index = 1;
}
@ -690,7 +703,7 @@ public class SearchPhaseController extends AbstractComponent {
@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemaining());
return controller.reducedQueryPhase(results.asList(), getRemaining(), numReducePhases);
}
/**
@ -699,6 +712,8 @@ public class SearchPhaseController extends AbstractComponent {
int getNumBuffered() {
return index;
}
int getNumReducePhases() { return numReducePhases; }
}
/**
@ -707,9 +722,9 @@ public class SearchPhaseController extends AbstractComponent {
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
if (source != null && source.aggregations() != null) {
if (request.getReduceUpTo() < numShards) {
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo());
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize());
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {

View File

@ -71,7 +71,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Scroll scroll;
private int reduceUpTo = 512;
private int batchedReduceSize = 512;
private String[] types = Strings.EMPTY_ARRAY;
@ -281,19 +281,19 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public void setReduceUpTo(int reduceUpTo) {
if (reduceUpTo <= 1) {
throw new IllegalArgumentException("reduceUpTo must be >= 2");
public void setBatchedReduceSize(int batchedReduceSize) {
if (batchedReduceSize <= 1) {
throw new IllegalArgumentException("batchedReduceSize must be >= 2");
}
this.reduceUpTo = reduceUpTo;
this.batchedReduceSize = batchedReduceSize;
}
/**
* Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a
* protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public int getReduceUpTo() {
return reduceUpTo;
public int getBatchedReduceSize() {
return batchedReduceSize;
}
/**
@ -343,7 +343,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
reduceUpTo = in.readVInt();
batchedReduceSize = in.readVInt();
}
}
@ -363,7 +363,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeVInt(reduceUpTo);
out.writeVInt(batchedReduceSize);
}
}

View File

@ -524,8 +524,12 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return request.source();
}
public SearchRequestBuilder setReduceUpTo(int reduceUpTo) {
this.request.setReduceUpTo(reduceUpTo);
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
*/
public SearchRequestBuilder setBatchedReduceSize(int batchedReduceSize) {
this.request.setBatchedReduceSize(batchedReduceSize);
return this;
}
}

View File

@ -61,7 +61,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
public SearchResponse() {
}
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
public SearchResponse(InternalSearchResponse internalResponse, String scrollId, int totalShards, int successfulShards,
long tookInMillis, ShardSearchFailure[] shardFailures) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.totalShards = totalShards;
@ -106,6 +107,13 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return internalResponse.terminatedEarly();
}
/**
* Returns the number of reduce phases applied to obtain this search response
*/
public int getNumReducePhases() {
return internalResponse.getNumReducePhases();
}
/**
* How long the search took.
*/
@ -172,13 +180,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return internalResponse.profile();
}
static final class Fields {
static final String _SCROLL_ID = "_scroll_id";
static final String TOOK = "took";
static final String TIMED_OUT = "timed_out";
static final String TERMINATED_EARLY = "terminated_early";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -189,14 +190,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (scrollId != null) {
builder.field(Fields._SCROLL_ID, scrollId);
builder.field("_scroll_id", scrollId);
}
builder.field(Fields.TOOK, tookInMillis);
builder.field(Fields.TIMED_OUT, isTimedOut());
builder.field("took", tookInMillis);
builder.field("timed_out", isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(Fields.TERMINATED_EARLY, isTerminatedEarly());
builder.field("terminated_early", isTerminatedEarly());
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(), getShardFailures());
if (getNumReducePhases() != 1) {
builder.field("num_reduce_phases", getNumReducePhases());
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(),
getShardFailures());
internalResponse.toXContent(builder, params);
return builder;
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.rest.action.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
@ -94,6 +93,9 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.source().parseXContent(context);
}
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.internal;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -38,7 +39,7 @@ import java.util.Map;
public class InternalSearchResponse implements Streamable, ToXContent {
public static InternalSearchResponse empty() {
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null);
return new InternalSearchResponse(SearchHits.empty(), null, null, null, false, null, 1);
}
private SearchHits hits;
@ -53,17 +54,21 @@ public class InternalSearchResponse implements Streamable, ToXContent {
private Boolean terminatedEarly = null;
private int numReducePhases = 1;
private InternalSearchResponse() {
}
public InternalSearchResponse(SearchHits hits, InternalAggregations aggregations, Suggest suggest,
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly) {
SearchProfileShardResults profileResults, boolean timedOut, Boolean terminatedEarly,
int numReducePhases) {
this.hits = hits;
this.aggregations = aggregations;
this.suggest = suggest;
this.profileResults = profileResults;
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
}
public boolean timedOut() {
@ -86,6 +91,13 @@ public class InternalSearchResponse implements Streamable, ToXContent {
return suggest;
}
/**
* Returns the number of reduce phases applied to obtain this search response
*/
public int getNumReducePhases() {
return numReducePhases;
}
/**
* Returns the profile results for this search response (including all shards).
* An empty map is returned if profiling was not enabled
@ -132,6 +144,11 @@ public class InternalSearchResponse implements Streamable, ToXContent {
timedOut = in.readBoolean();
terminatedEarly = in.readOptionalBoolean();
profileResults = in.readOptionalWriteable(SearchProfileShardResults::new);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
numReducePhases = in.readVInt();
} else {
numReducePhases = 1;
}
}
@Override
@ -152,5 +169,8 @@ public class InternalSearchResponse implements Streamable, ToXContent {
out.writeBoolean(timedOut);
out.writeOptionalBoolean(terminatedEarly);
out.writeOptionalWriteable(profileResults);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeVInt(numReducePhases);
}
}
}

View File

@ -446,7 +446,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Now we can simulate a response and check the delay that we used for the task
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
if (randomBoolean()) {

View File

@ -79,7 +79,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null);
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(response, null)
@ -91,7 +91,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
SearchHits hits = new SearchHits(new SearchHit[]{new SearchHit(1, "ID", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))},
1, 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@ -132,7 +132,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
assertTrue(executedMultiSearch.compareAndSet(false, true));
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(collapsedHits,
null, null, null, false, null);
null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
listener.onResponse(new MultiSearchResponse(new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new RuntimeException("boom")),
@ -146,7 +146,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
@ -180,7 +180,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null)))),
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->

View File

@ -29,10 +29,8 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.SearchHit;
@ -245,7 +243,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setReduceUpTo(bufferSize);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
@ -267,15 +265,18 @@ public class SearchPhaseControllerTests extends ESTestCase {
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
consumer.consumeResult(1, result);
int numTotalReducePhases = 1;
if (bufferSize == 2) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
numTotalReducePhases++;
} else {
assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertEquals(numTotalReducePhases, reduce.numReducePhases);
InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(3.0D, max.getValue(), 0.0D);
}
@ -286,7 +287,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setReduceUpTo(bufferSize);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
@ -322,7 +323,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
if ((hasAggs = randomBoolean())) {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
}
request.setReduceUpTo(bufferSize);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
if (hasAggs && expectedNumResults > bufferSize) {

View File

@ -38,7 +38,6 @@ import org.elasticsearch.test.client.RandomizingClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
@ -88,7 +87,7 @@ public class TermsDocCountErrorIT extends ESIntegTestCase {
*/
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
return this.in.prepareSearch(indices).setReduceUpTo(512);
return this.in.prepareSearch(indices).setBatchedReduceSize(512);
}
};
}

View File

@ -158,6 +158,11 @@
"request_cache": {
"type" : "boolean",
"description" : "Specify if request cache should be used for this request or not, defaults to index level setting"
},
"batched_reduce_size" : {
"type" : "number",
"description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
"default" : 512
}
}
},

View File

@ -0,0 +1,70 @@
setup:
- do:
indices.create:
index: test_1
body:
settings:
number_of_shards: 5
number_of_replicas: 0
mappings:
test:
properties:
str:
type: keyword
---
"batched_reduce_size lower limit":
- skip:
version: " - 5.99.99"
reason: this was added in 6.0.0
- do:
catch: /batchedReduceSize must be >= 2/
search:
index: test_1
batched_reduce_size: 1
---
"batched_reduce_size 2 with 5 shards":
- skip:
version: " - 5.99.99"
reason: this was added in 6.0.0
- do:
index:
index: test_1
type: test
id: 1
body: { "str" : "abc" }
- do:
index:
index: test_1
type: test
id: 2
body: { "str": "abc" }
- do:
index:
index: test_1
type: test
id: 3
body: { "str": "bcd" }
- do:
indices.refresh: {}
- do:
search:
batched_reduce_size: 2
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
- match: { num_reduce_phases: 4 }
- match: { hits.total: 3 }
- length: { aggregations.str_terms.buckets: 2 }
- match: { aggregations.str_terms.buckets.0.key: "abc" }
- is_false: aggregations.str_terms.buckets.0.key_as_string
- match: { aggregations.str_terms.buckets.0.doc_count: 2 }
- match: { aggregations.str_terms.buckets.1.key: "bcd" }
- is_false: aggregations.str_terms.buckets.1.key_as_string
- match: { aggregations.str_terms.buckets.1.doc_count: 1 }

View File

@ -36,7 +36,7 @@ public class RandomizingClient extends FilterClient {
private final SearchType defaultSearchType;
private final String defaultPreference;
private final int reduceUpTo;
private final int batchedReduceSize;
public RandomizingClient(Client client, Random random) {
@ -54,13 +54,14 @@ public class RandomizingClient extends FilterClient {
} else {
defaultPreference = null;
}
this.reduceUpTo = 2 + random.nextInt(10);
this.batchedReduceSize = 2 + random.nextInt(10);
}
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference).setReduceUpTo(reduceUpTo);
return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference)
.setBatchedReduceSize(batchedReduceSize);
}
@Override