Streamline shard index availability in all SearchPhaseResults (#23788)

Today we have the shard target and the target request ID available in SearchPhaseResults.
Yet, the coordinating node maintains a shard index to reference the request, response tuples
internally which is also used in many other classes to reference back from fetch results to
query results etc. Today this shard index is implicitly passed via the index in AtomicArray
which causes an undesirable dependency on this interface.
This commit moves the shard index into the SearchPhaseResult and removes some dependencies
on AtomicArray. Further removals will follow in the future. The most important refactoring here
is the removal of AtomicArray.Entry which used to be created for every element in the atomic array
to maintain the shard index during result processing. This caused an unnecessary indirection, dependency
and potentially thousands of unnecessary objects in every search phase.
This commit is contained in:
Simon Willnauer 2017-03-30 14:32:42 +02:00 committed by GitHub
parent ef1329727d
commit 4125f012b9
34 changed files with 543 additions and 521 deletions

View File

@ -40,7 +40,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
* Holds the responses as they come back. This uses {@link Tuple} as an "Either" style holder where only the response or the exception * Holds the responses as they come back. This uses {@link Tuple} as an "Either" style holder where only the response or the exception
* is set. * is set.
*/ */
private final AtomicArray<Tuple<BulkByScrollResponse, Exception>> results; private final AtomicArray<Result> results;
private final AtomicInteger counter; private final AtomicInteger counter;
public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) { public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) {
@ -82,13 +82,11 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
} }
private void addResultsToList(List<StatusOrException> sliceStatuses) { private void addResultsToList(List<StatusOrException> sliceStatuses) {
for (AtomicArray.Entry<Tuple<BulkByScrollResponse, Exception>> t : results.asList()) { for (Result t : results.asList()) {
if (t.value != null) { if (t.response != null) {
if (t.value.v1() != null) { sliceStatuses.set(t.sliceId, new StatusOrException(t.response.getStatus()));
sliceStatuses.set(t.index, new StatusOrException(t.value.v1().getStatus())); } else {
} else { sliceStatuses.set(t.sliceId, new StatusOrException(t.failure));
sliceStatuses.set(t.index, new StatusOrException(t.value.v2()));
}
} }
} }
} }
@ -97,7 +95,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
* Record a response from a slice and respond to the listener if the request is finished. * Record a response from a slice and respond to the listener if the request is finished.
*/ */
public void onSliceResponse(ActionListener<BulkByScrollResponse> listener, int sliceId, BulkByScrollResponse response) { public void onSliceResponse(ActionListener<BulkByScrollResponse> listener, int sliceId, BulkByScrollResponse response) {
results.setOnce(sliceId, new Tuple<>(response, null)); results.setOnce(sliceId, new Result(sliceId, response));
/* If the request isn't finished we could automatically rethrottle the sub-requests here but we would only want to do that if we /* If the request isn't finished we could automatically rethrottle the sub-requests here but we would only want to do that if we
* were fairly sure they had a while left to go. */ * were fairly sure they had a while left to go. */
recordSliceCompletionAndRespondIfAllDone(listener); recordSliceCompletionAndRespondIfAllDone(listener);
@ -107,7 +105,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
* Record a failure from a slice and respond to the listener if the request is finished. * Record a failure from a slice and respond to the listener if the request is finished.
*/ */
void onSliceFailure(ActionListener<BulkByScrollResponse> listener, int sliceId, Exception e) { void onSliceFailure(ActionListener<BulkByScrollResponse> listener, int sliceId, Exception e) {
results.setOnce(sliceId, new Tuple<>(null, e)); results.setOnce(sliceId, new Result(sliceId, e));
recordSliceCompletionAndRespondIfAllDone(listener); recordSliceCompletionAndRespondIfAllDone(listener);
// TODO cancel when a slice fails? // TODO cancel when a slice fails?
} }
@ -118,17 +116,17 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
} }
List<BulkByScrollResponse> responses = new ArrayList<>(results.length()); List<BulkByScrollResponse> responses = new ArrayList<>(results.length());
Exception exception = null; Exception exception = null;
for (AtomicArray.Entry<Tuple<BulkByScrollResponse, Exception>> t : results.asList()) { for (Result t : results.asList()) {
if (t.value.v1() == null) { if (t.response == null) {
assert t.value.v2() != null : "exception shouldn't be null if value is null"; assert t.failure != null : "exception shouldn't be null if value is null";
if (exception == null) { if (exception == null) {
exception = t.value.v2(); exception = t.failure;
} else { } else {
exception.addSuppressed(t.value.v2()); exception.addSuppressed(t.failure);
} }
} else { } else {
assert t.value.v2() == null : "exception should be null if response is not null"; assert t.failure == null : "exception should be null if response is not null";
responses.add(t.value.v1()); responses.add(t.response);
} }
} }
if (exception == null) { if (exception == null) {
@ -138,4 +136,21 @@ public class ParentBulkByScrollTask extends BulkByScrollTask {
} }
} }
private static final class Result {
final BulkByScrollResponse response;
final int sliceId;
final Exception failure;
private Result(int sliceId, BulkByScrollResponse response) {
this.sliceId = sliceId;
this.response = response;
failure = null;
}
private Result(int sliceId, Exception failure) {
this.sliceId = sliceId;
this.failure = failure;
response = null;
}
}
} }

View File

@ -131,7 +131,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
} else { } else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults() final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.shardTarget().toString()).collect(Collectors.joining(",")); .map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})", logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion); currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
} }
@ -159,10 +159,10 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
if (shardFailures == null) { if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY; return ShardSearchFailure.EMPTY_ARRAY;
} }
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList(); List<ShardSearchFailure> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) { for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value; failures[i] = entries.get(i);
} }
return failures; return failures;
} }
@ -209,8 +209,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private void raisePhaseFailure(SearchPhaseExecutionException exception) { private void raisePhaseFailure(SearchPhaseExecutionException exception) {
results.getSuccessfulResults().forEach((entry) -> { results.getSuccessfulResults().forEach((entry) -> {
try { try {
Transport.Connection connection = nodeIdToConnection.apply(entry.shardTarget().getNodeId()); Transport.Connection connection = nodeIdToConnection.apply(entry.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(entry.id(), connection); sendReleaseSearchContext(entry.getRequestId(), connection);
} catch (Exception inner) { } catch (Exception inner) {
inner.addSuppressed(exception); inner.addSuppressed(exception);
logger.trace("failed to release context", inner); logger.trace("failed to release context", inner);
@ -220,18 +220,18 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
} }
@Override @Override
public final void onShardSuccess(int shardIndex, Result result) { public final void onShardSuccess(Result result) {
successfulOps.incrementAndGet(); successfulOps.incrementAndGet();
results.consumeResult(shardIndex, result); results.consumeResult(result);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
} }
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex // in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get(); AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures != null) { if (shardFailures != null) {
shardFailures.set(shardIndex, null); shardFailures.set(result.getShardIndex(), null);
} }
} }

View File

@ -23,18 +23,20 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import java.util.function.Consumer;
/** /**
* This is a simple base class to simplify fan out to shards and collect their results. Each results passed to * This is a simple base class to simplify fan out to shards and collect their results. Each results passed to
* {@link #onResult(int, SearchPhaseResult, SearchShardTarget)} will be set to the provided result array * {@link #onResult(SearchPhaseResult)} will be set to the provided result array
* where the given index is used to set the result on the array. * where the given index is used to set the result on the array.
*/ */
final class CountedCollector<R extends SearchPhaseResult> { final class CountedCollector<R extends SearchPhaseResult> {
private final ResultConsumer<R> resultConsumer; private final Consumer<R> resultConsumer;
private final CountDown counter; private final CountDown counter;
private final Runnable onFinish; private final Runnable onFinish;
private final SearchPhaseContext context; private final SearchPhaseContext context;
CountedCollector(ResultConsumer<R> resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) { CountedCollector(Consumer<R> resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) {
this.resultConsumer = resultConsumer; this.resultConsumer = resultConsumer;
this.counter = new CountDown(expectedOps); this.counter = new CountDown(expectedOps);
this.onFinish = onFinish; this.onFinish = onFinish;
@ -55,10 +57,9 @@ final class CountedCollector<R extends SearchPhaseResult> {
/** /**
* Sets the result to the given array index and then runs {@link #countDown()} * Sets the result to the given array index and then runs {@link #countDown()}
*/ */
void onResult(int index, R result, SearchShardTarget target) { void onResult(R result) {
try { try {
result.shardTarget(target); resultConsumer.accept(result);
resultConsumer.consume(index, result);
} finally { } finally {
countDown(); countDown();
} }
@ -75,12 +76,4 @@ final class CountedCollector<R extends SearchPhaseResult> {
countDown(); countDown();
} }
} }
/**
* A functional interface to plug in shard result consumers to this collector
*/
@FunctionalInterface
public interface ResultConsumer<R extends SearchPhaseResult> {
void consume(int shardIndex, R result);
}
} }

View File

@ -20,16 +20,17 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.function.Function; import java.util.function.Function;
/** /**
@ -40,16 +41,16 @@ import java.util.function.Function;
* @see CountedCollector#onFailure(int, SearchShardTarget, Exception) * @see CountedCollector#onFailure(int, SearchShardTarget, Exception)
*/ */
final class DfsQueryPhase extends SearchPhase { final class DfsQueryPhase extends SearchPhase {
private final InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> queryResult; private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> queryResult;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> dfsSearchResults; private final AtomicArray<DfsSearchResult> dfsSearchResults;
private final Function<InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider>, SearchPhase> nextPhaseFactory; private final Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context; private final SearchPhaseContext context;
private final SearchTransportService searchTransportService; private final SearchTransportService searchTransportService;
DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults, DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
SearchPhaseController searchPhaseController, SearchPhaseController searchPhaseController,
Function<InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider>, SearchPhase> nextPhaseFactory, Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context) { SearchPhaseContext context) {
super("dfs_query"); super("dfs_query");
this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards()); this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());
@ -64,22 +65,26 @@ final class DfsQueryPhase extends SearchPhase {
public void run() throws IOException { public void run() throws IOException {
// TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs // TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs
// to free up memory early // to free up memory early
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsSearchResults); final List<DfsSearchResult> resultList = dfsSearchResults.asList();
final CountedCollector<QuerySearchResultProvider> counter = new CountedCollector<>(queryResult::consumeResult, final AggregatedDfs dfs = searchPhaseController.aggregateDfs(resultList);
dfsSearchResults.asList().size(), final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
() -> { resultList.size(),
context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)); () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
}, context); for (final DfsSearchResult dfsResult : resultList) {
for (final AtomicArray.Entry<DfsSearchResult> entry : dfsSearchResults.asList()) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
DfsSearchResult dfsResult = entry.value;
final int shardIndex = entry.index;
final SearchShardTarget searchShardTarget = dfsResult.shardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.id(), dfs); QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.getRequestId(), dfs);
final int shardIndex = dfsResult.getShardIndex();
searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(), searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(),
ActionListener.wrap( new SearchActionListener<QuerySearchResult>(searchShardTarget, shardIndex) {
result -> counter.onResult(shardIndex, result, searchShardTarget),
exception -> { @Override
protected void innerOnResponse(QuerySearchResult response) {
counter.onResult(response);
}
@Override
public void onFailure(Exception exception) {
try { try {
if (context.getLogger().isDebugEnabled()) { if (context.getLogger().isDebugEnabled()) {
context.getLogger().debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase", context.getLogger().debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase",
@ -92,7 +97,8 @@ final class DfsQueryPhase extends SearchPhase {
// release it again to be in the safe side // release it again to be in the safe side
context.sendReleaseSearchContext(querySearchRequest.id(), connection); context.sendReleaseSearchContext(querySearchRequest.id(), connection);
} }
})); }
});
} }
} }
} }

View File

@ -23,15 +23,14 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import java.io.IOException; import java.io.IOException;
@ -45,13 +44,13 @@ import java.util.function.Function;
final class FetchSearchPhase extends SearchPhase { final class FetchSearchPhase extends SearchPhase {
private final AtomicArray<FetchSearchResult> fetchResults; private final AtomicArray<FetchSearchResult> fetchResults;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
private final AtomicArray<QuerySearchResultProvider> queryResults; private final AtomicArray<SearchPhaseResult> queryResults;
private final Function<SearchResponse, SearchPhase> nextPhaseFactory; private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context; private final SearchPhaseContext context;
private final Logger logger; private final Logger logger;
private final InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> resultConsumer; private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer;
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> resultConsumer, FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController, SearchPhaseController searchPhaseController,
SearchPhaseContext context) { SearchPhaseContext context) {
this(resultConsumer, searchPhaseController, context, this(resultConsumer, searchPhaseController, context,
@ -59,7 +58,7 @@ final class FetchSearchPhase extends SearchPhase {
(finalResponse) -> sendResponsePhase(finalResponse, context))); (finalResponse) -> sendResponsePhase(finalResponse, context)));
} }
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> resultConsumer, FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController, SearchPhaseController searchPhaseController,
SearchPhaseContext context, Function<SearchResponse, SearchPhase> nextPhaseFactory) { SearchPhaseContext context, Function<SearchResponse, SearchPhase> nextPhaseFactory) {
super("fetch"); super("fetch");
@ -98,35 +97,35 @@ final class FetchSearchPhase extends SearchPhase {
private void innerRun() throws IOException { private void innerRun() throws IOException {
final int numShards = context.getNumShards(); final int numShards = context.getNumShards();
final boolean isScrollSearch = context.getRequest().scroll() != null; final boolean isScrollSearch = context.getRequest().scroll() != null;
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, queryResults); List<SearchPhaseResult> phaseResults = queryResults.asList();
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults, context.getNumShards());
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null; String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
List<AtomicArray.Entry<QuerySearchResultProvider>> queryResultsAsList = queryResults.asList();
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
final boolean queryAndFetchOptimization = queryResults.length() == 1; final boolean queryAndFetchOptimization = queryResults.length() == 1;
final Runnable finishPhase = () final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ? -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults); queryResults : fetchResults);
if (queryAndFetchOptimization) { if (queryAndFetchOptimization) {
assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null; assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
// query AND fetch optimization // query AND fetch optimization
finishPhase.run(); finishPhase.run();
} else { } else {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs); final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
queryResultsAsList.stream() phaseResults.stream()
.map(e -> e.value.queryResult()) .map(e -> e.queryResult())
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run(); finishPhase.run();
} else { } else {
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ? final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards) searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
: null; : null;
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults::set, final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase, context); finishPhase, context);
for (int i = 0; i < docIdsToLoad.length; i++) { for (int i = 0; i < docIdsToLoad.length; i++) {
IntArrayList entry = docIdsToLoad[i]; IntArrayList entry = docIdsToLoad[i];
QuerySearchResultProvider queryResult = queryResults.get(i); SearchPhaseResult queryResult = queryResults.get(i);
if (entry == null) { // no results for this shard ID if (entry == null) { // no results for this shard ID
if (queryResult != null) { if (queryResult != null) {
// if we got some hits from this shard we have to release the context there // if we got some hits from this shard we have to release the context there
@ -137,10 +136,10 @@ final class FetchSearchPhase extends SearchPhase {
// in any case we count down this result since we don't talk to this shard anymore // in any case we count down this result since we don't talk to this shard anymore
counter.countDown(); counter.countDown();
} else { } else {
Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId()); Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry, ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard); lastEmittedDocPerShard);
executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), executeFetch(i, queryResult.getSearchShardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
connection); connection);
} }
} }
@ -159,10 +158,10 @@ final class FetchSearchPhase extends SearchPhase {
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
final Transport.Connection connection) { final Transport.Connection connection) {
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(), context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
new ActionListener<FetchSearchResult>() { new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
@Override @Override
public void onResponse(FetchSearchResult result) { public void innerOnResponse(FetchSearchResult result) {
counter.onResult(shardIndex, result, shardTarget); counter.onResult(result);
} }
@Override @Override
@ -191,8 +190,8 @@ final class FetchSearchPhase extends SearchPhase {
// and if it has at lease one hit that didn't make it to the global topDocs // and if it has at lease one hit that didn't make it to the global topDocs
if (context.getRequest().scroll() == null && queryResult.hasHits()) { if (context.getRequest().scroll() == null && queryResult.hasHits()) {
try { try {
Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId()); Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
context.sendReleaseSearchContext(queryResult.id(), connection); context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
} catch (Exception e) { } catch (Exception e) {
context.getLogger().trace("failed to release context", e); context.getLogger().trace("failed to release context", e);
} }
@ -201,7 +200,7 @@ final class FetchSearchPhase extends SearchPhase {
private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) { AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null, final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
sortedDocs, reducedQueryPhase, fetchResultsArr); sortedDocs, reducedQueryPhase, fetchResultsArr);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId))); context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
@ -144,10 +143,11 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else { } else {
try { try {
executePhaseOnShard(shardIt, shard, new ActionListener<FirstResult>() { executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId()), shardIndex) {
@Override @Override
public void onResponse(FirstResult result) { public void innerOnResponse(FirstResult result) {
onShardResult(shardIndex, shard.currentNodeId(), result, shardIt); onShardResult(result, shardIt);
} }
@Override @Override
@ -164,9 +164,10 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
} }
} }
private void onShardResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) { private void onShardResult(FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId())); assert result.getShardIndex() != -1 : "shard index is not set";
onShardSuccess(shardIndex, result); assert result.getSearchShardTarget() != null : "search shard target must not be null";
onShardSuccess(result);
// we need to increment successful ops first before we compare the exit condition otherwise if we // we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can // are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
@ -185,7 +186,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
/** /**
* Executed once all shard results have been received and processed * Executed once all shard results have been received and processed
* @see #onShardFailure(int, SearchShardTarget, Exception) * @see #onShardFailure(int, SearchShardTarget, Exception)
* @see #onShardSuccess(int, SearchPhaseResult) * @see #onShardSuccess(SearchPhaseResult)
*/ */
abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim() abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim()
@ -201,12 +202,10 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
/** /**
* Executed once for every successful shard level request. * Executed once for every successful shard level request.
* @param shardIndex the internal index for this shard. Each shard has an index / ordinal assigned that is used to reference
* it's results
* @param result the result returned form the shard * @param result the result returned form the shard
* *
*/ */
abstract void onShardSuccess(int shardIndex, FirstResult result); abstract void onShardSuccess(FirstResult result);
/** /**
* Sends the request to the actual shard. * Sends the request to the actual shard.
@ -214,7 +213,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
* @param shard the shard routing to send the request for * @param shard the shard routing to send the request for
* @param listener the listener to notify on response * @param listener the listener to notify on response
*/ */
protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener<FirstResult> listener); protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener<FirstResult> listener);
/** /**
* This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing * This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing
@ -237,17 +236,16 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
* A stream of all non-null (successful) shard results * A stream of all non-null (successful) shard results
*/ */
final Stream<Result> getSuccessfulResults() { final Stream<Result> getSuccessfulResults() {
return results.asList().stream().map(e -> e.value); return results.asList().stream();
} }
/** /**
* Consumes a single shard result * Consumes a single shard result
* @param shardIndex the shards index, this is a 0-based id that is used to establish a 1 to 1 mapping to the searched shards
* @param result the shards result * @param result the shards result
*/ */
void consumeResult(int shardIndex, Result result) { void consumeResult(Result result) {
assert results.get(shardIndex) == null : "shardIndex: " + shardIndex + " is already set"; assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
results.set(shardIndex, result); results.set(result.getShardIndex(), result);
} }
/** /**

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
/**
* An base action listener that ensures shard target and shard index is set on all responses
* received by this listener.
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {
private final int requestIndex;
private final SearchShardTarget searchShardTarget;
protected SearchActionListener(SearchShardTarget searchShardTarget,
int shardIndex) {
assert shardIndex >= 0 : "shard index must be positive";
this.searchShardTarget = searchShardTarget;
this.requestIndex = shardIndex;
}
@Override
public final void onResponse(T response) {
response.setShardIndex(requestIndex);
setSearchShardTarget(response);
innerOnResponse(response);
}
protected void setSearchShardTarget(T response) { // some impls need to override this
response.setSearchShardTarget(searchShardTarget);
}
protected abstract void innerOnResponse(T response);
}

View File

@ -72,7 +72,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
protected void executePhaseOnShard( protected void executePhaseOnShard(
final ShardIterator shardIt, final ShardIterator shardIt,
final ShardRouting shard, final ShardRouting shard,
final ActionListener<DfsSearchResult> listener) { final SearchActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()), getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard) , getTask(), listener); buildShardSearchRequest(shardIt, shard) , getTask(), listener);
} }

View File

@ -93,8 +93,8 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
/** /**
* Releases a search context with the given context ID on the node the given connection is connected to. * Releases a search context with the given context ID on the node the given connection is connected to.
* @see org.elasticsearch.search.query.QuerySearchResult#id() * @see org.elasticsearch.search.query.QuerySearchResult#getRequestId()
* @see org.elasticsearch.search.fetch.FetchSearchResult#id() * @see org.elasticsearch.search.fetch.FetchSearchResult#getRequestId()
* *
*/ */
default void sendReleaseSearchContext(long contextId, Transport.Connection connection) { default void sendReleaseSearchContext(long contextId, Transport.Connection connection) {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
@ -52,7 +53,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
@ -81,13 +81,13 @@ public class SearchPhaseController extends AbstractComponent {
this.scriptService = scriptService; this.scriptService = scriptService;
} }
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) { public AggregatedDfs aggregateDfs(List<DfsSearchResult> results) {
ObjectObjectHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap(); ObjectObjectHashMap<Term, TermStatistics> termStatistics = HppcMaps.newNoNullKeysMap();
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap(); ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
long aggMaxDoc = 0; long aggMaxDoc = 0;
for (AtomicArray.Entry<DfsSearchResult> lEntry : results.asList()) { for (DfsSearchResult lEntry : results) {
final Term[] terms = lEntry.value.terms(); final Term[] terms = lEntry.terms();
final TermStatistics[] stats = lEntry.value.termStatistics(); final TermStatistics[] stats = lEntry.termStatistics();
assert terms.length == stats.length; assert terms.length == stats.length;
for (int i = 0; i < terms.length; i++) { for (int i = 0; i < terms.length; i++) {
assert terms[i] != null; assert terms[i] != null;
@ -105,9 +105,9 @@ public class SearchPhaseController extends AbstractComponent {
} }
assert !lEntry.value.fieldStatistics().containsKey(null); assert !lEntry.fieldStatistics().containsKey(null);
final Object[] keys = lEntry.value.fieldStatistics().keys; final Object[] keys = lEntry.fieldStatistics().keys;
final Object[] values = lEntry.value.fieldStatistics().values; final Object[] values = lEntry.fieldStatistics().values;
for (int i = 0; i < keys.length; i++) { for (int i = 0; i < keys.length; i++) {
if (keys[i] != null) { if (keys[i] != null) {
String key = (String) keys[i]; String key = (String) keys[i];
@ -127,7 +127,7 @@ public class SearchPhaseController extends AbstractComponent {
} }
} }
} }
aggMaxDoc += lEntry.value.maxDoc(); aggMaxDoc += lEntry.maxDoc();
} }
return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc); return new AggregatedDfs(termStatistics, fieldStatistics, aggMaxDoc);
} }
@ -146,10 +146,9 @@ public class SearchPhaseController extends AbstractComponent {
* *
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param resultsArr Shard result holder * @param results Shard result holder
*/ */
public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearchResultProvider> resultsArr) throws IOException { public ScoreDoc[] sortDocs(boolean ignoreFrom, List<? extends SearchPhaseResult> results, int numShards) throws IOException {
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results = resultsArr.asList();
if (results.isEmpty()) { if (results.isEmpty()) {
return EMPTY_DOCS; return EMPTY_DOCS;
} }
@ -159,25 +158,25 @@ public class SearchPhaseController extends AbstractComponent {
int shardIndex = -1; int shardIndex = -1;
if (results.size() == 1) { if (results.size() == 1) {
canOptimize = true; canOptimize = true;
result = results.get(0).value.queryResult(); result = results.get(0).queryResult();
shardIndex = results.get(0).index; shardIndex = result.getShardIndex();
} else { } else {
boolean hasResult = false; boolean hasResult = false;
QuerySearchResult resultToOptimize = null; QuerySearchResult resultToOptimize = null;
// lets see if we only got hits from a single shard, if so, we can optimize... // lets see if we only got hits from a single shard, if so, we can optimize...
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) { for (SearchPhaseResult entry : results) {
if (entry.value.queryResult().hasHits()) { if (entry.queryResult().hasHits()) {
if (hasResult) { // we already have one, can't really optimize if (hasResult) { // we already have one, can't really optimize
canOptimize = false; canOptimize = false;
break; break;
} }
canOptimize = true; canOptimize = true;
hasResult = true; hasResult = true;
resultToOptimize = entry.value.queryResult(); resultToOptimize = entry.queryResult();
shardIndex = entry.index; shardIndex = resultToOptimize.getShardIndex();
} }
} }
result = canOptimize ? resultToOptimize : results.get(0).value.queryResult(); result = canOptimize ? resultToOptimize : results.get(0).queryResult();
assert result != null; assert result != null;
} }
if (canOptimize) { if (canOptimize) {
@ -228,7 +227,6 @@ public class SearchPhaseController extends AbstractComponent {
final int from = ignoreFrom ? 0 : result.queryResult().from(); final int from = ignoreFrom ? 0 : result.queryResult().from();
final TopDocs mergedTopDocs; final TopDocs mergedTopDocs;
final int numShards = resultsArr.length();
if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) { if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs(); CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields); final Sort sort = new Sort(firstTopDocs.fields);
@ -239,11 +237,11 @@ public class SearchPhaseController extends AbstractComponent {
} else if (result.queryResult().topDocs() instanceof TopFieldDocs) { } else if (result.queryResult().topDocs() instanceof TopFieldDocs) {
TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs(); TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields); final Sort sort = new Sort(firstTopDocs.fields);
final TopFieldDocs[] shardTopDocs = new TopFieldDocs[resultsArr.length()]; final TopFieldDocs[] shardTopDocs = new TopFieldDocs[numShards];
fillTopDocs(shardTopDocs, results, new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN)); fillTopDocs(shardTopDocs, results, new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN));
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs, true); mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs, true);
} else { } else {
final TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()]; final TopDocs[] shardTopDocs = new TopDocs[numShards];
fillTopDocs(shardTopDocs, results, Lucene.EMPTY_TOP_DOCS); fillTopDocs(shardTopDocs, results, Lucene.EMPTY_TOP_DOCS);
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, true); mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs, true);
} }
@ -251,11 +249,11 @@ public class SearchPhaseController extends AbstractComponent {
ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs; ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>(); final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
// group suggestions and assign shard index // group suggestions and assign shard index
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) { for (SearchPhaseResult sortedResult : results) {
Suggest shardSuggest = sortedResult.value.queryResult().suggest(); Suggest shardSuggest = sortedResult.queryResult().suggest();
if (shardSuggest != null) { if (shardSuggest != null) {
for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
suggestion.setShardIndex(sortedResult.index); suggestion.setShardIndex(sortedResult.getShardIndex());
List<Suggestion<CompletionSuggestion.Entry>> suggestions = List<Suggestion<CompletionSuggestion.Entry>> suggestions =
groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
suggestions.add(suggestion); suggestions.add(suggestion);
@ -286,17 +284,17 @@ public class SearchPhaseController extends AbstractComponent {
} }
static <T extends TopDocs> void fillTopDocs(T[] shardTopDocs, static <T extends TopDocs> void fillTopDocs(T[] shardTopDocs,
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> results, List<? extends SearchPhaseResult> results,
T empytTopDocs) { T empytTopDocs) {
if (results.size() != shardTopDocs.length) { if (results.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs // TopDocs#merge can't deal with null shard TopDocs
Arrays.fill(shardTopDocs, empytTopDocs); Arrays.fill(shardTopDocs, empytTopDocs);
} }
for (AtomicArray.Entry<? extends QuerySearchResultProvider> resultProvider : results) { for (SearchPhaseResult resultProvider : results) {
final T topDocs = (T) resultProvider.value.queryResult().topDocs(); final T topDocs = (T) resultProvider.queryResult().topDocs();
assert topDocs != null : "top docs must not be null in a valid result"; assert topDocs != null : "top docs must not be null in a valid result";
// the 'index' field is the position in the resultsArr atomic array // the 'index' field is the position in the resultsArr atomic array
shardTopDocs[resultProvider.index] = topDocs; shardTopDocs[resultProvider.getShardIndex()] = topDocs;
} }
} }
public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
@ -340,11 +338,11 @@ public class SearchPhaseController extends AbstractComponent {
*/ */
public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
ReducedQueryPhase reducedQueryPhase, ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) { AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
if (reducedQueryPhase.isEmpty()) { if (reducedQueryPhase.isEmpty()) {
return InternalSearchResponse.empty(); return InternalSearchResponse.empty();
} }
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList(); List<? extends SearchPhaseResult> fetchResults = fetchResultsArr.asList();
SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr); SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, sortedDocs, fetchResultsArr);
if (reducedQueryPhase.suggest != null) { if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) { if (!fetchResults.isEmpty()) {
@ -353,7 +351,7 @@ public class SearchPhaseController extends AbstractComponent {
final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions(); final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) { for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
ScoreDoc shardDoc = sortedDocs[scoreDocIndex]; ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); SearchPhaseResult searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
if (searchResultProvider == null) { if (searchResultProvider == null) {
continue; continue;
} }
@ -364,7 +362,7 @@ public class SearchPhaseController extends AbstractComponent {
CompletionSuggestion.Entry.Option suggestOption = CompletionSuggestion.Entry.Option suggestOption =
suggestionOptions.get(scoreDocIndex - currentOffset); suggestionOptions.get(scoreDocIndex - currentOffset);
hit.score(shardDoc.score); hit.score(shardDoc.score);
hit.shard(fetchResult.shardTarget()); hit.shard(fetchResult.getSearchShardTarget());
suggestOption.setHit(hit); suggestOption.setHit(hit);
} }
} }
@ -377,8 +375,8 @@ public class SearchPhaseController extends AbstractComponent {
} }
private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs, private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom, ScoreDoc[] sortedDocs,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) { AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList(); List<? extends SearchPhaseResult> fetchResults = fetchResultsArr.asList();
boolean sorted = false; boolean sorted = false;
int sortScoreIndex = -1; int sortScoreIndex = -1;
if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) { if (reducedQueryPhase.oneResult.topDocs() instanceof TopFieldDocs) {
@ -396,8 +394,8 @@ public class SearchPhaseController extends AbstractComponent {
} }
} }
// clean the fetch counter // clean the fetch counter
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : fetchResults) { for (SearchPhaseResult entry : fetchResults) {
entry.value.fetchResult().initCounter(); entry.fetchResult().initCounter();
} }
int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from(); int from = ignoreFrom ? 0 : reducedQueryPhase.oneResult.queryResult().from();
int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size()); int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.oneResult.size());
@ -408,7 +406,7 @@ public class SearchPhaseController extends AbstractComponent {
if (!fetchResults.isEmpty()) { if (!fetchResults.isEmpty()) {
for (int i = 0; i < numSearchHits; i++) { for (int i = 0; i < numSearchHits; i++) {
ScoreDoc shardDoc = sortedDocs[i]; ScoreDoc shardDoc = sortedDocs[i];
QuerySearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); SearchPhaseResult fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
if (fetchResultProvider == null) { if (fetchResultProvider == null) {
continue; continue;
} }
@ -417,7 +415,7 @@ public class SearchPhaseController extends AbstractComponent {
if (index < fetchResult.hits().internalHits().length) { if (index < fetchResult.hits().internalHits().length) {
SearchHit searchHit = fetchResult.hits().internalHits()[index]; SearchHit searchHit = fetchResult.hits().internalHits()[index];
searchHit.score(shardDoc.score); searchHit.score(shardDoc.score);
searchHit.shard(fetchResult.shardTarget()); searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) { if (sorted) {
FieldDoc fieldDoc = (FieldDoc) shardDoc; FieldDoc fieldDoc = (FieldDoc) shardDoc;
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats()); searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.oneResult.sortValueFormats());
@ -437,7 +435,7 @@ public class SearchPhaseController extends AbstractComponent {
* Reduces the given query results and consumes all aggregations and profile results. * Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results * @param queryResults a list of non-null query shard results
*/ */
public final ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) { public final ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults) {
return reducedQueryPhase(queryResults, null, 0); return reducedQueryPhase(queryResults, null, 0);
} }
@ -450,7 +448,7 @@ public class SearchPhaseController extends AbstractComponent {
* @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeAggs()
* @see QuerySearchResult#consumeProfileResult() * @see QuerySearchResult#consumeProfileResult()
*/ */
private ReducedQueryPhase reducedQueryPhase(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults, private ReducedQueryPhase reducedQueryPhase(List<? extends SearchPhaseResult> queryResults,
List<InternalAggregations> bufferdAggs, int numReducePhases) { List<InternalAggregations> bufferdAggs, int numReducePhases) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases; assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
numReducePhases++; // increment for this phase numReducePhases++; // increment for this phase
@ -463,7 +461,7 @@ public class SearchPhaseController extends AbstractComponent {
return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null, return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null,
numReducePhases); numReducePhases);
} }
final QuerySearchResult firstResult = queryResults.get(0).value.queryResult(); final QuerySearchResult firstResult = queryResults.get(0).queryResult();
final boolean hasSuggest = firstResult.suggest() != null; final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs; final boolean consumeAggs;
@ -487,8 +485,8 @@ public class SearchPhaseController extends AbstractComponent {
final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap(); final Map<String, List<Suggestion>> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap();
final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size()) final Map<String, ProfileShardResult> profileResults = hasProfileResults ? new HashMap<>(queryResults.size())
: Collections.emptyMap(); : Collections.emptyMap();
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) { for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.value.queryResult(); QuerySearchResult result = entry.queryResult();
if (result.searchTimedOut()) { if (result.searchTimedOut()) {
timedOut = true; timedOut = true;
} }
@ -515,7 +513,7 @@ public class SearchPhaseController extends AbstractComponent {
aggregationsList.add((InternalAggregations) result.consumeAggs()); aggregationsList.add((InternalAggregations) result.consumeAggs());
} }
if (hasProfileResults) { if (hasProfileResults) {
String key = result.shardTarget().toString(); String key = result.getSearchShardTarget().toString();
profileResults.put(key, result.consumeProfileResult()); profileResults.put(key, result.consumeProfileResult());
} }
} }
@ -622,7 +620,7 @@ public class SearchPhaseController extends AbstractComponent {
* iff the buffer is exhausted. * iff the buffer is exhausted.
*/ */
static final class QueryPhaseResultConsumer static final class QueryPhaseResultConsumer
extends InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> { extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
private final InternalAggregations[] buffer; private final InternalAggregations[] buffer;
private int index; private int index;
private final SearchPhaseController controller; private final SearchPhaseController controller;
@ -649,8 +647,8 @@ public class SearchPhaseController extends AbstractComponent {
} }
@Override @Override
public void consumeResult(int shardIndex, QuerySearchResultProvider result) { public void consumeResult(SearchPhaseResult result) {
super.consumeResult(shardIndex, result); super.consumeResult(result);
QuerySearchResult queryResult = result.queryResult(); QuerySearchResult queryResult = result.queryResult();
assert queryResult.hasAggs() : "this collector should only be used if aggs are requested"; assert queryResult.hasAggs() : "this collector should only be used if aggs are requested";
consumeInternal(queryResult); consumeInternal(queryResult);
@ -691,7 +689,7 @@ public class SearchPhaseController extends AbstractComponent {
/** /**
* Returns a new SearchPhaseResults instance. This might return an instance that reduces search responses incrementally. * Returns a new SearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/ */
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> newSearchPhaseResults(SearchRequest request, int numShards) { InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source(); SearchSourceBuilder source = request.source();
if (source != null && source.aggregations() != null) { if (source != null && source.aggregations() != null) {
if (request.getBatchedReduceSize() < numShards) { if (request.getBatchedReduceSize() < numShards) {

View File

@ -24,8 +24,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import java.util.Map; import java.util.Map;
@ -33,7 +33,7 @@ import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
final class SearchQueryThenFetchAsyncAction final class SearchQueryThenFetchAsyncAction
extends AbstractSearchAsyncAction<QuerySearchResultProvider> { extends AbstractSearchAsyncAction<SearchPhaseResult> {
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
@ -69,11 +69,10 @@ final class SearchQueryThenFetchAsyncAction
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
} }
protected void executePhaseOnShard( protected void executePhaseOnShard(
final ShardIterator shardIt, final ShardIterator shardIt,
final ShardRouting shard, final ShardRouting shard,
final ActionListener<QuerySearchResultProvider> listener) { final SearchActionListener<SearchPhaseResult> listener) {
getSearchTransport().sendExecuteQuery( getSearchTransport().sendExecuteQuery(
getConnection(shard.currentNodeId()), getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard), buildShardSearchRequest(shardIt, shard),
@ -83,9 +82,8 @@ final class SearchQueryThenFetchAsyncAction
@Override @Override
protected SearchPhase getNextPhase( protected SearchPhase getNextPhase(
final SearchPhaseResults<QuerySearchResultProvider> results, final SearchPhaseResults<SearchPhaseResult> results,
final SearchPhaseContext context) { final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context); return new FetchSearchPhase(results, searchPhaseController, context);
} }
} }

View File

@ -32,13 +32,14 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { final class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final Logger logger; private final Logger logger;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
@ -70,21 +71,17 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length);
} }
protected final ShardSearchFailure[] buildShardFailures() { private ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) { if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY; return ShardSearchFailure.EMPTY_ARRAY;
} }
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList(); List<ShardSearchFailure> failures = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; return failures.toArray(new ShardSearchFailure[failures.size()]);
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
} }
// we do our best to return the shard failures, but its ok if its not fully concurrently safe // we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible // we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { private void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) { if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length); shardFailures = new AtomicArray<>(scrollId.getContext().length);
} }
@ -130,15 +127,20 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener<ScrollQueryFetchSearchResult>() { searchTransportService.sendExecuteScrollFetch(node, internalRequest, task,
new SearchActionListener<ScrollQueryFetchSearchResult>(null, shardIndex) {
@Override @Override
public void onResponse(ScrollQueryFetchSearchResult result) { protected void setSearchShardTarget(ScrollQueryFetchSearchResult response) {
queryFetchResults.set(shardIndex, result.result()); // don't do this - it's part of the response...
assert response.getSearchShardTarget() != null : "search shard target must not be null";
}
@Override
protected void innerOnResponse(ScrollQueryFetchSearchResult response) {
queryFetchResults.set(response.getShardIndex(), response.result());
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
finishHim(); finishHim();
} }
} }
@Override @Override
public void onFailure(Exception t) { public void onFailure(Exception t) {
onPhaseFailure(t, searchId, shardIndex); onPhaseFailure(t, searchId, shardIndex);
@ -170,7 +172,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
} }
private void innerFinishHim() throws Exception { private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList(), queryFetchResults.length());
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults); searchPhaseController.reducedQueryPhase(queryFetchResults.asList()), queryFetchResults);
String scrollId = null; String scrollId = null;

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest;
@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private final Logger logger; private final Logger logger;
private final SearchTask task; private final SearchTask task;
@ -73,21 +74,17 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
this.fetchResults = new AtomicArray<>(scrollId.getContext().length); this.fetchResults = new AtomicArray<>(scrollId.getContext().length);
} }
protected final ShardSearchFailure[] buildShardFailures() { private ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) { if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY; return ShardSearchFailure.EMPTY_ARRAY;
} }
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList(); List<ShardSearchFailure> failures = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; return failures.toArray(new ShardSearchFailure[failures.size()]);
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
} }
// we do our best to return the shard failures, but its ok if its not fully concurrently safe // we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible // we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { private void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) { if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length); shardFailures = new AtomicArray<>(scrollId.getContext().length);
} }
@ -99,8 +96,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return; return;
} }
final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); final CountDown counter = new CountDown(scrollId.getContext().length);
ScrollIdForNode[] context = scrollId.getContext(); ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) { for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i]; ScrollIdForNode target = context[i];
@ -112,7 +108,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
logger.debug("Node [{}] not available for scroll request [{}]", target.getNode(), scrollId.getSource()); logger.debug("Node [{}] not available for scroll request [{}]", target.getNode(), scrollId.getSource());
} }
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.countDown()) {
try { try {
executeFetchPhase(); executeFetchPhase();
} catch (Exception e) { } catch (Exception e) {
@ -124,13 +120,21 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
} }
} }
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { private void executeQueryPhase(final int shardIndex, final CountDown counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener<ScrollQuerySearchResult>() { searchTransportService.sendExecuteScrollQuery(node, internalRequest, task,
new SearchActionListener<ScrollQuerySearchResult>(null, shardIndex) {
@Override @Override
public void onResponse(ScrollQuerySearchResult result) { protected void setSearchShardTarget(ScrollQuerySearchResult response) {
queryResults.set(shardIndex, result.queryResult()); // don't do this - it's part of the response...
if (counter.decrementAndGet() == 0) { assert response.getSearchShardTarget() != null : "search shard target must not be null";
}
@Override
protected void innerOnResponse(ScrollQuerySearchResult result) {
queryResults.setOnce(result.getShardIndex(), result.queryResult());
if (counter.countDown()) {
try { try {
executeFetchPhase(); executeFetchPhase();
} catch (Exception e) { } catch (Exception e) {
@ -146,13 +150,13 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}); });
} }
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Exception failure) { void onQueryPhaseFailure(final int shardIndex, final CountDown counter, final long searchId, Exception failure) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase", searchId), failure); logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase", searchId), failure);
} }
addShardFailure(shardIndex, new ShardSearchFailure(failure)); addShardFailure(shardIndex, new ShardSearchFailure(failure));
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.countDown()) {
if (successfulOps.get() == 0) { if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", failure, buildShardFailures())); listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", failure, buildShardFailures()));
} else { } else {
@ -167,7 +171,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
} }
private void executeFetchPhase() throws Exception { private void executeFetchPhase() throws Exception {
sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList(), queryResults.length());
if (sortedShardDocs.length == 0) { if (sortedShardDocs.length == 0) {
finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList())); finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
return; return;
@ -177,21 +181,21 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList()); SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
queryResults.length()); queryResults.length());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.length); final CountDown counter = new CountDown(docIdsToLoad.length);
for (int i = 0; i < docIdsToLoad.length; i++) { for (int i = 0; i < docIdsToLoad.length; i++) {
final int index = i; final int index = i;
final IntArrayList docIds = docIdsToLoad[index]; final IntArrayList docIds = docIdsToLoad[index];
if (docIds != null) { if (docIds != null) {
final QuerySearchResult querySearchResult = queryResults.get(index); final QuerySearchResult querySearchResult = queryResults.get(index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index]; ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.getRequestId(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().getNodeId()); DiscoveryNode node = nodes.get(querySearchResult.getSearchShardTarget().getNodeId());
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener<FetchSearchResult>() { searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task,
new SearchActionListener<FetchSearchResult>(querySearchResult.getSearchShardTarget(), index) {
@Override @Override
public void onResponse(FetchSearchResult result) { protected void innerOnResponse(FetchSearchResult response) {
result.shardTarget(querySearchResult.shardTarget()); fetchResults.setOnce(response.getShardIndex(), response);
fetchResults.set(index, result); if (counter.countDown()) {
if (counter.decrementAndGet() == 0) {
finishHim(reducedQueryPhase); finishHim(reducedQueryPhase);
} }
} }
@ -202,14 +206,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
logger.debug("Failed to execute fetch phase", t); logger.debug("Failed to execute fetch phase", t);
} }
successfulOps.decrementAndGet(); successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) { if (counter.countDown()) {
finishHim(reducedQueryPhase); finishHim(reducedQueryPhase);
} }
} }
}); });
} else { } else {
// the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too // the counter is set to the total size of docIdsToLoad which can have null values so we have to count them down too
if (counter.decrementAndGet() == 0) { if (counter.countDown()) {
finishHim(reducedQueryPhase); finishHim(reducedQueryPhase);
} }
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
@ -42,7 +43,6 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -118,17 +118,17 @@ public class SearchTransportService extends AbstractLifecycleComponent {
} }
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<DfsSearchResult> listener) { final SearchActionListener<DfsSearchResult> listener) {
transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task, transportService.sendChildRequest(connection, DFS_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
} }
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, public void sendExecuteQuery(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
final ActionListener<QuerySearchResultProvider> listener) { final SearchActionListener<SearchPhaseResult> listener) {
// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
// this used to be the QUERY_AND_FETCH which doesn't exists anymore. // this used to be the QUERY_AND_FETCH which doesn't exists anymore.
final boolean fetchDocuments = request.numberOfShards() == 1; final boolean fetchDocuments = request.numberOfShards() == 1;
Supplier<QuerySearchResultProvider> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
if (connection.getVersion().onOrBefore(Version.V_5_3_0_UNRELEASED) && fetchDocuments) { if (connection.getVersion().onOrBefore(Version.V_5_3_0_UNRELEASED) && fetchDocuments) {
// TODO this BWC layer can be removed once this is back-ported to 5.3 // TODO this BWC layer can be removed once this is back-ported to 5.3
transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task, transportService.sendChildRequest(connection, QUERY_FETCH_ACTION_NAME, request, task,
@ -140,35 +140,35 @@ public class SearchTransportService extends AbstractLifecycleComponent {
} }
public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task,
final ActionListener<QuerySearchResult> listener) { final SearchActionListener<QuerySearchResult> listener) {
transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task, transportService.sendChildRequest(connection, QUERY_ID_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
} }
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, public void sendExecuteScrollQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQuerySearchResult> listener) { final SearchActionListener<ScrollQuerySearchResult> listener) {
transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task, transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
} }
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, public void sendExecuteScrollFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task,
final ActionListener<ScrollQueryFetchSearchResult> listener) { final SearchActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
} }
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) { final SearchActionListener<FetchSearchResult> listener) {
sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener); sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);
} }
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task, public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) { final SearchActionListener<FetchSearchResult> listener) {
sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener);
} }
private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task,
final ActionListener<FetchSearchResult> listener) { final SearchActionListener<FetchSearchResult> listener) {
transportService.sendChildRequest(connection, action, request, task, transportService.sendChildRequest(connection, action, request, task,
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
} }
@ -327,7 +327,7 @@ public class SearchTransportService extends AbstractLifecycleComponent {
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() { new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override @Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task); SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result); channel.sendResponse(result);
} }
}); });
@ -361,7 +361,7 @@ public class SearchTransportService extends AbstractLifecycleComponent {
@Override @Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards(); assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards();
QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task); SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result); channel.sendResponse(result);
} }
}); });

View File

@ -38,10 +38,9 @@ final class TransportSearchHelper {
try (RAMOutputStream out = new RAMOutputStream()) { try (RAMOutputStream out = new RAMOutputStream()) {
out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE); out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE);
out.writeVInt(searchPhaseResults.asList().size()); out.writeVInt(searchPhaseResults.asList().size());
for (AtomicArray.Entry<? extends SearchPhaseResult> entry : searchPhaseResults.asList()) { for (SearchPhaseResult searchPhaseResult : searchPhaseResults.asList()) {
SearchPhaseResult searchPhaseResult = entry.value; out.writeLong(searchPhaseResult.getRequestId());
out.writeLong(searchPhaseResult.id()); out.writeString(searchPhaseResult.getSearchShardTarget().getNodeId());
out.writeString(searchPhaseResult.shardTarget().getNodeId());
} }
byte[] bytes = new byte[(int) out.getFilePointer()]; byte[] bytes = new byte[(int) out.getFilePointer()];
out.writeTo(bytes, 0); out.writeTo(bytes, 0);

View File

@ -135,14 +135,14 @@ public abstract class TransportTasksAction<
} }
List<TaskResponse> results = new ArrayList<>(); List<TaskResponse> results = new ArrayList<>();
List<TaskOperationFailure> exceptions = new ArrayList<>(); List<TaskOperationFailure> exceptions = new ArrayList<>();
for (AtomicArray.Entry<Tuple<TaskResponse, Exception>> response : responses.asList()) { for (Tuple<TaskResponse, Exception> response : responses.asList()) {
if (response.value.v1() == null) { if (response.v1() == null) {
assert response.value.v2() != null; assert response.v2() != null;
exceptions.add(new TaskOperationFailure(clusterService.localNode().getId(), tasks.get(taskIndex).getId(), exceptions.add(new TaskOperationFailure(clusterService.localNode().getId(), tasks.get(taskIndex).getId(),
response.value.v2())); response.v2()));
} else { } else {
assert response.value.v2() == null; assert response.v2() == null;
results.add(response.value.v1()); results.add(response.v1());
} }
} }
listener.onResponse(new NodeTasksResponse(clusterService.localNode().getId(), results, exceptions)); listener.onResponse(new NodeTasksResponse(clusterService.localNode().getId(), results, exceptions));

View File

@ -40,7 +40,7 @@ public class AtomicArray<E> {
} }
private final AtomicReferenceArray<E> array; private final AtomicReferenceArray<E> array;
private volatile List<Entry<E>> nonNullList; private volatile List<E> nonNullList;
public AtomicArray(int size) { public AtomicArray(int size) {
array = new AtomicReferenceArray<>(size); array = new AtomicReferenceArray<>(size);
@ -87,19 +87,18 @@ public class AtomicArray<E> {
} }
/** /**
* Returns the it as a non null list, with an Entry wrapping each value allowing to * Returns the it as a non null list.
* retain its index.
*/ */
public List<Entry<E>> asList() { public List<E> asList() {
if (nonNullList == null) { if (nonNullList == null) {
if (array == null || array.length() == 0) { if (array == null || array.length() == 0) {
nonNullList = Collections.emptyList(); nonNullList = Collections.emptyList();
} else { } else {
List<Entry<E>> list = new ArrayList<>(array.length()); List<E> list = new ArrayList<>(array.length());
for (int i = 0; i < array.length(); i++) { for (int i = 0; i < array.length(); i++) {
E e = array.get(i); E e = array.get(i);
if (e != null) { if (e != null) {
list.add(new Entry<>(i, e)); list.add(e);
} }
} }
nonNullList = list; nonNullList = list;
@ -120,23 +119,4 @@ public class AtomicArray<E> {
} }
return a; return a;
} }
/**
* An entry within the array.
*/
public static class Entry<E> {
/**
* The original index of the value within the array.
*/
public final int index;
/**
* The value.
*/
public final E value;
public Entry(int index, E value) {
this.index = index;
this.value = value;
}
}
} }

View File

@ -121,7 +121,6 @@ import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -1145,7 +1144,7 @@ public class IndicesService extends AbstractLifecycleComponent
final QuerySearchResult result = context.queryResult(); final QuerySearchResult result = context.queryResult();
StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry); StreamInput in = new NamedWriteableAwareStreamInput(bytesReference.streamInput(), namedWriteableRegistry);
result.readFromWithId(context.id(), in); result.readFromWithId(context.id(), in);
result.shardTarget(context.shardTarget()); result.setSearchShardTarget(context.shardTarget());
} else if (context.queryResult().searchTimedOut()) { } else if (context.queryResult().searchTimedOut()) {
// we have to invalidate the cache entry if we cached a query result form a request that timed out. // we have to invalidate the cache entry if we cached a query result form a request that timed out.
// we can't really throw exceptions in the loading part to signal a timed out search to the outside world since if there are // we can't really throw exceptions in the loading part to signal a timed out search to the outside world since if there are

View File

@ -20,12 +20,63 @@
package org.elasticsearch.search; package org.elasticsearch.search;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.TransportResponse;
public interface SearchPhaseResult extends Streamable { /**
* This class is a base class for all search releated results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
* and a request ID that is used to reference the request context on the executing node. The
* request ID is particularly important since it is used to reference and maintain a context
* across search phases to ensure the same point in time snapshot is used for querying and
* fetching etc.
*/
public abstract class SearchPhaseResult extends TransportResponse implements Streamable {
long id(); private SearchShardTarget searchShardTarget;
private int shardIndex = -1;
protected long requestId;
SearchShardTarget shardTarget(); /**
* Returns the results request ID that is used to reference the search context on the executing
* node
*/
public long getRequestId() {
return requestId;
}
void shardTarget(SearchShardTarget shardTarget); /**
* Returns the shard index in the context of the currently executing search request that is
* used for accounting on the coordinating node
*/
public int getShardIndex() {
assert shardIndex != -1 : "shardIndex is not set";
return shardIndex;
}
public SearchShardTarget getSearchShardTarget() {
return searchShardTarget;
}
public void setSearchShardTarget(SearchShardTarget shardTarget) {
this.searchShardTarget = shardTarget;
}
public void setShardIndex(int shardIndex) {
assert shardIndex >= 0 : "shardIndex must be >= 0 but was: " + shardIndex;
this.shardIndex = shardIndex;
}
/**
* Returns the query result iff it's included in this response otherwise <code>null</code>
*/
public QuerySearchResult queryResult() {
return null;
}
/**
* Returns the fetch result iff it's included in this response otherwise <code>null</code>
*/
public FetchSearchResult fetchResult() { return null; }
} }

View File

@ -75,7 +75,6 @@ import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.rescore.RescoreBuilder; import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder; import org.elasticsearch.search.searchafter.SearchAfterBuilder;
@ -248,7 +247,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
} }
} }
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request); final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef(); context.incRef();

View File

@ -30,44 +30,24 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException; import java.io.IOException;
public class DfsSearchResult extends TransportResponse implements SearchPhaseResult { public class DfsSearchResult extends SearchPhaseResult {
private static final Term[] EMPTY_TERMS = new Term[0]; private static final Term[] EMPTY_TERMS = new Term[0];
private static final TermStatistics[] EMPTY_TERM_STATS = new TermStatistics[0]; private static final TermStatistics[] EMPTY_TERM_STATS = new TermStatistics[0];
private SearchShardTarget shardTarget;
private long id;
private Term[] terms; private Term[] terms;
private TermStatistics[] termStatistics; private TermStatistics[] termStatistics;
private ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap(); private ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
private int maxDoc; private int maxDoc;
public DfsSearchResult() { public DfsSearchResult() {
} }
public DfsSearchResult(long id, SearchShardTarget shardTarget) { public DfsSearchResult(long id, SearchShardTarget shardTarget) {
this.id = id; this.setSearchShardTarget(shardTarget);
this.shardTarget = shardTarget; this.requestId = id;
}
@Override
public long id() {
return this.id;
}
@Override
public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
} }
public DfsSearchResult maxDoc(int maxDoc) { public DfsSearchResult maxDoc(int maxDoc) {
@ -105,7 +85,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
id = in.readLong(); requestId = in.readLong();
int termsSize = in.readVInt(); int termsSize = in.readVInt();
if (termsSize == 0) { if (termsSize == 0) {
terms = EMPTY_TERMS; terms = EMPTY_TERMS;
@ -125,7 +105,7 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(id); out.writeLong(requestId);
out.writeVInt(terms.length); out.writeVInt(terms.length);
for (Term term : terms) { for (Term term : terms) {
out.writeString(term.field()); out.writeString(term.field());

View File

@ -22,28 +22,25 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import java.io.IOException; import java.io.IOException;
public class FetchSearchResult extends QuerySearchResultProvider { public final class FetchSearchResult extends SearchPhaseResult {
private long id;
private SearchShardTarget shardTarget;
private SearchHits hits; private SearchHits hits;
// client side counter // client side counter
private transient int counter; private transient int counter;
public FetchSearchResult() { public FetchSearchResult() {
} }
public FetchSearchResult(long id, SearchShardTarget shardTarget) { public FetchSearchResult(long id, SearchShardTarget shardTarget) {
this.id = id; this.requestId = id;
this.shardTarget = shardTarget; setSearchShardTarget(shardTarget);
} }
@Override @Override
@ -56,21 +53,6 @@ public class FetchSearchResult extends QuerySearchResultProvider {
return this; return this;
} }
@Override
public long id() {
return this.id;
}
@Override
public SearchShardTarget shardTarget() {
return this.shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
public void hits(SearchHits hits) { public void hits(SearchHits hits) {
assert assertNoSearchTarget(hits); assert assertNoSearchTarget(hits);
this.hits = hits; this.hits = hits;
@ -105,14 +87,14 @@ public class FetchSearchResult extends QuerySearchResultProvider {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
id = in.readLong(); requestId = in.readLong();
hits = SearchHits.readSearchHits(in); hits = SearchHits.readSearchHits(in);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(id); out.writeLong(requestId);
hits.writeTo(out); hits.writeTo(out);
} }
} }

View File

@ -21,22 +21,21 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult; import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult;
import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;
public class QueryFetchSearchResult extends QuerySearchResultProvider { public final class QueryFetchSearchResult extends SearchPhaseResult {
private QuerySearchResult queryResult; private QuerySearchResult queryResult;
private FetchSearchResult fetchResult; private FetchSearchResult fetchResult;
public QueryFetchSearchResult() { public QueryFetchSearchResult() {
} }
public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) {
@ -45,19 +44,27 @@ public class QueryFetchSearchResult extends QuerySearchResultProvider {
} }
@Override @Override
public long id() { public long getRequestId() {
return queryResult.id(); return queryResult.getRequestId();
} }
@Override @Override
public SearchShardTarget shardTarget() { public SearchShardTarget getSearchShardTarget() {
return queryResult.shardTarget(); return queryResult.getSearchShardTarget();
} }
@Override @Override
public void shardTarget(SearchShardTarget shardTarget) { public void setSearchShardTarget(SearchShardTarget shardTarget) {
queryResult.shardTarget(shardTarget); super.setSearchShardTarget(shardTarget);
fetchResult.shardTarget(shardTarget); queryResult.setSearchShardTarget(shardTarget);
fetchResult.setSearchShardTarget(shardTarget);
}
@Override
public void setShardIndex(int requestIndex) {
super.setShardIndex(requestIndex);
queryResult.setShardIndex(requestIndex);
fetchResult.setShardIndex(requestIndex);
} }
@Override @Override

View File

@ -21,46 +21,64 @@ package org.elasticsearch.search.fetch;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.search.query.QuerySearchResult;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult; import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult;
public class ScrollQueryFetchSearchResult extends TransportResponse { public final class ScrollQueryFetchSearchResult extends SearchPhaseResult {
private QueryFetchSearchResult result; private QueryFetchSearchResult result;
private SearchShardTarget shardTarget;
public ScrollQueryFetchSearchResult() { public ScrollQueryFetchSearchResult() {
} }
public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) {
this.result = result; this.result = result;
this.shardTarget = shardTarget; setSearchShardTarget(shardTarget);
} }
public QueryFetchSearchResult result() { public QueryFetchSearchResult result() {
return result; return result;
} }
public SearchShardTarget shardTarget() { @Override
return shardTarget; public void setSearchShardTarget(SearchShardTarget shardTarget) {
super.setSearchShardTarget(shardTarget);
result.setSearchShardTarget(shardTarget);
}
@Override
public void setShardIndex(int shardIndex) {
super.setShardIndex(shardIndex);
result.setShardIndex(shardIndex);
}
@Override
public QuerySearchResult queryResult() {
return result.queryResult();
}
@Override
public FetchSearchResult fetchResult() {
return result.fetchResult();
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardTarget = new SearchShardTarget(in); SearchShardTarget searchShardTarget = new SearchShardTarget(in);
result = readQueryFetchSearchResult(in); result = readQueryFetchSearchResult(in);
result.shardTarget(shardTarget); setSearchShardTarget(searchShardTarget);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardTarget.writeTo(out); getSearchShardTarget().writeTo(out);
result.writeTo(out); result.writeTo(out);
} }
} }

View File

@ -24,6 +24,7 @@ import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
@ -40,10 +41,8 @@ import static java.util.Collections.emptyList;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
public final class QuerySearchResult extends QuerySearchResultProvider { public final class QuerySearchResult extends SearchPhaseResult {
private long id;
private SearchShardTarget shardTarget;
private int from; private int from;
private int size; private int size;
private TopDocs topDocs; private TopDocs topDocs;
@ -61,8 +60,8 @@ public final class QuerySearchResult extends QuerySearchResultProvider {
} }
public QuerySearchResult(long id, SearchShardTarget shardTarget) { public QuerySearchResult(long id, SearchShardTarget shardTarget) {
this.id = id; this.requestId = id;
this.shardTarget = shardTarget; setSearchShardTarget(shardTarget);
} }
@Override @Override
@ -70,20 +69,6 @@ public final class QuerySearchResult extends QuerySearchResultProvider {
return this; return this;
} }
@Override
public long id() {
return this.id;
}
@Override
public SearchShardTarget shardTarget() {
return shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
public void searchTimedOut(boolean searchTimedOut) { public void searchTimedOut(boolean searchTimedOut) {
this.searchTimedOut = searchTimedOut; this.searchTimedOut = searchTimedOut;
@ -230,7 +215,7 @@ public final class QuerySearchResult extends QuerySearchResultProvider {
} }
public void readFromWithId(long id, StreamInput in) throws IOException { public void readFromWithId(long id, StreamInput in) throws IOException {
this.id = id; this.requestId = id;
from = in.readVInt(); from = in.readVInt();
size = in.readVInt(); size = in.readVInt();
int numSortFieldsPlus1 = in.readVInt(); int numSortFieldsPlus1 = in.readVInt();
@ -260,7 +245,7 @@ public final class QuerySearchResult extends QuerySearchResultProvider {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(id); out.writeLong(requestId);
writeToNoId(out); writeToNoId(out);
} }

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.query;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.transport.TransportResponse;
public abstract class QuerySearchResultProvider extends TransportResponse implements SearchPhaseResult {
/**
* Returns the query result iff it's included in this response otherwise <code>null</code>
*/
public QuerySearchResult queryResult() {
return null;
}
/**
* Returns the fetch result iff it's included in this response otherwise <code>null</code>
*/
public FetchSearchResult fetchResult() {
return null;
}
}

View File

@ -21,46 +21,54 @@ package org.elasticsearch.search.query;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;
public class ScrollQuerySearchResult extends TransportResponse { public final class ScrollQuerySearchResult extends SearchPhaseResult {
private QuerySearchResult queryResult; private QuerySearchResult result;
private SearchShardTarget shardTarget;
public ScrollQuerySearchResult() { public ScrollQuerySearchResult() {
} }
public ScrollQuerySearchResult(QuerySearchResult queryResult, SearchShardTarget shardTarget) { public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) {
this.queryResult = queryResult; this.result = result;
this.shardTarget = shardTarget; setSearchShardTarget(shardTarget);
} }
@Override
public void setSearchShardTarget(SearchShardTarget shardTarget) {
super.setSearchShardTarget(shardTarget);
result.setSearchShardTarget(shardTarget);
}
@Override
public void setShardIndex(int shardIndex) {
super.setShardIndex(shardIndex);
result.setShardIndex(shardIndex);
}
@Override
public QuerySearchResult queryResult() { public QuerySearchResult queryResult() {
return queryResult; return result;
}
public SearchShardTarget shardTarget() {
return shardTarget;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardTarget = new SearchShardTarget(in); SearchShardTarget shardTarget = new SearchShardTarget(in);
queryResult = readQuerySearchResult(in); result = readQuerySearchResult(in);
queryResult.shardTarget(shardTarget); setSearchShardTarget(shardTarget);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardTarget.writeTo(out); getSearchShardTarget().writeTo(out);
queryResult.writeTo(out); result.writeTo(out);
} }
} }

View File

@ -125,7 +125,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
protected void executePhaseOnShard( protected void executePhaseOnShard(
final ShardIterator shardIt, final ShardIterator shardIt,
final ShardRouting shard, final ShardRouting shard,
final ActionListener<SearchPhaseResult> listener) { final SearchActionListener<SearchPhaseResult> listener) {
} }

View File

@ -46,7 +46,7 @@ public class CountedCollectorTests extends ESTestCase {
runnable.run(); runnable.run();
} }
}; };
CountedCollector<SearchPhaseResult> collector = new CountedCollector<>(results::set, numResultsExpected, CountedCollector<SearchPhaseResult> collector = new CountedCollector<>(r -> results.set(r.getShardIndex(), r), numResultsExpected,
latch::countDown, context); latch::countDown, context);
for (int i = 0; i < numResultsExpected; i++) { for (int i = 0; i < numResultsExpected; i++) {
int shardID = i; int shardID = i;
@ -57,8 +57,12 @@ public class CountedCollectorTests extends ESTestCase {
break; break;
case 1: case 1:
state.add(1); state.add(1);
executor.execute(() -> collector.onResult(shardID, new DfsSearchResult(shardID, null), new SearchShardTarget("foo", executor.execute(() -> {
new Index("bar", "baz"), shardID))); DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null);
dfsSearchResult.setShardIndex(shardID);
dfsSearchResult.setSearchShardTarget(new SearchShardTarget("foo",
new Index("bar", "baz"), shardID));
collector.onResult(dfsSearchResult);});
break; break;
case 2: case 2:
state.add(2); state.add(2);
@ -79,7 +83,7 @@ public class CountedCollectorTests extends ESTestCase {
break; break;
case 1: case 1:
assertNotNull(results.get(i)); assertNotNull(results.get(i));
assertEquals(i, results.get(i).id()); assertEquals(i, results.get(i).getRequestId());
break; break;
case 2: case 2:
final int shardId = i; final int shardId = i;

View File

@ -18,52 +18,42 @@
*/ */
package org.elasticsearch.action.search; package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
public class DfsQueryPhaseTests extends ESTestCase { public class DfsQueryPhaseTests extends ESTestCase {
private static DfsSearchResult newSearchResult(int shardIndex, long requestId, SearchShardTarget target) {
DfsSearchResult result = new DfsSearchResult(requestId, target);
result.setShardIndex(shardIndex);
return result;
}
public void testDfsWith2Shards() throws IOException { public void testDfsWith2Shards() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2); AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>(); AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -73,7 +63,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override @Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) { SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) { if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
@ -116,9 +106,9 @@ public class DfsQueryPhaseTests extends ESTestCase {
public void testDfsWith1ShardFailed() throws IOException { public void testDfsWith1ShardFailed() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2); AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>(); AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -128,7 +118,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override @Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) { SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) { if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
@ -171,9 +161,9 @@ public class DfsQueryPhaseTests extends ESTestCase {
public void testFailPhaseOnException() throws IOException { public void testFailPhaseOnException() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2); AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>(); AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0))); results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0))); results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]); results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]); results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -183,7 +173,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override @Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) { SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) { if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);

View File

@ -21,20 +21,18 @@ package org.elasticsearch.action.search;
import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
@ -48,7 +46,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testShortcutQueryAndFetchOptimization() throws IOException { public void testShortcutQueryAndFetchOptimization() throws IOException {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
boolean hasHits = randomBoolean(); boolean hasHits = randomBoolean();
@ -59,7 +57,9 @@ public class FetchSearchPhaseTests extends ESTestCase {
queryResult.size(1); queryResult.size(1);
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F)); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F));
results.consumeResult(0, new QueryFetchSearchResult(queryResult, fetchResult)); QueryFetchSearchResult fetchSearchResult = new QueryFetchSearchResult(queryResult, fetchResult);
fetchSearchResult.setShardIndex(0);
results.consumeResult(fetchSearchResult);
numHits = 1; numHits = 1;
} else { } else {
numHits = 0; numHits = 0;
@ -86,25 +86,27 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testFetchTwoDocument() throws IOException { public void testFetchTwoDocument() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10); int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set queryResult.size(resultSetSize); // the size of the result set
results.consumeResult(0, queryResult); queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); queryResult.size(resultSetSize);
results.consumeResult(1, queryResult); queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService( SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) { Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override @Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) { SearchActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) { if (request.id() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
@ -138,25 +140,27 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testFailFetchOneDoc() throws IOException { public void testFailFetchOneDoc() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10); int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set queryResult.size(resultSetSize); // the size of the result set
results.consumeResult(0, queryResult); queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); queryResult.size(resultSetSize);
results.consumeResult(1, queryResult); queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService( SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) { Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override @Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) { SearchActionListener<FetchSearchResult> listener) {
if (request.id() == 321) { if (request.id() == 321) {
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
@ -195,20 +199,21 @@ public class FetchSearchPhaseTests extends ESTestCase {
int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
for (int i = 0; i < numHits; i++) { for (int i = 0; i < numHits; i++) {
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set queryResult.size(resultSetSize); // the size of the result set
results.consumeResult(i, queryResult); queryResult.setShardIndex(i);
results.consumeResult(queryResult);
} }
SearchTransportService searchTransportService = new SearchTransportService( SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) { Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override @Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) { SearchActionListener<FetchSearchResult> listener) {
new Thread(() -> { new Thread(() -> {
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))}, 1, 100F)); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))}, 1, 100F));
@ -249,25 +254,27 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testExceptionFailsPhase() throws IOException { public void testExceptionFailsPhase() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10); int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set queryResult.size(resultSetSize); // the size of the result set
results.consumeResult(0, queryResult); queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); queryResult.size(resultSetSize);
results.consumeResult(1, queryResult); queryResult.setShardIndex(1);
results.consumeResult(queryResult);
AtomicInteger numFetches = new AtomicInteger(0); AtomicInteger numFetches = new AtomicInteger(0);
SearchTransportService searchTransportService = new SearchTransportService( SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) { Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override @Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) { SearchActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
if (numFetches.incrementAndGet() == 1) { if (numFetches.incrementAndGet() == 1) {
throw new RuntimeException("BOOM"); throw new RuntimeException("BOOM");
@ -300,25 +307,27 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> results = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>(); AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = 1; int resultSetSize = 1;
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set queryResult.size(resultSetSize); // the size of the result set
results.consumeResult(0, queryResult); queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); queryResult.size(resultSetSize);
results.consumeResult(1, queryResult); queryResult.setShardIndex(1);
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService( SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) { Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override @Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) { SearchActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult(); FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) { if (request.id() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F)); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
@ -111,13 +110,14 @@ public class SearchAsyncActionTests extends ESTestCase {
TestSearchResponse response = new TestSearchResponse(); TestSearchResponse response = new TestSearchResponse();
@Override @Override
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener<TestSearchPhaseResult> listener) { protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener<TestSearchPhaseResult>
listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(shard.currentNodeId()); Transport.Connection connection = getConnection(shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode()); connection.getNode());
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>()); Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
ids.add(testSearchPhaseResult.id); ids.add(testSearchPhaseResult.getRequestId());
if (randomBoolean()) { if (randomBoolean()) {
listener.onResponse(testSearchPhaseResult); listener.onResponse(testSearchPhaseResult);
} else { } else {
@ -132,8 +132,8 @@ public class SearchAsyncActionTests extends ESTestCase {
public void run() throws IOException { public void run() throws IOException {
for (int i = 0; i < results.getNumShards(); i++) { for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.results.get(i); TestSearchPhaseResult result = results.results.get(i);
assertEquals(result.node.getId(), result.shardTarget().getNodeId()); assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.id(), new MockConnection(result.node)); sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node));
} }
responseListener.onResponse(response); responseListener.onResponse(response);
latch.countDown(); latch.countDown();
@ -193,32 +193,14 @@ public class SearchAsyncActionTests extends ESTestCase {
public final Set<ShardId> queried = new HashSet<>(); public final Set<ShardId> queried = new HashSet<>();
} }
public static class TestSearchPhaseResult implements SearchPhaseResult { public static class TestSearchPhaseResult extends SearchPhaseResult {
final long id;
final DiscoveryNode node; final DiscoveryNode node;
SearchShardTarget shardTarget;
public TestSearchPhaseResult(long id, DiscoveryNode node) { public TestSearchPhaseResult(long id, DiscoveryNode node) {
this.id = id; this.requestId = id;
this.node = node; this.node = node;
} }
@Override
public long id() {
return id;
}
@Override
public SearchShardTarget shardTarget() {
return this.shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
@ -38,7 +39,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -74,8 +74,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
} }
int nShards = randomIntBetween(1, 20); int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize, false); AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results); ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), nShards);
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) { for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); int suggestionSize = suggestion.getEntries().get(0).getOptions().size();
@ -87,12 +87,12 @@ public class SearchPhaseControllerTests extends ESTestCase {
public void testSortIsIdempotent() throws IOException { public void testSortIsIdempotent() throws IOException {
int nShards = randomIntBetween(1, 20); int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize, AtomicArray<SearchPhaseResult> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
randomBoolean() || true); randomBoolean() || true);
boolean ignoreFrom = randomBoolean(); boolean ignoreFrom = randomBoolean();
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results); ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards);
ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results); ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), nShards);
assertArrayEquals(sortedDocs, sortedDocs2); assertArrayEquals(sortedDocs, sortedDocs2);
} }
@ -103,7 +103,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
} }
int nShards = randomIntBetween(1, 20); int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false); AtomicArray<SearchPhaseResult> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);
// calculate offsets and score doc array // calculate offsets and score doc array
List<ScoreDoc> mergedScoreDocs = new ArrayList<>(); List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
@ -138,10 +138,10 @@ public class SearchPhaseControllerTests extends ESTestCase {
} }
} }
private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards, private AtomicArray<SearchPhaseResult> generateQueryResults(int nShards,
List<CompletionSuggestion> suggestions, List<CompletionSuggestion> suggestions,
int searchHitsSize, boolean useConstantScore) { int searchHitsSize, boolean useConstantScore) {
AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards); AtomicArray<SearchPhaseResult> queryResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
new SearchShardTarget("", new Index("", ""), shardIndex)); new SearchShardTarget("", new Index("", ""), shardIndex));
@ -181,23 +181,24 @@ public class SearchPhaseControllerTests extends ESTestCase {
querySearchResult.topDocs(topDocs, null); querySearchResult.topDocs(topDocs, null);
querySearchResult.size(searchHitsSize); querySearchResult.size(searchHitsSize);
querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion))); querySearchResult.suggest(new Suggest(new ArrayList<>(shardSuggestion)));
querySearchResult.setShardIndex(shardIndex);
queryResults.set(shardIndex, querySearchResult); queryResults.set(shardIndex, querySearchResult);
} }
return queryResults; return queryResults;
} }
private int getTotalQueryHits(AtomicArray<QuerySearchResultProvider> results) { private int getTotalQueryHits(AtomicArray<SearchPhaseResult> results) {
int resultCount = 0; int resultCount = 0;
for (AtomicArray.Entry<QuerySearchResultProvider> shardResult : results.asList()) { for (SearchPhaseResult shardResult : results.asList()) {
resultCount += shardResult.value.queryResult().topDocs().totalHits; resultCount += shardResult.queryResult().topDocs().totalHits;
} }
return resultCount; return resultCount;
} }
private Suggest reducedSuggest(AtomicArray<QuerySearchResultProvider> results) { private Suggest reducedSuggest(AtomicArray<SearchPhaseResult> results) {
Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>(); Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>();
for (AtomicArray.Entry<QuerySearchResultProvider> entry : results.asList()) { for (SearchPhaseResult entry : results.asList()) {
for (Suggest.Suggestion<?> suggestion : entry.value.queryResult().suggest()) { for (Suggest.Suggestion<?> suggestion : entry.queryResult().suggest()) {
List<Suggest.Suggestion<CompletionSuggestion.Entry>> suggests = List<Suggest.Suggestion<CompletionSuggestion.Entry>> suggests =
groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); groupedSuggestion.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
suggests.add((Suggest.Suggestion<CompletionSuggestion.Entry>) suggestion); suggests.add((Suggest.Suggestion<CompletionSuggestion.Entry>) suggestion);
@ -207,18 +208,18 @@ public class SearchPhaseControllerTests extends ESTestCase {
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
private ScoreDoc[] getTopShardDocs(AtomicArray<QuerySearchResultProvider> results) throws IOException { private ScoreDoc[] getTopShardDocs(AtomicArray<SearchPhaseResult> results) throws IOException {
List<AtomicArray.Entry<QuerySearchResultProvider>> resultList = results.asList(); List<SearchPhaseResult> resultList = results.asList();
TopDocs[] shardTopDocs = new TopDocs[resultList.size()]; TopDocs[] shardTopDocs = new TopDocs[resultList.size()];
for (int i = 0; i < resultList.size(); i++) { for (int i = 0; i < resultList.size(); i++) {
shardTopDocs[i] = resultList.get(i).value.queryResult().topDocs(); shardTopDocs[i] = resultList.get(i).queryResult().topDocs();
} }
int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results)); int topN = Math.min(results.get(0).queryResult().size(), getTotalQueryHits(results));
return TopDocs.merge(topN, shardTopDocs).scoreDocs; return TopDocs.merge(topN, shardTopDocs).scoreDocs;
} }
private AtomicArray<QuerySearchResultProvider> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
AtomicArray<QuerySearchResultProvider> fetchResults = new AtomicArray<>(nShards); AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
float maxScore = -1F; float maxScore = -1F;
SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex); SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex);
@ -257,27 +258,30 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest(); SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize); request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer = searchPhaseController.newSearchPhaseResults(request, 3); InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0)); QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW, InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap()))); Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs); result.aggregations(aggs);
consumer.consumeResult(0, result); result.setShardIndex(0);
consumer.consumeResult(result);
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW, aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap()))); Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs); result.aggregations(aggs);
consumer.consumeResult(2, result); result.setShardIndex(2);
consumer.consumeResult(result);
result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW, aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap()))); Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs); result.aggregations(aggs);
consumer.consumeResult(1, result); result.setShardIndex(1);
consumer.consumeResult(result);
int numTotalReducePhases = 1; int numTotalReducePhases = 1;
if (bufferSize == 2) { if (bufferSize == 2) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
@ -301,7 +305,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest(); SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize); request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer = InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults); searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger(); AtomicInteger max = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(expectedNumResults); CountDownLatch latch = new CountDownLatch(expectedNumResults);
@ -315,7 +319,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number,
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs); result.aggregations(aggs);
consumer.consumeResult(id, result); result.setShardIndex(id);
consumer.consumeResult(result);
latch.countDown(); latch.countDown();
}); });
@ -337,7 +342,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
} }
request.setBatchedReduceSize(bufferSize); request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<QuerySearchResultProvider> consumer InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults); = searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
if (hasAggs && expectedNumResults > bufferSize) { if (hasAggs && expectedNumResults > bufferSize) {
assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
@ -354,7 +359,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
for (int iters = 0; iters < maxIters; iters++) { for (int iters = 0; iters < maxIters; iters++) {
TopDocs[] topDocs = new TopDocs[randomIntBetween(2, 100)]; TopDocs[] topDocs = new TopDocs[randomIntBetween(2, 100)];
int numShards = topDocs.length; int numShards = topDocs.length;
AtomicArray<QuerySearchResultProvider> resultProviderAtomicArray = generateQueryResults(numShards, Collections.emptyList(), AtomicArray<SearchPhaseResult> resultProviderAtomicArray = generateQueryResults(numShards, Collections.emptyList(),
2, randomBoolean()); 2, randomBoolean());
if (randomBoolean()) { if (randomBoolean()) {
int maxNull = randomIntBetween(1, topDocs.length - 1); int maxNull = randomIntBetween(1, topDocs.length - 1);

View File

@ -48,7 +48,6 @@ import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest; import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.IOException; import java.io.IOException;
@ -184,13 +183,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final int rounds = scaledRandomIntBetween(100, 10000); final int rounds = scaledRandomIntBetween(100, 10000);
for (int i = 0; i < rounds; i++) { for (int i = 0; i < rounds; i++) {
try { try {
QuerySearchResultProvider querySearchResultProvider = service.executeQueryPhase( SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
new SearchTask(123L, "", "", "", null)); new SearchTask(123L, "", "", "", null));
IntArrayList intCursors = new IntArrayList(1); IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0); intCursors.add(0);
ShardFetchRequest req = new ShardFetchRequest(querySearchResultProvider.id(), intCursors, null /* not a scroll */); ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null /* not a scroll */);
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null)); service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null));
} catch (AlreadyClosedException ex) { } catch (AlreadyClosedException ex) {
throw ex; throw ex;