Skip final reduction if SearchRequest holds a cluster alias (#37000)

With #36997 we added the ability to provide a cluster alias with a
SearchRequest.

The next step is to disable the final reduction whenever a cluster alias
is provided with the SearchRequest. A cluster alias will be provided
when executing a cross-cluster search request with alternate execution
mode, where each cluster does its own reduction locally. In order for
the CCS node to be able to later perform an additional reduction of the
results, we need to make sure that all the needed info stays available.
This means that terms aggregations can be reduced but not pruned, and
pipeline aggs should not be executed. The final reduction will happen
later in the CCS coordinating node.

Relates to #36997 & #32125
This commit is contained in:
Luca Cavanna 2018-12-28 14:58:20 +01:00 committed by GitHub
parent 34d22f378d
commit cb6bac3f88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 28 deletions

View File

@ -407,17 +407,18 @@ public final class SearchPhaseController {
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
return reducedQueryPhase(queryResults, true, true);
ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
return reducedQueryPhase(queryResults, true, true, true);
}
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
boolean isScrollRequest, boolean trackTotalHits) {
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
boolean isScrollRequest, boolean trackTotalHits, boolean performFinalReduce) {
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest,
performFinalReduce);
}
/**
@ -433,7 +434,8 @@ public final class SearchPhaseController {
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
boolean performFinalReduce) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase
boolean timedOut = false;
@ -499,7 +501,7 @@ public final class SearchPhaseController {
}
}
final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
ReduceContext reduceContext = reduceContextFunction.apply(true);
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
@ -507,7 +509,7 @@ public final class SearchPhaseController {
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, firstResult == null);
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}
/**
@ -617,6 +619,7 @@ public final class SearchPhaseController {
private final SearchPhaseController controller;
private int numReducePhases = 0;
private final TopDocsStats topDocsStats = new TopDocsStats();
private final boolean performFinalReduce;
/**
* Creates a new {@link QueryPhaseResultConsumer}
@ -626,7 +629,7 @@ public final class SearchPhaseController {
* the buffer is used to incrementally reduce aggregation results before all shards responded.
*/
private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
boolean hasTopDocs, boolean hasAggs) {
boolean hasTopDocs, boolean hasAggs, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@ -644,6 +647,7 @@ public final class SearchPhaseController {
this.hasTopDocs = hasTopDocs;
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
this.performFinalReduce = performFinalReduce;
}
@Override
@ -693,7 +697,7 @@ public final class SearchPhaseController {
@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
numReducePhases, false);
numReducePhases, false, performFinalReduce);
}
/**
@ -715,18 +719,19 @@ public final class SearchPhaseController {
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final boolean trackTotalHits = source == null || source.trackTotalHits();
final boolean finalReduce = request.getLocalClusterAlias() == null;
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
}
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
}
};
}

View File

@ -56,28 +56,34 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
public class SearchPhaseControllerTests extends ESTestCase {
private SearchPhaseController searchPhaseController;
private List<Boolean> reductions;
@Before
public void setup() {
reductions = new CopyOnWriteArrayList<>();
searchPhaseController = new SearchPhaseController(
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
(finalReduce) -> {
reductions.add(finalReduce);
return new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, finalReduce);
});
}
public void testSort() {
@ -158,7 +164,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
for (boolean trackTotalHits : new boolean[] {true, false}) {
SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits, true);
AtomicArray<SearchPhaseResult> fetchResults = generateFetchResults(nShards,
reducedQueryPhase.sortedTopDocs.scoreDocs, reducedQueryPhase.suggest);
InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
@ -308,14 +314,15 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testConsumer() {
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
assertEquals(0, reductions.size());
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0, null));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(0);
@ -324,7 +331,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(2);
@ -333,23 +340,29 @@ public class SearchPhaseControllerTests extends ESTestCase {
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(1);
consumer.consumeResult(result);
int numTotalReducePhases = 1;
final int numTotalReducePhases;
if (bufferSize == 2) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
numTotalReducePhases++;
assertEquals(1, reductions.size());
assertEquals(false, reductions.get(0));
numTotalReducePhases = 2;
} else {
assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
assertEquals(0, reductions.size());
numTotalReducePhases = 1;
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertEquals(numTotalReducePhases, reduce.numReducePhases);
assertEquals(numTotalReducePhases, reductions.size());
assertFinalReduction(request);
InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(3.0D, max.getValue(), 0.0D);
assertFalse(reduce.sortedTopDocs.isSortedByField);
@ -362,7 +375,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@ -378,7 +391,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
result.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number),
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(id);
@ -392,6 +405,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
threads[i].join();
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertFinalReduction(request);
InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(max.get(), internalMax.getValue(), 0.0D);
assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
@ -407,7 +421,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testConsumerOnlyAggs() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@ -419,7 +433,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i, null));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number),
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(i);
@ -427,6 +441,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
consumer.consumeResult(result);
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertFinalReduction(request);
InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
assertEquals(max.get(), internalMax.getValue(), 0.0D);
assertEquals(0, reduce.sortedTopDocs.scoreDocs.length);
@ -441,7 +456,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testConsumerOnlyHits() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
if (randomBoolean()) {
request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
}
@ -460,6 +475,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
consumer.consumeResult(result);
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertFinalReduction(request);
assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
assertEquals(max.get(), reduce.maxScore, 0.0f);
assertEquals(expectedNumResults, reduce.totalHits.value);
@ -470,6 +486,12 @@ public class SearchPhaseControllerTests extends ESTestCase {
assertNull(reduce.sortedTopDocs.collapseValues);
}
private void assertFinalReduction(SearchRequest searchRequest) {
assertThat(reductions.size(), greaterThanOrEqualTo(1));
//the last reduction step was the final one only if no cluster alias was provided with the search request
assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
}
public void testNewSearchPhaseResults() {
for (int i = 0; i < 10; i++) {
int expectedNumResults = randomIntBetween(1, 10);
@ -540,7 +562,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testConsumerSortByField() {
int expectedNumResults = randomIntBetween(1, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
int size = randomIntBetween(1, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@ -560,6 +582,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
consumer.consumeResult(result);
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertFinalReduction(request);
assertEquals(Math.min(expectedNumResults, size), reduce.sortedTopDocs.scoreDocs.length);
assertEquals(expectedNumResults, reduce.totalHits.value);
assertEquals(max.get(), ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);
@ -574,7 +597,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testConsumerFieldCollapsing() {
int expectedNumResults = randomIntBetween(30, 100);
int bufferSize = randomIntBetween(2, 200);
SearchRequest request = new SearchRequest();
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
int size = randomIntBetween(5, 10);
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@ -596,6 +619,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
consumer.consumeResult(result);
}
SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
assertFinalReduction(request);
assertEquals(3, reduce.sortedTopDocs.scoreDocs.length);
assertEquals(expectedNumResults, reduce.totalHits.value);
assertEquals(a, ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);