diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index 82f7760c1ab..66b0317146f 100644
--- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -407,17 +407,18 @@ public final class SearchPhaseController {
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
      */
-    public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
-        return reducedQueryPhase(queryResults, true, true);
+    ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
+        return reducedQueryPhase(queryResults, true, true, true);
     }
 
     /**
      * Reduces the given query results and consumes all aggregations and profile results.
      * @param queryResults a list of non-null query shard results
      */
-    public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
-                                               boolean isScrollRequest, boolean trackTotalHits) {
-        return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
+    ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
+                                        boolean isScrollRequest, boolean trackTotalHits, boolean performFinalReduce) {
+        return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest,
+            performFinalReduce);
     }
 
     /**
@@ -433,7 +434,8 @@ public final class SearchPhaseController {
      */
     private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
                                                 List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
-                                                TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest) {
+                                                TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
+                                                boolean performFinalReduce) {
         assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
         numReducePhases++; // increment for this phase
         boolean timedOut = false;
@@ -499,7 +501,7 @@ public final class SearchPhaseController {
             }
         }
         final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
-        ReduceContext reduceContext = reduceContextFunction.apply(true);
+        ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
         final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             firstResult.pipelineAggregators(), reduceContext);
         final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
@@ -507,7 +509,7 @@ public final class SearchPhaseController {
         final TotalHits totalHits = topDocsStats.getTotalHits();
         return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
             timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
-            firstResult.sortValueFormats(), numReducePhases, size, from, firstResult == null);
+            firstResult.sortValueFormats(), numReducePhases, size, from, false);
     }
 
     /**
@@ -617,6 +619,7 @@ public final class SearchPhaseController {
         private final SearchPhaseController controller;
         private int numReducePhases = 0;
         private final TopDocsStats topDocsStats = new TopDocsStats();
+        private final boolean performFinalReduce;
 
         /**
          * Creates a new {@link QueryPhaseResultConsumer}
@@ -626,7 +629,7 @@ public final class SearchPhaseController {
          * the buffer is used to incrementally reduce aggregation results before all shards responded.
          */
         private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
-                                         boolean hasTopDocs, boolean hasAggs) {
+                                         boolean hasTopDocs, boolean hasAggs, boolean performFinalReduce) {
             super(expectedResultSize);
             if (expectedResultSize != 1 && bufferSize < 2) {
                 throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -644,6 +647,7 @@ public final class SearchPhaseController {
             this.hasTopDocs = hasTopDocs;
             this.hasAggs = hasAggs;
             this.bufferSize = bufferSize;
+            this.performFinalReduce = performFinalReduce;
         }
 
         @Override
@@ -693,7 +697,7 @@ public final class SearchPhaseController {
         @Override
         public ReducedQueryPhase reduce() {
             return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
-                numReducePhases, false);
+                numReducePhases, false, performFinalReduce);
         }
 
         /**
@@ -715,18 +719,19 @@ public final class SearchPhaseController {
         final boolean hasAggs = source != null && source.aggregations() != null;
         final boolean hasTopDocs = source == null || source.size() != 0;
         final boolean trackTotalHits = source == null || source.trackTotalHits();
+        final boolean finalReduce = request.getLocalClusterAlias() == null;
 
         if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
             // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
             if (request.getBatchedReduceSize() < numShards) {
                 // only use this if there are aggs and if there are more shards than we should reduce at once
-                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
+                return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
             }
         }
         return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
             @Override
             ReducedQueryPhase reduce() {
-                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
+                return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
             }
         };
     }
diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
index 585108fef8a..3859e3b7f38 100644
--- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java
@@ -56,28 +56,34 @@ import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 
 public class SearchPhaseControllerTests extends ESTestCase {
     private SearchPhaseController searchPhaseController;
+    private List<Boolean> reductions;
 
     @Before
     public void setup() {
+        reductions = new CopyOnWriteArrayList<>();
         searchPhaseController = new SearchPhaseController(
-            (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
+            (finalReduce) -> {
+                reductions.add(finalReduce);
+                return new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, finalReduce);
+            });
     }
 
     public void testSort() {
@@ -158,7 +164,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
         AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
         for (boolean trackTotalHits : new boolean[] {true, false}) {
             SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
-                searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
+                searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits, true);
             AtomicArray<SearchPhaseResult> fetchResults = generateFetchResults(nShards,
                 reducedQueryPhase.sortedTopDocs.scoreDocs, reducedQueryPhase.suggest);
             InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
@@ -308,14 +314,15 @@ public class SearchPhaseControllerTests extends ESTestCase {
 
     public void testConsumer() {
         int bufferSize = randomIntBetween(2, 3);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
+        assertEquals(0, reductions.size());
         QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
+        InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(0);
@@ -324,7 +331,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
         result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
+        aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(2);
@@ -333,23 +340,29 @@ public class SearchPhaseControllerTests extends ESTestCase {
         result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0, null));
         result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
             new DocValueFormat[0]);
-        aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
+        aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
             Collections.emptyList(), Collections.emptyMap())));
         result.aggregations(aggs);
         result.setShardIndex(1);
         consumer.consumeResult(result);
 
-        int numTotalReducePhases = 1;
+        final int numTotalReducePhases;
         if (bufferSize == 2) {
             assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
             assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
             assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
-            numTotalReducePhases++;
+            assertEquals(1, reductions.size());
+            assertEquals(false, reductions.get(0));
+            numTotalReducePhases = 2;
         } else {
             assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
+            assertEquals(0, reductions.size());
+            numTotalReducePhases = 1;
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
         assertEquals(numTotalReducePhases, reduce.numReducePhases);
+        assertEquals(numTotalReducePhases, reductions.size());
+        assertFinalReduction(request);
         InternalMax max = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(3.0D, max.getValue(), 0.0D);
         assertFalse(reduce.sortedTopDocs.isSortedByField);
@@ -362,7 +375,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
 
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -378,7 +391,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
                 result.topDocs(new TopDocsAndMaxScore(
                     new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number),
                     new DocValueFormat[0]);
-                InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+                InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
                     DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
                 result.aggregations(aggs);
                 result.setShardIndex(id);
@@ -392,6 +405,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             threads[i].join();
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(max.get(), internalMax.getValue(), 0.0D);
         assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
@@ -407,7 +421,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerOnlyAggs() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -419,7 +433,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new Index("a", "b"), i, null));
             result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number),
                 new DocValueFormat[0]);
-            InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
+            InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
                 DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
             result.aggregations(aggs);
             result.setShardIndex(i);
@@ -427,6 +441,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0);
         assertEquals(max.get(), internalMax.getValue(), 0.0D);
         assertEquals(0, reduce.sortedTopDocs.scoreDocs.length);
@@ -441,7 +456,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerOnlyHits() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         if (randomBoolean()) {
             request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
         }
@@ -460,6 +475,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(1, reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(max.get(), reduce.maxScore, 0.0f);
         assertEquals(expectedNumResults, reduce.totalHits.value);
@@ -470,6 +486,12 @@ public class SearchPhaseControllerTests extends ESTestCase {
         assertNull(reduce.sortedTopDocs.collapseValues);
     }
 
+    private void assertFinalReduction(SearchRequest searchRequest) {
+        assertThat(reductions.size(), greaterThanOrEqualTo(1));
+        //the last reduction step was the final one only if no cluster alias was provided with the search request
+        assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
+    }
+
     public void testNewSearchPhaseResults() {
         for (int i = 0; i < 10; i++) {
             int expectedNumResults = randomIntBetween(1, 10);
@@ -540,7 +562,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerSortByField() {
         int expectedNumResults = randomIntBetween(1, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
        int size = randomIntBetween(1, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -560,6 +582,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(Math.min(expectedNumResults, size), reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(expectedNumResults, reduce.totalHits.value);
         assertEquals(max.get(), ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);
@@ -574,7 +597,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
     public void testConsumerFieldCollapsing() {
         int expectedNumResults = randomIntBetween(30, 100);
         int bufferSize = randomIntBetween(2, 200);
-        SearchRequest request = new SearchRequest();
+        SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
         int size = randomIntBetween(5, 10);
         request.setBatchedReduceSize(bufferSize);
         InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -596,6 +619,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
             consumer.consumeResult(result);
         }
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
+        assertFinalReduction(request);
         assertEquals(3, reduce.sortedTopDocs.scoreDocs.length);
         assertEquals(expectedNumResults, reduce.totalHits.value);
         assertEquals(a, ((FieldDoc)reduce.sortedTopDocs.scoreDocs[0]).fields[0]);
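Note on the change above: the reduce context now receives performFinalReduce, which is true only when the SearchRequest carries no local cluster alias, so a request executed on behalf of a remote cluster leaves its reduction partial and the coordinating node performs the final step. The following self-contained sketch is illustrative only and is not Elasticsearch code; ReduceContext and MiniSearchRequest are hypothetical stand-ins for InternalAggregation.ReduceContext and SearchRequest. It shows the pattern the diff threads through SearchPhaseController: derive the flag from getLocalClusterAlias() and hand it to a reduce-context factory instead of a hard-coded true.

import java.util.function.Function;

public class FinalReduceSketch {

    // Stand-in for InternalAggregation.ReduceContext: only carries the flag this sketch cares about.
    static final class ReduceContext {
        final boolean isFinalReduce;
        ReduceContext(boolean isFinalReduce) {
            this.isFinalReduce = isFinalReduce;
        }
    }

    // Stand-in for SearchRequest#getLocalClusterAlias(): a null alias means the request was not
    // issued on behalf of a remote cluster, so this node owns the final reduction.
    static final class MiniSearchRequest {
        private final String localClusterAlias;
        MiniSearchRequest(String localClusterAlias) {
            this.localClusterAlias = localClusterAlias;
        }
        String getLocalClusterAlias() {
            return localClusterAlias;
        }
    }

    public static void main(String[] args) {
        // Mirrors the reduceContextFunction used by SearchPhaseController: callers now pass
        // performFinalReduce instead of always requesting a final reduce.
        Function<Boolean, ReduceContext> reduceContextFactory = ReduceContext::new;

        MiniSearchRequest[] requests = {
            new MiniSearchRequest(null),     // plain request: reduction is final on this node
            new MiniSearchRequest("remote")  // cross-cluster leg: reduction stays partial here
        };
        for (MiniSearchRequest request : requests) {
            boolean performFinalReduce = request.getLocalClusterAlias() == null;
            ReduceContext context = reduceContextFactory.apply(performFinalReduce);
            System.out.println("alias=" + request.getLocalClusterAlias() + " -> finalReduce=" + context.isFinalReduce);
        }
    }
}

Under this assumption, the "remote" case produces a partial reduction, which is what the tests above record through the reductions list and verify in assertFinalReduction.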