From 4125f012b94032ff91c25a71fb12fef4f470b3e5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 30 Mar 2017 14:32:42 +0200 Subject: [PATCH] Streamline shard index availability in all SearchPhaseResults (#23788) Today we have the shard target and the target request ID available in SearchPhaseResults. Yet, the coordinating node maintains a shard index to reference the request, response tuples internally which is also used in many other classes to reference back from fetch results to query results etc. Today this shard index is implicitly passed via the index in AtomicArray which causes an undesirable dependency on this interface. This commit moves the shard index into the SearchPhaseResult and removes some dependencies on AtomicArray. Further removals will follow in the future. The most important refactoring here is the removal of AtomicArray.Entry which used to be created for every element in the atomic array to maintain the shard index during result processing. This caused an unnecessary indirection, dependency and potentially thousands of unnecessary objects in every search phase. --- .../bulk/byscroll/ParentBulkByScrollTask.java | 49 ++++++---- .../search/AbstractSearchAsyncAction.java | 18 ++-- .../action/search/CountedCollector.java | 21 ++-- .../action/search/DfsQueryPhase.java | 46 +++++---- .../action/search/FetchSearchPhase.java | 43 ++++----- .../action/search/InitialSearchPhase.java | 32 +++---- .../action/search/SearchActionListener.java | 53 ++++++++++ .../SearchDfsQueryThenFetchAsyncAction.java | 2 +- .../action/search/SearchPhaseContext.java | 4 +- .../action/search/SearchPhaseController.java | 96 +++++++++---------- .../SearchQueryThenFetchAsyncAction.java | 10 +- .../SearchScrollQueryAndFetchAsyncAction.java | 30 +++--- ...SearchScrollQueryThenFetchAsyncAction.java | 64 +++++++------ .../action/search/SearchTransportService.java | 28 +++--- .../action/search/TransportSearchHelper.java | 7 +- .../support/tasks/TransportTasksAction.java | 12 +-- .../common/util/concurrent/AtomicArray.java | 30 +----- .../elasticsearch/indices/IndicesService.java | 3 +- .../search/SearchPhaseResult.java | 59 +++++++++++- .../elasticsearch/search/SearchService.java | 3 +- .../search/dfs/DfsSearchResult.java | 30 +----- .../search/fetch/FetchSearchResult.java | 30 ++---- .../search/fetch/QueryFetchSearchResult.java | 27 ++++-- .../fetch/ScrollQueryFetchSearchResult.java | 36 +++++-- .../search/query/QuerySearchResult.java | 27 ++---- .../query/QuerySearchResultProvider.java | 41 -------- .../search/query/ScrollQuerySearchResult.java | 42 ++++---- .../AbstractSearchAsyncActionTookTests.java | 2 +- .../action/search/CountedCollectorTests.java | 12 ++- .../action/search/DfsQueryPhaseTests.java | 48 ++++------ .../action/search/FetchSearchPhaseTests.java | 57 ++++++----- .../action/search/SearchAsyncActionTests.java | 32 ++----- .../search/SearchPhaseControllerTests.java | 65 +++++++------ .../search/SearchServiceTests.java | 5 +- 34 files changed, 543 insertions(+), 521 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java delete mode 100644 core/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ParentBulkByScrollTask.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ParentBulkByScrollTask.java index dfaf03f4115..a37dc61897c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ParentBulkByScrollTask.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/ParentBulkByScrollTask.java @@ -40,7 +40,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { * Holds the responses as they come back. This uses {@link Tuple} as an "Either" style holder where only the response or the exception * is set. */ - private final AtomicArray> results; + private final AtomicArray results; private final AtomicInteger counter; public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) { @@ -82,13 +82,11 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } private void addResultsToList(List sliceStatuses) { - for (AtomicArray.Entry> t : results.asList()) { - if (t.value != null) { - if (t.value.v1() != null) { - sliceStatuses.set(t.index, new StatusOrException(t.value.v1().getStatus())); - } else { - sliceStatuses.set(t.index, new StatusOrException(t.value.v2())); - } + for (Result t : results.asList()) { + if (t.response != null) { + sliceStatuses.set(t.sliceId, new StatusOrException(t.response.getStatus())); + } else { + sliceStatuses.set(t.sliceId, new StatusOrException(t.failure)); } } } @@ -97,7 +95,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { * Record a response from a slice and respond to the listener if the request is finished. */ public void onSliceResponse(ActionListener listener, int sliceId, BulkByScrollResponse response) { - results.setOnce(sliceId, new Tuple<>(response, null)); + results.setOnce(sliceId, new Result(sliceId, response)); /* If the request isn't finished we could automatically rethrottle the sub-requests here but we would only want to do that if we * were fairly sure they had a while left to go. */ recordSliceCompletionAndRespondIfAllDone(listener); @@ -107,7 +105,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { * Record a failure from a slice and respond to the listener if the request is finished. */ void onSliceFailure(ActionListener listener, int sliceId, Exception e) { - results.setOnce(sliceId, new Tuple<>(null, e)); + results.setOnce(sliceId, new Result(sliceId, e)); recordSliceCompletionAndRespondIfAllDone(listener); // TODO cancel when a slice fails? } @@ -118,17 +116,17 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } List responses = new ArrayList<>(results.length()); Exception exception = null; - for (AtomicArray.Entry> t : results.asList()) { - if (t.value.v1() == null) { - assert t.value.v2() != null : "exception shouldn't be null if value is null"; + for (Result t : results.asList()) { + if (t.response == null) { + assert t.failure != null : "exception shouldn't be null if value is null"; if (exception == null) { - exception = t.value.v2(); + exception = t.failure; } else { - exception.addSuppressed(t.value.v2()); + exception.addSuppressed(t.failure); } } else { - assert t.value.v2() == null : "exception should be null if response is not null"; - responses.add(t.value.v1()); + assert t.failure == null : "exception should be null if response is not null"; + responses.add(t.response); } } if (exception == null) { @@ -138,4 +136,21 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } } + private static final class Result { + final BulkByScrollResponse response; + final int sliceId; + final Exception failure; + + private Result(int sliceId, BulkByScrollResponse response) { + this.sliceId = sliceId; + this.response = response; + failure = null; + } + + private Result(int sliceId, Exception failure) { + this.sliceId = sliceId; + this.failure = failure; + response = null; + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index b90b5c8240e..c2137803411 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -131,7 +131,7 @@ abstract class AbstractSearchAsyncAction exten } else { if (logger.isTraceEnabled()) { final String resultsFrom = results.getSuccessfulResults() - .map(r -> r.shardTarget().toString()).collect(Collectors.joining(",")); + .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(",")); logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})", currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion); } @@ -159,10 +159,10 @@ abstract class AbstractSearchAsyncAction exten if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - List> entries = shardFailures.asList(); + List entries = shardFailures.asList(); ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; + failures[i] = entries.get(i); } return failures; } @@ -209,8 +209,8 @@ abstract class AbstractSearchAsyncAction exten private void raisePhaseFailure(SearchPhaseExecutionException exception) { results.getSuccessfulResults().forEach((entry) -> { try { - Transport.Connection connection = nodeIdToConnection.apply(entry.shardTarget().getNodeId()); - sendReleaseSearchContext(entry.id(), connection); + Transport.Connection connection = nodeIdToConnection.apply(entry.getSearchShardTarget().getNodeId()); + sendReleaseSearchContext(entry.getRequestId(), connection); } catch (Exception inner) { inner.addSuppressed(exception); logger.trace("failed to release context", inner); @@ -220,18 +220,18 @@ abstract class AbstractSearchAsyncAction exten } @Override - public final void onShardSuccess(int shardIndex, Result result) { + public final void onShardSuccess(Result result) { successfulOps.incrementAndGet(); - results.consumeResult(shardIndex, result); + results.consumeResult(result); if (logger.isTraceEnabled()) { - logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); + logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null); } // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level // so its ok concurrency wise to miss potentially the shard failures being created because of another failure // in the #addShardFailure, because by definition, it will happen on *another* shardIndex AtomicArray shardFailures = this.shardFailures.get(); if (shardFailures != null) { - shardFailures.set(shardIndex, null); + shardFailures.set(result.getShardIndex(), null); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java b/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java index 65f2d2d280b..2dd255aa14c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java +++ b/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java @@ -23,18 +23,20 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import java.util.function.Consumer; + /** * This is a simple base class to simplify fan out to shards and collect their results. Each results passed to - * {@link #onResult(int, SearchPhaseResult, SearchShardTarget)} will be set to the provided result array + * {@link #onResult(SearchPhaseResult)} will be set to the provided result array * where the given index is used to set the result on the array. */ final class CountedCollector { - private final ResultConsumer resultConsumer; + private final Consumer resultConsumer; private final CountDown counter; private final Runnable onFinish; private final SearchPhaseContext context; - CountedCollector(ResultConsumer resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) { + CountedCollector(Consumer resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) { this.resultConsumer = resultConsumer; this.counter = new CountDown(expectedOps); this.onFinish = onFinish; @@ -55,10 +57,9 @@ final class CountedCollector { /** * Sets the result to the given array index and then runs {@link #countDown()} */ - void onResult(int index, R result, SearchShardTarget target) { + void onResult(R result) { try { - result.shardTarget(target); - resultConsumer.consume(index, result); + resultConsumer.accept(result); } finally { countDown(); } @@ -75,12 +76,4 @@ final class CountedCollector { countDown(); } } - - /** - * A functional interface to plug in shard result consumers to this collector - */ - @FunctionalInterface - public interface ResultConsumer { - void consume(int shardIndex, R result); - } } diff --git a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 0ac3c69b8eb..353baf11750 100644 --- a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -20,16 +20,17 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.query.QuerySearchRequest; -import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.Transport; import java.io.IOException; +import java.util.List; import java.util.function.Function; /** @@ -40,16 +41,16 @@ import java.util.function.Function; * @see CountedCollector#onFailure(int, SearchShardTarget, Exception) */ final class DfsQueryPhase extends SearchPhase { - private final InitialSearchPhase.SearchPhaseResults queryResult; + private final InitialSearchPhase.SearchPhaseResults queryResult; private final SearchPhaseController searchPhaseController; private final AtomicArray dfsSearchResults; - private final Function, SearchPhase> nextPhaseFactory; + private final Function, SearchPhase> nextPhaseFactory; private final SearchPhaseContext context; private final SearchTransportService searchTransportService; DfsQueryPhase(AtomicArray dfsSearchResults, SearchPhaseController searchPhaseController, - Function, SearchPhase> nextPhaseFactory, + Function, SearchPhase> nextPhaseFactory, SearchPhaseContext context) { super("dfs_query"); this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards()); @@ -64,22 +65,26 @@ final class DfsQueryPhase extends SearchPhase { public void run() throws IOException { // TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs // to free up memory early - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsSearchResults); - final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, - dfsSearchResults.asList().size(), - () -> { - context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)); - }, context); - for (final AtomicArray.Entry entry : dfsSearchResults.asList()) { - DfsSearchResult dfsResult = entry.value; - final int shardIndex = entry.index; - final SearchShardTarget searchShardTarget = dfsResult.shardTarget(); + final List resultList = dfsSearchResults.asList(); + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(resultList); + final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, + resultList.size(), + () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); + for (final DfsSearchResult dfsResult : resultList) { + final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.id(), dfs); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.getRequestId(), dfs); + final int shardIndex = dfsResult.getShardIndex(); searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(), - ActionListener.wrap( - result -> counter.onResult(shardIndex, result, searchShardTarget), - exception -> { + new SearchActionListener(searchShardTarget, shardIndex) { + + @Override + protected void innerOnResponse(QuerySearchResult response) { + counter.onResult(response); + } + + @Override + public void onFailure(Exception exception) { try { if (context.getLogger().isDebugEnabled()) { context.getLogger().debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", @@ -92,7 +97,8 @@ final class DfsQueryPhase extends SearchPhase { // release it again to be in the safe side context.sendReleaseSearchContext(querySearchRequest.id(), connection); } - })); + } + }); } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 20d91770675..fe9204d3364 100644 --- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -23,15 +23,14 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; import java.io.IOException; @@ -45,13 +44,13 @@ import java.util.function.Function; final class FetchSearchPhase extends SearchPhase { private final AtomicArray fetchResults; private final SearchPhaseController searchPhaseController; - private final AtomicArray queryResults; + private final AtomicArray queryResults; private final Function nextPhaseFactory; private final SearchPhaseContext context; private final Logger logger; - private final InitialSearchPhase.SearchPhaseResults resultConsumer; + private final InitialSearchPhase.SearchPhaseResults resultConsumer; - FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, + FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context) { this(resultConsumer, searchPhaseController, context, @@ -59,7 +58,7 @@ final class FetchSearchPhase extends SearchPhase { (finalResponse) -> sendResponsePhase(finalResponse, context))); } - FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, + FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context, Function nextPhaseFactory) { super("fetch"); @@ -98,35 +97,35 @@ final class FetchSearchPhase extends SearchPhase { private void innerRun() throws IOException { final int numShards = context.getNumShards(); final boolean isScrollSearch = context.getRequest().scroll() != null; - ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, queryResults); + List phaseResults = queryResults.asList(); + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults, context.getNumShards()); String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null; - List> queryResultsAsList = queryResults.asList(); final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); final boolean queryAndFetchOptimization = queryResults.length() == 1; final Runnable finishPhase = () -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ? queryResults : fetchResults); if (queryAndFetchOptimization) { - assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null; + assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null; // query AND fetch optimization finishPhase.run(); } else { final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs); if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return - queryResultsAsList.stream() - .map(e -> e.value.queryResult()) + phaseResults.stream() + .map(e -> e.queryResult()) .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources finishPhase.run(); } else { final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ? searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards) : null; - final CountedCollector counter = new CountedCollector<>(fetchResults::set, + final CountedCollector counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r), docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not finishPhase, context); for (int i = 0; i < docIdsToLoad.length; i++) { IntArrayList entry = docIdsToLoad[i]; - QuerySearchResultProvider queryResult = queryResults.get(i); + SearchPhaseResult queryResult = queryResults.get(i); if (entry == null) { // no results for this shard ID if (queryResult != null) { // if we got some hits from this shard we have to release the context there @@ -137,10 +136,10 @@ final class FetchSearchPhase extends SearchPhase { // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); } else { - Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry, + Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry, lastEmittedDocPerShard); - executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), + executeFetch(i, queryResult.getSearchShardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), connection); } } @@ -159,10 +158,10 @@ final class FetchSearchPhase extends SearchPhase { final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, final Transport.Connection connection) { context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(), - new ActionListener() { + new SearchActionListener(shardTarget, shardIndex) { @Override - public void onResponse(FetchSearchResult result) { - counter.onResult(shardIndex, result, shardTarget); + public void innerOnResponse(FetchSearchResult result) { + counter.onResult(result); } @Override @@ -191,8 +190,8 @@ final class FetchSearchPhase extends SearchPhase { // and if it has at lease one hit that didn't make it to the global topDocs if (context.getRequest().scroll() == null && queryResult.hasHits()) { try { - Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId()); - context.sendReleaseSearchContext(queryResult.id(), connection); + Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId()); + context.sendReleaseSearchContext(queryResult.getRequestId(), connection); } catch (Exception e) { context.getLogger().trace("failed to release context", e); } @@ -201,7 +200,7 @@ final class FetchSearchPhase extends SearchPhase { private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, - AtomicArray fetchResultsArr) { + AtomicArray fetchResultsArr) { final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null, sortedDocs, reducedQueryPhase, fetchResultsArr); context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId))); diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index f21e9d228d6..be91cebe501 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -144,10 +143,11 @@ abstract class InitialSearchPhase extends onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { try { - executePhaseOnShard(shardIt, shard, new ActionListener() { + executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), + shardIt.shardId()), shardIndex) { @Override - public void onResponse(FirstResult result) { - onShardResult(shardIndex, shard.currentNodeId(), result, shardIt); + public void innerOnResponse(FirstResult result) { + onShardResult(result, shardIt); } @Override @@ -164,9 +164,10 @@ abstract class InitialSearchPhase extends } } - private void onShardResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) { - result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId())); - onShardSuccess(shardIndex, result); + private void onShardResult(FirstResult result, ShardIterator shardIt) { + assert result.getShardIndex() != -1 : "shard index is not set"; + assert result.getSearchShardTarget() != null : "search shard target must not be null"; + onShardSuccess(result); // we need to increment successful ops first before we compare the exit condition otherwise if we // are fast we could concurrently update totalOps but then preempt one of the threads which can // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. @@ -185,7 +186,7 @@ abstract class InitialSearchPhase extends /** * Executed once all shard results have been received and processed * @see #onShardFailure(int, SearchShardTarget, Exception) - * @see #onShardSuccess(int, SearchPhaseResult) + * @see #onShardSuccess(SearchPhaseResult) */ abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim() @@ -201,12 +202,10 @@ abstract class InitialSearchPhase extends /** * Executed once for every successful shard level request. - * @param shardIndex the internal index for this shard. Each shard has an index / ordinal assigned that is used to reference - * it's results * @param result the result returned form the shard * */ - abstract void onShardSuccess(int shardIndex, FirstResult result); + abstract void onShardSuccess(FirstResult result); /** * Sends the request to the actual shard. @@ -214,7 +213,7 @@ abstract class InitialSearchPhase extends * @param shard the shard routing to send the request for * @param listener the listener to notify on response */ - protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener); + protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener listener); /** * This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing @@ -237,17 +236,16 @@ abstract class InitialSearchPhase extends * A stream of all non-null (successful) shard results */ final Stream getSuccessfulResults() { - return results.asList().stream().map(e -> e.value); + return results.asList().stream(); } /** * Consumes a single shard result - * @param shardIndex the shards index, this is a 0-based id that is used to establish a 1 to 1 mapping to the searched shards * @param result the shards result */ - void consumeResult(int shardIndex, Result result) { - assert results.get(shardIndex) == null : "shardIndex: " + shardIndex + " is already set"; - results.set(shardIndex, result); + void consumeResult(Result result) { + assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set"; + results.set(result.getShardIndex(), result); } /** diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java new file mode 100644 index 00000000000..709d1e5e237 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; + +/** + * An base action listener that ensures shard target and shard index is set on all responses + * received by this listener. + */ +abstract class SearchActionListener implements ActionListener { + private final int requestIndex; + private final SearchShardTarget searchShardTarget; + + protected SearchActionListener(SearchShardTarget searchShardTarget, + int shardIndex) { + assert shardIndex >= 0 : "shard index must be positive"; + this.searchShardTarget = searchShardTarget; + this.requestIndex = shardIndex; + } + + @Override + public final void onResponse(T response) { + response.setShardIndex(requestIndex); + setSearchShardTarget(response); + innerOnResponse(response); + } + + protected void setSearchShardTarget(T response) { // some impls need to override this + response.setSearchShardTarget(searchShardTarget); + } + + protected abstract void innerOnResponse(T response); + +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index d3b2ea3a98e..7151c8712ed 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -72,7 +72,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction protected void executePhaseOnShard( final ShardIterator shardIt, final ShardRouting shard, - final ActionListener listener) { + final SearchActionListener listener) { getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()), buildShardSearchRequest(shardIt, shard) , getTask(), listener); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index 1a21eb3cc34..26c5403f4ab 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -93,8 +93,8 @@ interface SearchPhaseContext extends ActionListener, Executor { /** * Releases a search context with the given context ID on the node the given connection is connected to. - * @see org.elasticsearch.search.query.QuerySearchResult#id() - * @see org.elasticsearch.search.fetch.FetchSearchResult#id() + * @see org.elasticsearch.search.query.QuerySearchResult#getRequestId() + * @see org.elasticsearch.search.fetch.FetchSearchResult#getRequestId() * */ default void sendReleaseSearchContext(long contextId, Transport.Connection connection) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index cec44d9e9e5..be111bf3464 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -52,7 +53,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; @@ -81,13 +81,13 @@ public class SearchPhaseController extends AbstractComponent { this.scriptService = scriptService; } - public AggregatedDfs aggregateDfs(AtomicArray results) { + public AggregatedDfs aggregateDfs(List results) { ObjectObjectHashMap termStatistics = HppcMaps.newNoNullKeysMap(); ObjectObjectHashMap fieldStatistics = HppcMaps.newNoNullKeysMap(); long aggMaxDoc = 0; - for (AtomicArray.Entry lEntry : results.asList()) { - final Term[] terms = lEntry.value.terms(); - final TermStatistics[] stats = lEntry.value.termStatistics(); + for (DfsSearchResult lEntry : results) { + final Term[] terms = lEntry.terms(); + final TermStatistics[] stats = lEntry.termStatistics(); assert terms.length == stats.length; for (int i = 0; i < terms.length; i++) { assert terms[i] != null; @@ -105,9 +105,9 @@ public class SearchPhaseController extends AbstractComponent { } - assert !lEntry.value.fieldStatistics().containsKey(null); - final Object[] keys = lEntry.value.fieldStatistics().keys; - final Object[] values = lEntry.value.fieldStatistics().values; + assert !lEntry.fieldStatistics().containsKey(null); + final Object[] keys = lEntry.fieldStatistics().keys; + final Object[] values = lEntry.fieldStatistics().values; for (int i = 0; i < keys.length; i++) { if (keys[i] != null) { String key = (String) keys[i]; @@ -127,7 +127,7 @@ public class SearchPhaseController extends AbstractComponent { } } } - aggMaxDoc += lEntry.value.maxDoc(); + aggMaxDoc += lEntry.maxDoc(); } return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc); } @@ -146,10 +146,9 @@ public class SearchPhaseController extends AbstractComponent { * * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. - * @param resultsArr Shard result holder + * @param results Shard result holder */ - public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray resultsArr) throws IOException { - List> results = resultsArr.asList(); + public ScoreDoc[] sortDocs(boolean ignoreFrom, List results, int numShards) throws IOException { if (results.isEmpty()) { return EMPTY_DOCS; } @@ -159,25 +158,25 @@ public class SearchPhaseController extends AbstractComponent { int shardIndex = -1; if (results.size() == 1) { canOptimize = true; - result = results.get(0).value.queryResult(); - shardIndex = results.get(0).index; + result = results.get(0).queryResult(); + shardIndex = result.getShardIndex(); } else { boolean hasResult = false; QuerySearchResult resultToOptimize = null; // lets see if we only got hits from a single shard, if so, we can optimize... - for (AtomicArray.Entry entry : results) { - if (entry.value.queryResult().hasHits()) { + for (SearchPhaseResult entry : results) { + if (entry.queryResult().hasHits()) { if (hasResult) { // we already have one, can't really optimize canOptimize = false; break; } canOptimize = true; hasResult = true; - resultToOptimize = entry.value.queryResult(); - shardIndex = entry.index; + resultToOptimize = entry.queryResult(); + shardIndex = resultToOptimize.getShardIndex(); } } - result = canOptimize ? resultToOptimize : results.get(0).value.queryResult(); + result = canOptimize ? resultToOptimize : results.get(0).queryResult(); assert result != null; } if (canOptimize) { @@ -228,7 +227,6 @@ public class SearchPhaseController extends AbstractComponent { final int from = ignoreFrom ? 0 : result.queryResult().from(); final TopDocs mergedTopDocs; - final int numShards = resultsArr.length(); if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) { CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs(); final Sort sort = new Sort(firstTopDocs.fields); @@ -239,11 +237,11 @@ public class SearchPhaseController extends AbstractComponent { } else if (result.queryResult().topDocs() instanceof TopFieldDocs) { TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs(); final Sort sort = new Sort(firstTopDocs.fields); - final TopFieldDocs[] shardTopDocs = new TopFieldDocs[resultsArr.length()]; + final TopFieldDocs[] shardTopDocs = new TopFieldDocs[numShards]; fillTopDocs(shardTopDocs, results, new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN)); mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs, true); } else { - final TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()]; + final TopDocs[] shardTopDocs = new TopDocs[numShards]; fillTopDocs(shardTopDocs, results, Lucene.EMPTY_TOP_DOCS); mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, true); } @@ -251,11 +249,11 @@ public class SearchPhaseController extends AbstractComponent { ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs; final Map>> groupedCompletionSuggestions = new HashMap<>(); // group suggestions and assign shard index - for (AtomicArray.Entry sortedResult : results) { - Suggest shardSuggest = sortedResult.value.queryResult().suggest(); + for (SearchPhaseResult sortedResult : results) { + Suggest shardSuggest = sortedResult.queryResult().suggest(); if (shardSuggest != null) { for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { - suggestion.setShardIndex(sortedResult.index); + suggestion.setShardIndex(sortedResult.getShardIndex()); List> suggestions = groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); suggestions.add(suggestion); @@ -286,17 +284,17 @@ public class SearchPhaseController extends AbstractComponent { } static void fillTopDocs(T[] shardTopDocs, - List> results, + List results, T empytTopDocs) { if (results.size() != shardTopDocs.length) { // TopDocs#merge can't deal with null shard TopDocs Arrays.fill(shardTopDocs, empytTopDocs); } - for (AtomicArray.Entry resultProvider : results) { - final T topDocs = (T) resultProvider.value.queryResult().topDocs(); + for (SearchPhaseResult resultProvider : results) { + final T topDocs = (T) resultProvider.queryResult().topDocs(); assert topDocs != null : "top docs must not be null in a valid result"; // the 'index' field is the position in the resultsArr atomic array - shardTopDocs[resultProvider.index] = topDocs; + shardTopDocs[resultProvider.getShardIndex()] = topDocs; } } public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, @@ -340,11 +338,11 @@ public class SearchPhaseController extends AbstractComponent { */ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, ReducedQueryPhase reducedQueryPhase, - AtomicArray fetchResultsArr) { + AtomicArray fetchResultsArr) { if (reducedQueryPhase.isEmpty()) { return InternalSearchResponse.empty(); } - List> fetchResults = fetchResultsArr.asList(); + List fetchResults = fetchResultsArr.asList(); SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr); if (reducedQueryPhase.suggest != null) { if (!fetchResults.isEmpty()) { @@ -353,7 +351,7 @@ public class SearchPhaseController extends AbstractComponent { final List suggestionOptions = suggestion.getOptions(); for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) { ScoreDoc shardDoc = sortedDocs[scoreDocIndex]; - QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); + SearchPhaseResult searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); if (searchResultProvider == null) { continue; } @@ -364,7 +362,7 @@ public class SearchPhaseController extends AbstractComponent { CompletionSuggestion.Entry.Option suggestOption = suggestionOptions.get(scoreDocIndex - currentOffset); hit.score(shardDoc.score); - hit.shard(fetchResult.shardTarget()); + hit.shard(fetchResult.getSearchShardTarget()); suggestOption.setHit(hit); } } @@ -377,8 +375,8 @@ public class SearchPhaseController extends AbstractComponent { } private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs, - AtomicArray fetchResultsArr) { - List> fetchResults = fetchResultsArr.asList(); + AtomicArray fetchResultsArr) { + List fetchResults = fetchResultsArr.asList(); boolean sorted = false; int sortScoreIndex = -1; if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) { @@ -396,8 +394,8 @@ public class SearchPhaseController extends AbstractComponent { } } // clean the fetch counter - for (AtomicArray.Entry entry : fetchResults) { - entry.value.fetchResult().initCounter(); + for (SearchPhaseResult entry : fetchResults) { + entry.fetchResult().initCounter(); } int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from(); int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size()); @@ -408,7 +406,7 @@ public class SearchPhaseController extends AbstractComponent { if (!fetchResults.isEmpty()) { for (int i = 0; i < numSearchHits; i++) { ScoreDoc shardDoc = sortedDocs[i]; - QuerySearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); + SearchPhaseResult fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); if (fetchResultProvider == null) { continue; } @@ -417,7 +415,7 @@ public class SearchPhaseController extends AbstractComponent { if (index < fetchResult.hits().internalHits().length) { SearchHit searchHit = fetchResult.hits().internalHits()[index]; searchHit.score(shardDoc.score); - searchHit.shard(fetchResult.shardTarget()); + searchHit.shard(fetchResult.getSearchShardTarget()); if (sorted) { FieldDoc fieldDoc = (FieldDoc) shardDoc; searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats()); @@ -437,7 +435,7 @@ public class SearchPhaseController extends AbstractComponent { * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results */ - public final ReducedQueryPhase reducedQueryPhase(List> queryResults) { + public final ReducedQueryPhase reducedQueryPhase(List queryResults) { return reducedQueryPhase(queryResults, null, 0); } @@ -450,7 +448,7 @@ public class SearchPhaseController extends AbstractComponent { * @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeProfileResult() */ - private ReducedQueryPhase reducedQueryPhase(List> queryResults, + private ReducedQueryPhase reducedQueryPhase(List queryResults, List bufferdAggs, int numReducePhases) { assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; numReducePhases++; // increment for this phase @@ -463,7 +461,7 @@ public class SearchPhaseController extends AbstractComponent { return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null, numReducePhases); } - final QuerySearchResult firstResult = queryResults.get(0).value.queryResult(); + final QuerySearchResult firstResult = queryResults.get(0).queryResult(); final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean consumeAggs; @@ -487,8 +485,8 @@ public class SearchPhaseController extends AbstractComponent { final Map> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap(); final Map profileResults = hasProfileResults ? new HashMap<>(queryResults.size()) : Collections.emptyMap(); - for (AtomicArray.Entry entry : queryResults) { - QuerySearchResult result = entry.value.queryResult(); + for (SearchPhaseResult entry : queryResults) { + QuerySearchResult result = entry.queryResult(); if (result.searchTimedOut()) { timedOut = true; } @@ -515,7 +513,7 @@ public class SearchPhaseController extends AbstractComponent { aggregationsList.add((InternalAggregations) result.consumeAggs()); } if (hasProfileResults) { - String key = result.shardTarget().toString(); + String key = result.getSearchShardTarget().toString(); profileResults.put(key, result.consumeProfileResult()); } } @@ -622,7 +620,7 @@ public class SearchPhaseController extends AbstractComponent { * iff the buffer is exhausted. */ static final class QueryPhaseResultConsumer - extends InitialSearchPhase.SearchPhaseResults { + extends InitialSearchPhase.SearchPhaseResults { private final InternalAggregations[] buffer; private int index; private final SearchPhaseController controller; @@ -649,8 +647,8 @@ public class SearchPhaseController extends AbstractComponent { } @Override - public void consumeResult(int shardIndex, QuerySearchResultProvider result) { - super.consumeResult(shardIndex, result); + public void consumeResult(SearchPhaseResult result) { + super.consumeResult(result); QuerySearchResult queryResult = result.queryResult(); assert queryResult.hasAggs() : "this collector should only be used if aggs are requested"; consumeInternal(queryResult); @@ -691,7 +689,7 @@ public class SearchPhaseController extends AbstractComponent { /** * Returns a new SearchPhaseResults instance. This might return an instance that reduces search responses incrementally. */ - InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { + InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { SearchSourceBuilder source = request.source(); if (source != null && source.aggregations() != null) { if (request.getBatchedReduceSize() < numShards) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index fe87b8f4dba..fd1d1977029 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -24,8 +24,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; import java.util.Map; @@ -33,7 +33,7 @@ import java.util.concurrent.Executor; import java.util.function.Function; final class SearchQueryThenFetchAsyncAction - extends AbstractSearchAsyncAction { + extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; @@ -69,11 +69,10 @@ final class SearchQueryThenFetchAsyncAction this.searchPhaseController = searchPhaseController; } - protected void executePhaseOnShard( final ShardIterator shardIt, final ShardRouting shard, - final ActionListener listener) { + final SearchActionListener listener) { getSearchTransport().sendExecuteQuery( getConnection(shard.currentNodeId()), buildShardSearchRequest(shardIt, shard), @@ -83,9 +82,8 @@ final class SearchQueryThenFetchAsyncAction @Override protected SearchPhase getNextPhase( - final SearchPhaseResults results, + final SearchPhaseResults results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); } - } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index b005c0fc2fe..accf0bddf8a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -32,13 +32,14 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.query.ScrollQuerySearchResult; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; -class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { +final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { private final Logger logger; private final SearchPhaseController searchPhaseController; @@ -70,21 +71,17 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); } - protected final ShardSearchFailure[] buildShardFailures() { + private ShardSearchFailure[] buildShardFailures() { if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; + List failures = shardFailures.asList(); + return failures.toArray(new ShardSearchFailure[failures.size()]); } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + private void addShardFailure(final int shardIndex, ShardSearchFailure failure) { if (shardFailures == null) { shardFailures = new AtomicArray<>(scrollId.getContext().length); } @@ -130,15 +127,20 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener() { + searchTransportService.sendExecuteScrollFetch(node, internalRequest, task, + new SearchActionListener(null, shardIndex) { @Override - public void onResponse(ScrollQueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result.result()); + protected void setSearchShardTarget(ScrollQueryFetchSearchResult response) { + // don't do this - it's part of the response... + assert response.getSearchShardTarget() != null : "search shard target must not be null"; + } + @Override + protected void innerOnResponse(ScrollQueryFetchSearchResult response) { + queryFetchResults.set(response.getShardIndex(), response.result()); if (counter.decrementAndGet() == 0) { finishHim(); } } - @Override public void onFailure(Exception t) { onPhaseFailure(t, searchId, shardIndex); @@ -170,7 +172,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { } private void innerFinishHim() throws Exception { - ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList(), queryFetchResults.length()); final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults); String scrollId = null; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 13c81c1d5e6..2745e0b13ea 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest; @@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; -class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { +final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private final Logger logger; private final SearchTask task; @@ -73,21 +74,17 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { this.fetchResults = new AtomicArray<>(scrollId.getContext().length); } - protected final ShardSearchFailure[] buildShardFailures() { + private ShardSearchFailure[] buildShardFailures() { if (shardFailures == null) { return ShardSearchFailure.EMPTY_ARRAY; } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; + List failures = shardFailures.asList(); + return failures.toArray(new ShardSearchFailure[failures.size()]); } // we do our best to return the shard failures, but its ok if its not fully concurrently safe // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + private void addShardFailure(final int shardIndex, ShardSearchFailure failure) { if (shardFailures == null) { shardFailures = new AtomicArray<>(scrollId.getContext().length); } @@ -99,8 +96,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); return; } - final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - + final CountDown counter = new CountDown(scrollId.getContext().length); ScrollIdForNode[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { ScrollIdForNode target = context[i]; @@ -112,7 +108,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { logger.debug("Node [{}] not available for scroll request [{}]", target.getNode(), scrollId.getSource()); } successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { + if (counter.countDown()) { try { executeFetchPhase(); } catch (Exception e) { @@ -124,13 +120,21 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { } } - private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { + private void executeQueryPhase(final int shardIndex, final CountDown counter, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener() { + searchTransportService.sendExecuteScrollQuery(node, internalRequest, task, + new SearchActionListener(null, shardIndex) { + @Override - public void onResponse(ScrollQuerySearchResult result) { - queryResults.set(shardIndex, result.queryResult()); - if (counter.decrementAndGet() == 0) { + protected void setSearchShardTarget(ScrollQuerySearchResult response) { + // don't do this - it's part of the response... + assert response.getSearchShardTarget() != null : "search shard target must not be null"; + } + + @Override + protected void innerOnResponse(ScrollQuerySearchResult result) { + queryResults.setOnce(result.getShardIndex(), result.queryResult()); + if (counter.countDown()) { try { executeFetchPhase(); } catch (Exception e) { @@ -146,13 +150,13 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { }); } - void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Exception failure) { + void onQueryPhaseFailure(final int shardIndex, final CountDown counter, final long searchId, Exception failure) { if (logger.isDebugEnabled()) { logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute query phase", searchId), failure); } addShardFailure(shardIndex, new ShardSearchFailure(failure)); successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { + if (counter.countDown()) { if (successfulOps.get() == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", failure, buildShardFailures())); } else { @@ -167,7 +171,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { } private void executeFetchPhase() throws Exception { - sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); + sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList(), queryResults.length()); if (sortedShardDocs.length == 0) { finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList())); return; @@ -177,21 +181,21 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList()); final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, queryResults.length()); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length); + final CountDown counter = new CountDown(docIdsToLoad.length); for (int i = 0; i < docIdsToLoad.length; i++) { final int index = i; final IntArrayList docIds = docIdsToLoad[index]; if (docIds != null) { final QuerySearchResult querySearchResult = queryResults.get(index); ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index]; - ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); - searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { + ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.getRequestId(), docIds, lastEmittedDoc); + DiscoveryNode node = nodes.get(querySearchResult.getSearchShardTarget().getNodeId()); + searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, + new SearchActionListener(querySearchResult.getSearchShardTarget(), index) { @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(querySearchResult.shardTarget()); - fetchResults.set(index, result); - if (counter.decrementAndGet() == 0) { + protected void innerOnResponse(FetchSearchResult response) { + fetchResults.setOnce(response.getShardIndex(), response); + if (counter.countDown()) { finishHim(reducedQueryPhase); } } @@ -202,14 +206,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { logger.debug("Failed to execute fetch phase", t); } successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { + if (counter.countDown()) { finishHim(reducedQueryPhase); } } }); } else { // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too - if (counter.decrementAndGet() == 0) { + if (counter.countDown()) { finishHim(reducedQueryPhase); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 4ebf7c79c2a..80583e24c9c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -42,7 +43,6 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -118,17 +118,17 @@ public class SearchTransportService extends AbstractLifecycleComponent { } public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); } public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // this used to be the QUERY_AND_FETCH which doesn't exists anymore. final boolean fetchDocuments = request.numberOfShards() == 1; - Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; if (connection.getVersion().onOrBefore(Version.V_5_3_0_UNRELEASED) && fetchDocuments) { // TODO this BWC layer can be removed once this is back-ported to 5.3 transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task, @@ -140,35 +140,35 @@ public class SearchTransportService extends AbstractLifecycleComponent { } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, - final ActionListener listener) { + public void sendExecuteScrollQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, + final SearchActionListener listener) { transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, - final ActionListener listener) { + public void sendExecuteScrollFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, + final SearchActionListener listener) { transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener); } public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); } private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, - final ActionListener listener) { + final SearchActionListener listener) { transportService.sendChildRequest(connection, action, request, task, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); } @@ -327,7 +327,7 @@ public class SearchTransportService extends AbstractLifecycleComponent { new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task); + SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); channel.sendResponse(result); } }); @@ -361,7 +361,7 @@ public class SearchTransportService extends AbstractLifecycleComponent { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards(); - QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task); + SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); channel.sendResponse(result); } }); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java index 975c0be6f0f..e494bb6768d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java @@ -38,10 +38,9 @@ final class TransportSearchHelper { try (RAMOutputStream out = new RAMOutputStream()) { out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE); out.writeVInt(searchPhaseResults.asList().size()); - for (AtomicArray.Entry entry : searchPhaseResults.asList()) { - SearchPhaseResult searchPhaseResult = entry.value; - out.writeLong(searchPhaseResult.id()); - out.writeString(searchPhaseResult.shardTarget().getNodeId()); + for (SearchPhaseResult searchPhaseResult : searchPhaseResults.asList()) { + out.writeLong(searchPhaseResult.getRequestId()); + out.writeString(searchPhaseResult.getSearchShardTarget().getNodeId()); } byte[] bytes = new byte[(int) out.getFilePointer()]; out.writeTo(bytes, 0); diff --git a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index 9d1cf5e37e7..86d158784c0 100644 --- a/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -135,14 +135,14 @@ public abstract class TransportTasksAction< } List results = new ArrayList<>(); List exceptions = new ArrayList<>(); - for (AtomicArray.Entry> response : responses.asList()) { - if (response.value.v1() == null) { - assert response.value.v2() != null; + for (Tuple response : responses.asList()) { + if (response.v1() == null) { + assert response.v2() != null; exceptions.add(new TaskOperationFailure(clusterService.localNode().getId(), tasks.get(taskIndex).getId(), - response.value.v2())); + response.v2())); } else { - assert response.value.v2() == null; - results.add(response.value.v1()); + assert response.v2() == null; + results.add(response.v1()); } } listener.onResponse(new NodeTasksResponse(clusterService.localNode().getId(), results, exceptions)); diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java index 2278220d9dd..2bf5e50a1c2 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java @@ -40,7 +40,7 @@ public class AtomicArray { } private final AtomicReferenceArray array; - private volatile List> nonNullList; + private volatile List nonNullList; public AtomicArray(int size) { array = new AtomicReferenceArray<>(size); @@ -87,19 +87,18 @@ public class AtomicArray { } /** - * Returns the it as a non null list, with an Entry wrapping each value allowing to - * retain its index. + * Returns the it as a non null list. */ - public List> asList() { + public List asList() { if (nonNullList == null) { if (array == null || array.length() == 0) { nonNullList = Collections.emptyList(); } else { - List> list = new ArrayList<>(array.length()); + List list = new ArrayList<>(array.length()); for (int i = 0; i < array.length(); i++) { E e = array.get(i); if (e != null) { - list.add(new Entry<>(i, e)); + list.add(e); } } nonNullList = list; @@ -120,23 +119,4 @@ public class AtomicArray { } return a; } - - /** - * An entry within the array. - */ - public static class Entry { - /** - * The original index of the value within the array. - */ - public final int index; - /** - * The value. - */ - public final E value; - - public Entry(int index, E value) { - this.index = index; - this.value = value; - } - } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index a4e4c83bc00..7bf80cc1986 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -121,7 +121,6 @@ import java.io.Closeable; import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -1145,7 +1144,7 @@ public class IndicesService extends AbstractLifecycleComponent final QuerySearchResult result = context.queryResult(); StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); result.readFromWithId(context.id(), in); - result.shardTarget(context.shardTarget()); + result.setSearchShardTarget(context.shardTarget()); } else if (context.queryResult().searchTimedOut()) { // we have to invalidate the cache entry if we cached a query result form a request that timed out. // we can't really throw exceptions in the loading part to signal a timed out search to the outside world since if there are diff --git a/core/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/core/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index 003f37616f5..ede9f525a5a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/core/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -20,12 +20,63 @@ package org.elasticsearch.search; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.transport.TransportResponse; -public interface SearchPhaseResult extends Streamable { +/** + * This class is a base class for all search releated results. It contains the shard target it + * was executed against, a shard index used to reference the result on the coordinating node + * and a request ID that is used to reference the request context on the executing node. The + * request ID is particularly important since it is used to reference and maintain a context + * across search phases to ensure the same point in time snapshot is used for querying and + * fetching etc. + */ +public abstract class SearchPhaseResult extends TransportResponse implements Streamable { - long id(); + private SearchShardTarget searchShardTarget; + private int shardIndex = -1; + protected long requestId; - SearchShardTarget shardTarget(); + /** + * Returns the results request ID that is used to reference the search context on the executing + * node + */ + public long getRequestId() { + return requestId; + } - void shardTarget(SearchShardTarget shardTarget); + /** + * Returns the shard index in the context of the currently executing search request that is + * used for accounting on the coordinating node + */ + public int getShardIndex() { + assert shardIndex != -1 : "shardIndex is not set"; + return shardIndex; + } + + public SearchShardTarget getSearchShardTarget() { + return searchShardTarget; + } + + public void setSearchShardTarget(SearchShardTarget shardTarget) { + this.searchShardTarget = shardTarget; + } + + public void setShardIndex(int shardIndex) { + assert shardIndex >= 0 : "shardIndex must be >= 0 but was: " + shardIndex; + this.shardIndex = shardIndex; + } + + /** + * Returns the query result iff it's included in this response otherwise null + */ + public QuerySearchResult queryResult() { + return null; + } + + /** + * Returns the fetch result iff it's included in this response otherwise null + */ + public FetchSearchResult fetchResult() { return null; } } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 3d093e5ae72..a0352281952 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -75,7 +75,6 @@ import org.elasticsearch.search.profile.Profilers; import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.search.rescore.RescoreBuilder; import org.elasticsearch.search.searchafter.SearchAfterBuilder; @@ -248,7 +247,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } } - public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { + public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); diff --git a/core/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/core/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index bf3d9527246..0cd624b00a3 100644 --- a/core/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -30,44 +30,24 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.transport.TransportResponse; import java.io.IOException; -public class DfsSearchResult extends TransportResponse implements SearchPhaseResult { +public class DfsSearchResult extends SearchPhaseResult { private static final Term[] EMPTY_TERMS = new Term[0]; private static final TermStatistics[] EMPTY_TERM_STATS = new TermStatistics[0]; - - private SearchShardTarget shardTarget; - private long id; private Term[] terms; private TermStatistics[] termStatistics; private ObjectObjectHashMap fieldStatistics = HppcMaps.newNoNullKeysMap(); private int maxDoc; public DfsSearchResult() { - } public DfsSearchResult(long id, SearchShardTarget shardTarget) { - this.id = id; - this.shardTarget = shardTarget; - } - - @Override - public long id() { - return this.id; - } - - @Override - public SearchShardTarget shardTarget() { - return shardTarget; - } - - @Override - public void shardTarget(SearchShardTarget shardTarget) { - this.shardTarget = shardTarget; + this.setSearchShardTarget(shardTarget); + this.requestId = id; } public DfsSearchResult maxDoc(int maxDoc) { @@ -105,7 +85,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readLong(); + requestId = in.readLong(); int termsSize = in.readVInt(); if (termsSize == 0) { terms = EMPTY_TERMS; @@ -125,7 +105,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(id); + out.writeLong(requestId); out.writeVInt(terms.length); for (Term term : terms) { out.writeString(term.field()); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index 1e2def8cc61..a5f27733ad2 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -22,28 +22,25 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import java.io.IOException; -public class FetchSearchResult extends QuerySearchResultProvider { +public final class FetchSearchResult extends SearchPhaseResult { - private long id; - private SearchShardTarget shardTarget; private SearchHits hits; // client side counter private transient int counter; public FetchSearchResult() { - } public FetchSearchResult(long id, SearchShardTarget shardTarget) { - this.id = id; - this.shardTarget = shardTarget; + this.requestId = id; + setSearchShardTarget(shardTarget); } @Override @@ -56,21 +53,6 @@ public class FetchSearchResult extends QuerySearchResultProvider { return this; } - @Override - public long id() { - return this.id; - } - - @Override - public SearchShardTarget shardTarget() { - return this.shardTarget; - } - - @Override - public void shardTarget(SearchShardTarget shardTarget) { - this.shardTarget = shardTarget; - } - public void hits(SearchHits hits) { assert assertNoSearchTarget(hits); this.hits = hits; @@ -105,14 +87,14 @@ public class FetchSearchResult extends QuerySearchResultProvider { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readLong(); + requestId = in.readLong(); hits = SearchHits.readSearchHits(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(id); + out.writeLong(requestId); hits.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 35c4dbd6597..8d1e6276e65 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -21,22 +21,21 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import java.io.IOException; import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult; import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; -public class QueryFetchSearchResult extends QuerySearchResultProvider { +public final class QueryFetchSearchResult extends SearchPhaseResult { private QuerySearchResult queryResult; private FetchSearchResult fetchResult; public QueryFetchSearchResult() { - } public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { @@ -45,19 +44,27 @@ public class QueryFetchSearchResult extends QuerySearchResultProvider { } @Override - public long id() { - return queryResult.id(); + public long getRequestId() { + return queryResult.getRequestId(); } @Override - public SearchShardTarget shardTarget() { - return queryResult.shardTarget(); + public SearchShardTarget getSearchShardTarget() { + return queryResult.getSearchShardTarget(); } @Override - public void shardTarget(SearchShardTarget shardTarget) { - queryResult.shardTarget(shardTarget); - fetchResult.shardTarget(shardTarget); + public void setSearchShardTarget(SearchShardTarget shardTarget) { + super.setSearchShardTarget(shardTarget); + queryResult.setSearchShardTarget(shardTarget); + fetchResult.setSearchShardTarget(shardTarget); + } + + @Override + public void setShardIndex(int requestIndex) { + super.setShardIndex(requestIndex); + queryResult.setShardIndex(requestIndex); + fetchResult.setShardIndex(requestIndex); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/core/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index e8a9af00127..55aa4a96d01 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -21,46 +21,64 @@ package org.elasticsearch.search.fetch; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.search.query.QuerySearchResult; import java.io.IOException; import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult; -public class ScrollQueryFetchSearchResult extends TransportResponse { +public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { private QueryFetchSearchResult result; - private SearchShardTarget shardTarget; public ScrollQueryFetchSearchResult() { } public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { this.result = result; - this.shardTarget = shardTarget; + setSearchShardTarget(shardTarget); } public QueryFetchSearchResult result() { return result; } - public SearchShardTarget shardTarget() { - return shardTarget; + @Override + public void setSearchShardTarget(SearchShardTarget shardTarget) { + super.setSearchShardTarget(shardTarget); + result.setSearchShardTarget(shardTarget); + } + + @Override + public void setShardIndex(int shardIndex) { + super.setShardIndex(shardIndex); + result.setShardIndex(shardIndex); + } + + @Override + public QuerySearchResult queryResult() { + return result.queryResult(); + } + + @Override + public FetchSearchResult fetchResult() { + return result.fetchResult(); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardTarget = new SearchShardTarget(in); + SearchShardTarget searchShardTarget = new SearchShardTarget(in); result = readQueryFetchSearchResult(in); - result.shardTarget(shardTarget); + setSearchShardTarget(searchShardTarget); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - shardTarget.writeTo(out); + getSearchShardTarget().writeTo(out); result.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index a8d8ae74062..15403f99677 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -24,6 +24,7 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -40,10 +41,8 @@ import static java.util.Collections.emptyList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; -public final class QuerySearchResult extends QuerySearchResultProvider { +public final class QuerySearchResult extends SearchPhaseResult { - private long id; - private SearchShardTarget shardTarget; private int from; private int size; private TopDocs topDocs; @@ -61,8 +60,8 @@ public final class QuerySearchResult extends QuerySearchResultProvider { } public QuerySearchResult(long id, SearchShardTarget shardTarget) { - this.id = id; - this.shardTarget = shardTarget; + this.requestId = id; + setSearchShardTarget(shardTarget); } @Override @@ -70,20 +69,6 @@ public final class QuerySearchResult extends QuerySearchResultProvider { return this; } - @Override - public long id() { - return this.id; - } - - @Override - public SearchShardTarget shardTarget() { - return shardTarget; - } - - @Override - public void shardTarget(SearchShardTarget shardTarget) { - this.shardTarget = shardTarget; - } public void searchTimedOut(boolean searchTimedOut) { this.searchTimedOut = searchTimedOut; @@ -230,7 +215,7 @@ public final class QuerySearchResult extends QuerySearchResultProvider { } public void readFromWithId(long id, StreamInput in) throws IOException { - this.id = id; + this.requestId = id; from = in.readVInt(); size = in.readVInt(); int numSortFieldsPlus1 = in.readVInt(); @@ -260,7 +245,7 @@ public final class QuerySearchResult extends QuerySearchResultProvider { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(id); + out.writeLong(requestId); writeToNoId(out); } diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java deleted file mode 100644 index 852a97e5248..00000000000 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.query; - -import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.transport.TransportResponse; - -public abstract class QuerySearchResultProvider extends TransportResponse implements SearchPhaseResult { - - /** - * Returns the query result iff it's included in this response otherwise null - */ - public QuerySearchResult queryResult() { - return null; - } - - /** - * Returns the fetch result iff it's included in this response otherwise null - */ - public FetchSearchResult fetchResult() { - return null; - } -} diff --git a/core/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/core/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index 9137a72acb5..64014594899 100644 --- a/core/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/core/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -21,46 +21,54 @@ package org.elasticsearch.search.query; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.transport.TransportResponse; import java.io.IOException; import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; -public class ScrollQuerySearchResult extends TransportResponse { +public final class ScrollQuerySearchResult extends SearchPhaseResult { - private QuerySearchResult queryResult; - private SearchShardTarget shardTarget; + private QuerySearchResult result; public ScrollQuerySearchResult() { } - public ScrollQuerySearchResult(QuerySearchResult queryResult, SearchShardTarget shardTarget) { - this.queryResult = queryResult; - this.shardTarget = shardTarget; + public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { + this.result = result; + setSearchShardTarget(shardTarget); } + @Override + public void setSearchShardTarget(SearchShardTarget shardTarget) { + super.setSearchShardTarget(shardTarget); + result.setSearchShardTarget(shardTarget); + } + + @Override + public void setShardIndex(int shardIndex) { + super.setShardIndex(shardIndex); + result.setShardIndex(shardIndex); + } + + @Override public QuerySearchResult queryResult() { - return queryResult; - } - - public SearchShardTarget shardTarget() { - return shardTarget; + return result; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardTarget = new SearchShardTarget(in); - queryResult = readQuerySearchResult(in); - queryResult.shardTarget(shardTarget); + SearchShardTarget shardTarget = new SearchShardTarget(in); + result = readQuerySearchResult(in); + setSearchShardTarget(shardTarget); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - shardTarget.writeTo(out); - queryResult.writeTo(out); + getSearchShardTarget().writeTo(out); + result.writeTo(out); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTookTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTookTests.java index cd8b0743675..beec582b13f 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTookTests.java @@ -125,7 +125,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { protected void executePhaseOnShard( final ShardIterator shardIt, final ShardRouting shard, - final ActionListener listener) { + final SearchActionListener listener) { } diff --git a/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java b/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java index 6995ad93f25..ccb75ff3ab4 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java @@ -46,7 +46,7 @@ public class CountedCollectorTests extends ESTestCase { runnable.run(); } }; - CountedCollector collector = new CountedCollector<>(results::set, numResultsExpected, + CountedCollector collector = new CountedCollector<>(r -> results.set(r.getShardIndex(), r), numResultsExpected, latch::countDown, context); for (int i = 0; i < numResultsExpected; i++) { int shardID = i; @@ -57,8 +57,12 @@ public class CountedCollectorTests extends ESTestCase { break; case 1: state.add(1); - executor.execute(() -> collector.onResult(shardID, new DfsSearchResult(shardID, null), new SearchShardTarget("foo", - new Index("bar", "baz"), shardID))); + executor.execute(() -> { + DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null); + dfsSearchResult.setShardIndex(shardID); + dfsSearchResult.setSearchShardTarget(new SearchShardTarget("foo", + new Index("bar", "baz"), shardID)); + collector.onResult(dfsSearchResult);}); break; case 2: state.add(2); @@ -79,7 +83,7 @@ public class CountedCollectorTests extends ESTestCase { break; case 1: assertNotNull(results.get(i)); - assertEquals(i, results.get(i).id()); + assertEquals(i, results.get(i).getRequestId()); break; case 2: final int shardId = i; diff --git a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index ba01559e0f0..c2f21a7cc2c 100644 --- a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -18,52 +18,42 @@ */ package org.elasticsearch.action.search; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.Term; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.search.DocValueFormat; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class DfsQueryPhaseTests extends ESTestCase { + private static DfsSearchResult newSearchResult(int shardIndex, long requestId, SearchShardTarget target) { + DfsSearchResult result = new DfsSearchResult(requestId, target); + result.setShardIndex(shardIndex); + return result; + } + public void testDfsWith2Shards() throws IOException { AtomicArray results = new AtomicArray<>(2); - AtomicReference> responseRef = new AtomicReference<>(); - results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); - results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); + AtomicReference> responseRef = new AtomicReference<>(); + results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0))); + results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); @@ -73,7 +63,7 @@ public class DfsQueryPhaseTests extends ESTestCase { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); @@ -116,9 +106,9 @@ public class DfsQueryPhaseTests extends ESTestCase { public void testDfsWith1ShardFailed() throws IOException { AtomicArray results = new AtomicArray<>(2); - AtomicReference> responseRef = new AtomicReference<>(); - results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); - results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); + AtomicReference> responseRef = new AtomicReference<>(); + results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0))); + results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); @@ -128,7 +118,7 @@ public class DfsQueryPhaseTests extends ESTestCase { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); @@ -171,9 +161,9 @@ public class DfsQueryPhaseTests extends ESTestCase { public void testFailPhaseOnException() throws IOException { AtomicArray results = new AtomicArray<>(2); - AtomicReference> responseRef = new AtomicReference<>(); - results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); - results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); + AtomicReference> responseRef = new AtomicReference<>(); + results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0))); + results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); @@ -183,7 +173,7 @@ public class DfsQueryPhaseTests extends ESTestCase { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { if (request.id() == 1) { QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); diff --git a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 14c2eb6f63f..239f8f10a41 100644 --- a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -21,20 +21,18 @@ package org.elasticsearch.action.search; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.MockDirectoryWrapper; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; @@ -48,7 +46,7 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() throws IOException { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1); AtomicReference responseRef = new AtomicReference<>(); boolean hasHits = randomBoolean(); @@ -59,7 +57,9 @@ public class FetchSearchPhaseTests extends ESTestCase { queryResult.size(1); FetchSearchResult fetchResult = new FetchSearchResult(); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F)); - results.consumeResult(0, new QueryFetchSearchResult(queryResult, fetchResult)); + QueryFetchSearchResult fetchSearchResult = new QueryFetchSearchResult(queryResult, fetchResult); + fetchSearchResult.setShardIndex(0); + results.consumeResult(fetchSearchResult); numHits = 1; } else { numHits = 0; @@ -86,25 +86,27 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testFetchTwoDocument() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.consumeResult(0, queryResult); + queryResult.setShardIndex(0); + results.consumeResult(queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.consumeResult(1, queryResult); + queryResult.setShardIndex(1); + results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { FetchSearchResult fetchResult = new FetchSearchResult(); if (request.id() == 321) { fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); @@ -138,25 +140,27 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testFailFetchOneDoc() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.consumeResult(0, queryResult); + queryResult.setShardIndex(0); + results.consumeResult(queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.consumeResult(1, queryResult); + queryResult.setShardIndex(1); + results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { if (request.id() == 321) { FetchSearchResult fetchResult = new FetchSearchResult(); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); @@ -195,20 +199,21 @@ public class FetchSearchPhaseTests extends ESTestCase { int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits); AtomicReference responseRef = new AtomicReference<>(); for (int i = 0; i < numHits; i++) { QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.consumeResult(i, queryResult); + queryResult.setShardIndex(i); + results.consumeResult(queryResult); } SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { new Thread(() -> { FetchSearchResult fetchResult = new FetchSearchResult(); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))}, 1, 100F)); @@ -249,25 +254,27 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testExceptionFailsPhase() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.consumeResult(0, queryResult); + queryResult.setShardIndex(0); + results.consumeResult(queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.consumeResult(1, queryResult); + queryResult.setShardIndex(1); + results.consumeResult(queryResult); AtomicInteger numFetches = new AtomicInteger(0); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { FetchSearchResult fetchResult = new FetchSearchResult(); if (numFetches.incrementAndGet() == 1) { throw new RuntimeException("BOOM"); @@ -300,25 +307,27 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); - InitialSearchPhase.SearchPhaseResults results = + InitialSearchPhase.SearchPhaseResults results = controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = 1; QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.consumeResult(0, queryResult); + queryResult.setShardIndex(0); + results.consumeResult(queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.consumeResult(1, queryResult); + queryResult.setShardIndex(1); + results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, - ActionListener listener) { + SearchActionListener listener) { FetchSearchResult fetchResult = new FetchSearchResult(); if (request.id() == 321) { fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 53e4eb59ae5..4813dc8ae7d 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; @@ -111,13 +110,14 @@ public class SearchAsyncActionTests extends ESTestCase { TestSearchResponse response = new TestSearchResponse(); @Override - protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) { + protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener + listener) { assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); Transport.Connection connection = getConnection(shard.currentNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), connection.getNode()); Set ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>()); - ids.add(testSearchPhaseResult.id); + ids.add(testSearchPhaseResult.getRequestId()); if (randomBoolean()) { listener.onResponse(testSearchPhaseResult); } else { @@ -132,8 +132,8 @@ public class SearchAsyncActionTests extends ESTestCase { public void run() throws IOException { for (int i = 0; i < results.getNumShards(); i++) { TestSearchPhaseResult result = results.results.get(i); - assertEquals(result.node.getId(), result.shardTarget().getNodeId()); - sendReleaseSearchContext(result.id(), new MockConnection(result.node)); + assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId()); + sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node)); } responseListener.onResponse(response); latch.countDown(); @@ -193,32 +193,14 @@ public class SearchAsyncActionTests extends ESTestCase { public final Set queried = new HashSet<>(); } - public static class TestSearchPhaseResult implements SearchPhaseResult { - final long id; + public static class TestSearchPhaseResult extends SearchPhaseResult { final DiscoveryNode node; - SearchShardTarget shardTarget; public TestSearchPhaseResult(long id, DiscoveryNode node) { - this.id = id; + this.requestId = id; this.node = node; } - @Override - public long id() { - return id; - } - - @Override - public SearchShardTarget shardTarget() { - return this.shardTarget; - } - - @Override - public void shardTarget(SearchShardTarget shardTarget) { - this.shardTarget = shardTarget; - - } - @Override public void readFrom(StreamInput in) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 36756aba946..ec29612ba46 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -38,7 +39,6 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.test.ESTestCase; @@ -74,8 +74,8 @@ public class SearchPhaseControllerTests extends ESTestCase { } int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); - AtomicArray results = generateQueryResults(nShards, suggestions, queryResultSize, false); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results); + AtomicArray results = generateQueryResults(nShards, suggestions, queryResultSize, false); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), nShards); int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); for (Suggest.Suggestion suggestion : reducedSuggest(results)) { int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); @@ -87,12 +87,12 @@ public class SearchPhaseControllerTests extends ESTestCase { public void testSortIsIdempotent() throws IOException { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); - AtomicArray results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize, + AtomicArray results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize, randomBoolean() || true); boolean ignoreFrom = randomBoolean(); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results); + ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards); - ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results); + ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards); assertArrayEquals(sortedDocs, sortedDocs2); } @@ -103,7 +103,7 @@ public class SearchPhaseControllerTests extends ESTestCase { } int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); - AtomicArray queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); + AtomicArray queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); // calculate offsets and score doc array List mergedScoreDocs = new ArrayList<>(); @@ -138,10 +138,10 @@ public class SearchPhaseControllerTests extends ESTestCase { } } - private AtomicArray generateQueryResults(int nShards, - List suggestions, - int searchHitsSize, boolean useConstantScore) { - AtomicArray queryResults = new AtomicArray<>(nShards); + private AtomicArray generateQueryResults(int nShards, + List suggestions, + int searchHitsSize, boolean useConstantScore) { + AtomicArray queryResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, new SearchShardTarget("", new Index("", ""), shardIndex)); @@ -181,23 +181,24 @@ public class SearchPhaseControllerTests extends ESTestCase { querySearchResult.topDocs(topDocs, null); querySearchResult.size(searchHitsSize); querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion))); + querySearchResult.setShardIndex(shardIndex); queryResults.set(shardIndex, querySearchResult); } return queryResults; } - private int getTotalQueryHits(AtomicArray results) { + private int getTotalQueryHits(AtomicArray results) { int resultCount = 0; - for (AtomicArray.Entry shardResult : results.asList()) { - resultCount += shardResult.value.queryResult().topDocs().totalHits; + for (SearchPhaseResult shardResult : results.asList()) { + resultCount += shardResult.queryResult().topDocs().totalHits; } return resultCount; } - private Suggest reducedSuggest(AtomicArray results) { + private Suggest reducedSuggest(AtomicArray results) { Map>> groupedSuggestion = new HashMap<>(); - for (AtomicArray.Entry entry : results.asList()) { - for (Suggest.Suggestion suggestion : entry.value.queryResult().suggest()) { + for (SearchPhaseResult entry : results.asList()) { + for (Suggest.Suggestion suggestion : entry.queryResult().suggest()) { List> suggests = groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); suggests.add((Suggest.Suggestion) suggestion); @@ -207,18 +208,18 @@ public class SearchPhaseControllerTests extends ESTestCase { .collect(Collectors.toList())); } - private ScoreDoc[] getTopShardDocs(AtomicArray results) throws IOException { - List> resultList = results.asList(); + private ScoreDoc[] getTopShardDocs(AtomicArray results) throws IOException { + List resultList = results.asList(); TopDocs[] shardTopDocs = new TopDocs[resultList.size()]; for (int i = 0; i < resultList.size(); i++) { - shardTopDocs[i] = resultList.get(i).value.queryResult().topDocs(); + shardTopDocs[i] = resultList.get(i).queryResult().topDocs(); } int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results)); return TopDocs.merge(topN, shardTopDocs).scoreDocs; } - private AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { - AtomicArray fetchResults = new AtomicArray<>(nShards); + private AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { + AtomicArray fetchResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { float maxScore = -1F; SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex); @@ -257,27 +258,30 @@ public class SearchPhaseControllerTests extends ESTestCase { SearchRequest request = new SearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); + InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0)); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); - consumer.consumeResult(0, result); + result.setShardIndex(0); + consumer.consumeResult(result); result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); - consumer.consumeResult(2, result); + result.setShardIndex(2); + consumer.consumeResult(result); result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); - consumer.consumeResult(1, result); + result.setShardIndex(1); + consumer.consumeResult(result); int numTotalReducePhases = 1; if (bufferSize == 2) { assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); @@ -301,7 +305,7 @@ public class SearchPhaseControllerTests extends ESTestCase { SearchRequest request = new SearchRequest(); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.setBatchedReduceSize(bufferSize); - InitialSearchPhase.SearchPhaseResults consumer = + InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); AtomicInteger max = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(expectedNumResults); @@ -315,7 +319,8 @@ public class SearchPhaseControllerTests extends ESTestCase { InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); result.aggregations(aggs); - consumer.consumeResult(id, result); + result.setShardIndex(id); + consumer.consumeResult(result); latch.countDown(); }); @@ -337,7 +342,7 @@ public class SearchPhaseControllerTests extends ESTestCase { request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); } request.setBatchedReduceSize(bufferSize); - InitialSearchPhase.SearchPhaseResults consumer + InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); if (hasAggs && expectedNumResults > bufferSize) { assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, @@ -354,7 +359,7 @@ public class SearchPhaseControllerTests extends ESTestCase { for (int iters = 0; iters < maxIters; iters++) { TopDocs[] topDocs = new TopDocs[randomIntBetween(2, 100)]; int numShards = topDocs.length; - AtomicArray resultProviderAtomicArray = generateQueryResults(numShards, Collections.emptyList(), + AtomicArray resultProviderAtomicArray = generateQueryResults(numShards, Collections.emptyList(), 2, randomBoolean()); if (randomBoolean()) { int maxNull = randomIntBetween(1, topDocs.length - 1); diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 0f626903609..f3ff6be1cc1 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchLocalRequest; -import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.test.ESSingleNodeTestCase; import java.io.IOException; @@ -184,13 +183,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase { final int rounds = scaledRandomIntBetween(100, 10000); for (int i = 0; i < rounds; i++) { try { - QuerySearchResultProvider querySearchResultProvider = service.executeQueryPhase( + SearchPhaseResult searchPhaseResult = service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), new SearchTask(123L, "", "", "", null)); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); - ShardFetchRequest req = new ShardFetchRequest(querySearchResultProvider.id(), intCursors, null /* not a scroll */); + ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */); service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null)); } catch (AlreadyClosedException ex) { throw ex;