Detach SearchPhases from AbstractSearchAsyncAction (#23118)

Today all search phases are inner classes of AbstractSearchAsyncAction or one of its
subclasses. This makes unit testing of these classes practically impossible. This commit
extracts `DfsQueryPhase` and `FetchSearchPhase` out of the code that composes the actual
query execution types and moves most of the fan-out and collect code into an `InitialSearchPhase`
class that can be used to build initial search phases (phases that retry on shards). This will
make modifications to these classes simpler and allow new search phases to be composed or added
easily down the road if additional roundtrips are required.
Simon Willnauer 2017-02-14 12:34:25 +01:00 committed by GitHub
parent 5b459a0bdc
commit aef0665ddb
21 changed files with 1907 additions and 580 deletions
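A sketch of what a concrete DFS-flavored action looks like after this change may help make the new structure concrete. The class name below is hypothetical and the body is assembled only from signatures introduced in this commit (it mirrors, but is not copied from, SearchDfsQueryThenFetchAsyncAction shown at the end of this diff): a subclass now supplies just the per-shard request and a factory for the follow-up phases, while fan-out, replica retries, and collection live in InitialSearchPhase.

package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical concrete action -- a sketch, not part of this commit.
final class SketchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {

    private final SearchPhaseController searchPhaseController;

    SketchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
                                       Function<String, Transport.Connection> nodeIdToConnection,
                                       Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                                       SearchPhaseController searchPhaseController, Executor executor,
                                       SearchRequest request, ActionListener<SearchResponse> listener,
                                       GroupShardsIterator shardsIts, long startTime,
                                       long clusterStateVersion, SearchTask task) {
        super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts,
            executor, request, listener, shardsIts, startTime, clusterStateVersion, task);
        this.searchPhaseController = searchPhaseController;
    }

    @Override
    protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard,
                                       ActionListener<DfsSearchResult> listener) {
        // fan-out and retry-on-replica are handled by InitialSearchPhase; we only send one request
        getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
            buildShardSearchRequest(shardIt, shard), getTask(), listener);
    }

    @Override
    protected SearchPhase getNextPhase(AtomicArray<DfsSearchResult> results, SearchPhaseContext context) {
        // the follow-up phases are standalone classes composed via factories: dfs_query -> fetch -> response
        return new DfsQueryPhase(results, searchPhaseController,
            queryResults -> new FetchSearchPhase(queryResults, searchPhaseController, context), context);
    }
}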

org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -19,74 +19,67 @@
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends InitialSearchPhase<Result>
implements SearchPhaseContext {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
protected final Logger logger;
protected final SearchTransportService searchTransportService;
private final Logger logger;
private final SearchTransportService searchTransportService;
private final Executor executor;
protected final ActionListener<SearchResponse> listener;
private final GroupShardsIterator shardsIts;
protected final SearchRequest request;
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
protected final Function<String, Transport.Connection> nodeIdToConnection;
protected final SearchPhaseController searchPhaseController;
protected final SearchTask task;
private final int expectedSuccessfulOps;
private final int expectedTotalOps;
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();
private final AtomicArray<FirstResult> initialResults;
private final ActionListener<SearchResponse> listener;
private final SearchRequest request;
/**
* Used by subclasses to resolve node ids to DiscoveryNodes.
**/
private final Function<String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final AtomicArray<Result> results;
private final long clusterStateVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final long clusterStateVersion;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final long startTime;
protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService,
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion, SearchTask task) {
super(startTime);
super(name, request, shardsIts, logger);
this.startTime = startTime;
this.logger = logger;
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.executor = executor;
this.request = request;
@@ -94,175 +87,75 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
this.listener = listener;
this.nodeIdToConnection = nodeIdToConnection;
this.clusterStateVersion = clusterStateVersion;
this.shardsIts = shardsIts;
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
initialResults = new AtomicArray<>(shardsIts.size());
this.aliasFilter = aliasFilter;
results = new AtomicArray<>(shardsIts.size());
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
}
public void start() {
if (expectedSuccessfulOps == 0) {
/**
* Builds how long it took to execute the search.
*/
private long buildTookInMillis() {
// protect ourselves against time going backwards
// negative values don't make sense and we want to be able to serialize that thing as a vLong
return Math.max(1, System.currentTimeMillis() - startTime);
}
/**
* This is the main entry point for a search. This method starts the search execution of the initial phase.
*/
public final void start() {
if (results.length() == 0) {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
return;
}
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performInitialPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}
void performInitialPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// TODO upgrade this to an assert...
// no more active shards... (we should not really get here, but just for safety)
onInitialPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
filter, indexBoost, startTime());
sendExecuteFirstPhase(connection, transportRequest, new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onInitialPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
executePhase(this);
}
@Override
public void onFailure(Exception t) {
onInitialPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
/* This is the main search phase transition where we move to the next phase. At this point we check if there is
* at least one successful operation left and if so we move to the next phase. If not we immediately fail the
* search phase as "all shards failed"*/
if (successfulOps.get() == 0) { // we have 0 successful results that means we shortcut stuff and return a failure
if (logger.isDebugEnabled()) {
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
logger.debug((Supplier<?>) () -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()),
cause);
}
});
} catch (ConnectTransportException | IllegalArgumentException ex) {
// we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to
// the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected
// at all which is not needed anymore.
onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
onPhaseFailure(currentPhase, "all shards failed", null);
} else {
if (logger.isTraceEnabled()) {
final String resultsFrom = results.asList().stream()
.map(r -> r.value.shardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
}
executePhase(nextPhase);
}
}
private void onInitialPhaseResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId()));
processFirstPhaseResult(shardIndex, result);
// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
successfulOps.incrementAndGet();
// increment all the "future" shards to update the total ops since some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
executePhase(initialPhaseName(), innerGetNextPhase(), null);
} else if (xTotalOps > expectedTotalOps) {
// this is fatal - something is completely wrong here?
throw new AssertionError( "unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
}
}
protected void executePhase(String phaseName, CheckedRunnable<Exception> phase, Exception suppressedException) {
private void executePhase(SearchPhase phase) {
try {
phase.run();
} catch (Exception e) {
if (suppressedException != null) {
e.addSuppressed(suppressedException);
}
if (logger.isDebugEnabled()) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"Failed to execute [{}] while moving to second phase", request),
"Failed to execute [{}] while moving to [{}] phase", request, phase.getName()),
e);
}
raisePhaseFailure(new ReduceSearchPhaseException(phaseName, "", e, buildShardFailures()));
onPhaseFailure(phase, "", e);
}
}
private void onInitialPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId());
addShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request),
e);
} else if (logger.isTraceEnabled()) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("All shards failed for phase: [{}]", initialPhaseName()), e);
}
// no successful ops, raise an exception
raisePhaseFailure(new SearchPhaseExecutionException(initialPhaseName(), "all shards failed", e, shardSearchFailures));
} else {
executePhase(initialPhaseName(), innerGetNextPhase(), e);
}
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
// trace log this exception
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(),
request,
lastShard),
e);
if (!lastShard) {
try {
performInitialPhase(shardIndex, shardIt, nextShard);
} catch (Exception inner) {
inner.addSuppressed(e);
onInitialPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request,
lastShard),
e);
}
}
}
}
}
protected final ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
private ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
@@ -274,17 +167,19 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
return failures;
}
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) {
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(shardsIts.size());
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(results.length());
this.shardFailures.set(shardFailures);
}
}
}
@@ -298,15 +193,21 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
if (results.get(shardIndex) != null) {
assert failure == null : "shard failed before but shouldn't: " + failure;
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
}
}
/**
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
* this method will also notify the listener and sends back a failure to the user.
*
* @param exception the exception explaining or causing the phase failure
*/
protected void raisePhaseFailure(SearchPhaseExecutionException exception) {
for (AtomicArray.Entry<FirstResult> entry : initialResults.asList()) {
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
for (AtomicArray.Entry<Result> entry : results.asList()) {
try {
Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId());
sendReleaseSearchContext(entry.value.id(), connection);
@@ -318,254 +219,97 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
listener.onFailure(exception);
}
protected void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
if (connection != null) {
searchTransportService.sendFreeContext(connection, contextId, request);
}
}
protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
ScoreDoc[] lastEmittedDocPerShard) {
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
return new ShardFetchSearchRequest(request, queryId, entry, lastEmittedDoc);
}
protected abstract void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardIndex, FirstResult result) {
initialResults.set(shardIndex, result);
@Override
public final void onShardSuccess(int shardIndex, Result result) {
successfulOps.incrementAndGet();
results.set(shardIndex, result);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null);
}
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures != null) {
shardFailures.set(shardIndex, null);
}
}
final CheckedRunnable<Exception> innerGetNextPhase() {
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
boolean hadOne = false;
for (int i = 0; i < initialResults.length(); i++) {
FirstResult result = initialResults.get(i);
if (result == null) {
continue; // failure
}
if (hadOne) {
sb.append(",");
} else {
hadOne = true;
}
sb.append(result.shardTarget());
}
logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion);
}
return getNextPhase(initialResults);
}
protected abstract CheckedRunnable<Exception> getNextPhase(AtomicArray<FirstResult> initialResults);
protected abstract String initialPhaseName();
protected Executor getExecutor() {
return executor;
}
// this is a simple base class to simplify fan out to shards and collect
final class CountedCollector<R extends SearchPhaseResult> {
private final AtomicArray<R> resultArray;
private final CountDown counter;
private final IntConsumer onFinish;
CountedCollector(AtomicArray<R> resultArray, int expectedOps, IntConsumer onFinish) {
this.resultArray = resultArray;
this.counter = new CountDown(expectedOps);
this.onFinish = onFinish;
}
void countDown() {
if (counter.countDown()) {
onFinish.accept(successfulOps.get());
}
}
void onResult(int index, R result, SearchShardTarget target) {
try {
result.shardTarget(target);
resultArray.set(index, result);
} finally {
countDown();
}
}
void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
try {
addShardFailure(shardIndex, shardTarget, e);
} finally {
successfulOps.decrementAndGet();
countDown();
}
}
}
/*
* At this point AbstractSearchAsyncAction is just a base-class for the first phase of a search where we have multiple replicas
* for each shardID. If one of them is not available we move to the next one. Yet, once we passed that first stage we have to work with
* the shards we succeeded on the initial phase.
* Unfortunately, subsequent phases are not fully detached from the initial phase since they are all non-static inner classes.
* In future changes this will be changed to detach the inner classes to test them in isolation and to simplify their creation.
* The AbstractSearchAsyncAction should be final and it should just get a factory for the next phase instead of requiring subclasses
* etc.
*/
final class FetchPhase implements CheckedRunnable<Exception> {
private final AtomicArray<FetchSearchResult> fetchResults;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<QuerySearchResultProvider> queryResults;
FetchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
SearchPhaseController searchPhaseController) {
this.fetchResults = new AtomicArray<>(queryResults.length());
this.searchPhaseController = searchPhaseController;
this.queryResults = queryResults;
@Override
public final void onPhaseDone() {
executeNextPhase(this, getNextPhase(results, this));
}
@Override
public void run() throws Exception {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
// off immediately instead of forking when we send back the response to the user since there we only need
// to merge together the fetched results which is a linear operation.
innerRun();
public final int getNumShards() {
return results.length();
}
@Override
public void onFailure(Exception e) {
raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
});
}
private void innerRun() throws IOException{
final int numShards = shardsIts.size();
final boolean isScrollRequest = request.scroll() != null;
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResults) : null;
List<AtomicArray.Entry<QuerySearchResultProvider>> queryResultsAsList = queryResults.asList();
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResultsAsList);
final boolean queryAndFetchOptimization = queryResults.length() == 1;
final IntConsumer finishPhase = successOpts
-> sendResponse(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
// query AND fetch optimization
finishPhase.accept(successfulOps.get());
} else {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
queryResultsAsList.stream()
.map(e -> e.value.queryResult())
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.accept(successfulOps.get());
} else {
final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
: null;
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase);
for (int i = 0; i < docIdsToLoad.length; i++) {
IntArrayList entry = docIdsToLoad[i];
QuerySearchResultProvider queryResult = queryResults.get(i);
if (entry == null) { // no results for this shard ID
if (queryResult != null) {
// if we got some hits from this shard we have to release the context there
// we do this as we go since it will free up resources and passing on the request on the
// transport layer is cheap.
releaseIrrelevantSearchContext(queryResult.queryResult());
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry,
lastEmittedDocPerShard);
executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
connection);
}
}
}
}
}
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
final CountedCollector<FetchSearchResult> counter,
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
final Transport.Connection connection) {
searchTransportService.sendExecuteFetch(connection, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
counter.onResult(shardIndex, result, shardTarget);
public final Logger getLogger() {
return logger;
}
@Override
public void onFailure(Exception e) {
try {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase",
fetchSearchRequest.id()), e);
public final SearchTask getTask() {
return task;
}
counter.onFailure(shardIndex, shardTarget, e);
} finally {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context.
releaseIrrelevantSearchContext(querySearchResult);
@Override
public final SearchRequest getRequest() {
return request;
}
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, results.length(), successfulOps.get(),
buildTookInMillis(), buildShardFailures());
}
});
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
@Override
public final Transport.Connection getConnection(String nodeId) {
return nodeIdToConnection.apply(nodeId);
}
@Override
public final SearchTransportService getSearchTransport() {
return searchTransportService;
}
@Override
public final void execute(Runnable command) {
executor.execute(command);
}
@Override
public final void onResponse(SearchResponse response) {
listener.onResponse(response);
}
@Override
public final void onFailure(Exception e) {
listener.onFailure(e);
}
public final ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) {
AliasFilter filter = aliasFilter.get(shard.index().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(),
filter, indexBoost, startTime);
}
/**
* Releases shard targets that are not used in the docsIdsToLoad.
* Returns the next phase based on the results of the initial search phase
* @param results the results of the initial search phase. Each non null element in the result array represent a successfully
* executed shard request
* @param context the search context for the next phase
*/
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at least one hit that didn't make it to the global topDocs
if (request.scroll() == null && queryResult.hasHits()) {
try {
Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
sendReleaseSearchContext(queryResult.id(), connection);
} catch (Exception e) {
logger.trace("failed to release context", e);
}
}
}
/**
* Sends back a result to the user.
*/
private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
fetchResultsArr);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}
protected abstract SearchPhase getNextPhase(AtomicArray<Result> results, SearchPhaseContext context);
}

org/elasticsearch/action/search/CountedCollector.java

@@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
/**
* This is a simple base class to simplify fan out to shards and collect their results. Each result passed to
* {@link #onResult(int, SearchPhaseResult, SearchShardTarget)} is set on the provided result array
* at the given index.
*/
final class CountedCollector<R extends SearchPhaseResult> {
private final AtomicArray<R> resultArray;
private final CountDown counter;
private final Runnable onFinish;
private final SearchPhaseContext context;
CountedCollector(AtomicArray<R> resultArray, int expectedOps, Runnable onFinish, SearchPhaseContext context) {
if (expectedOps > resultArray.length()) {
throw new IllegalStateException("unexpected number of operations. got: " + expectedOps + " but array size is: "
+ resultArray.length());
}
this.resultArray = resultArray;
this.counter = new CountDown(expectedOps);
this.onFinish = onFinish;
this.context = context;
}
/**
* Forcefully counts down an operation and executes the provided runnable
* if all expected operations were executed
*/
void countDown() {
assert counter.isCountedDown() == false : "more operations executed than specified";
if (counter.countDown()) {
onFinish.run();
}
}
/**
* Sets the result to the given array index and then runs {@link #countDown()}
*/
void onResult(int index, R result, SearchShardTarget target) {
try {
result.shardTarget(target);
resultArray.set(index, result);
} finally {
countDown();
}
}
/**
* Escalates the failure via {@link SearchPhaseContext#onShardFailure(int, SearchShardTarget, Exception)}
* and then runs {@link #countDown()}
*/
void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
try {
context.onShardFailure(shardIndex, shardTarget, e);
} finally {
countDown();
}
}
}
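A short usage sketch of the intended call pattern, assuming a hypothetical surrounding phase (only the CountedCollector and SearchPhaseContext calls come from the code above; everything else is made up): a collector is created with one slot per shard, every shard-level listener calls either onResult or onFailure, and onFinish runs exactly once when all expected operations have counted down.

package org.elasticsearch.action.search;

import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;

import java.io.IOException;
import java.util.function.Function;

// Hypothetical phase -- a sketch of how CountedCollector is meant to be driven.
final class CollectorUsageSketch<R extends SearchPhaseResult> extends SearchPhase {

    private final SearchPhaseContext context;
    private final AtomicArray<R> results;
    private final Function<AtomicArray<R>, SearchPhase> nextPhaseFactory;

    CollectorUsageSketch(SearchPhaseContext context, Function<AtomicArray<R>, SearchPhase> nextPhaseFactory) {
        super("collector_sketch");
        this.context = context;
        this.results = new AtomicArray<>(context.getNumShards()); // one result slot per shard
        this.nextPhaseFactory = nextPhaseFactory;
    }

    @Override
    public void run() throws IOException {
        final CountedCollector<R> counter = new CountedCollector<>(results, context.getNumShards(),
            // runs exactly once, after every expected shard has reported a result or a failure
            () -> context.executeNextPhase(this, nextPhaseFactory.apply(results)), context);
        for (int shardIndex = 0; shardIndex < context.getNumShards(); shardIndex++) {
            // a real phase sends a per-shard request here and, from its ActionListener, calls
            // counter.onResult(shardIndex, result, target) on success or
            // counter.onFailure(shardIndex, target, exception) on failure -- both count down
            counter.countDown(); // stand-in so this sketch completes and onFinish fires
        }
    }
}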

org/elasticsearch/action/search/DfsQueryPhase.java

@@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.function.Function;
/**
* This search phase fans out to every shard to execute a distributed search with pre-collected distributed frequencies for all
* search terms used in the actual search query. This phase is very similar to the default query-then-fetch search phase, but it doesn't
* retry on another shard if any of the shards fail. Failures are treated as shard failures and are counted as non-successful
* operations.
* @see CountedCollector#onFailure(int, SearchShardTarget, Exception)
*/
final class DfsQueryPhase extends SearchPhase {
private final AtomicArray<QuerySearchResultProvider> queryResult;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> dfsSearchResults;
private final Function<AtomicArray<QuerySearchResultProvider>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final SearchTransportService searchTransportService;
DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
SearchPhaseController searchPhaseController,
Function<AtomicArray<QuerySearchResultProvider>, SearchPhase> nextPhaseFactory, SearchPhaseContext context) {
super("dfs_query");
this.queryResult = new AtomicArray<>(dfsSearchResults.length());
this.searchPhaseController = searchPhaseController;
this.dfsSearchResults = dfsSearchResults;
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.searchTransportService = context.getSearchTransport();
}
@Override
public void run() throws IOException {
// TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs
// to free up memory early
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsSearchResults);
final CountedCollector<QuerySearchResultProvider> counter = new CountedCollector<>(queryResult, dfsSearchResults.asList().size(),
() -> {
context.executeNextPhase(this, nextPhaseFactory.apply(queryResult));
}, context);
for (final AtomicArray.Entry<DfsSearchResult> entry : dfsSearchResults.asList()) {
DfsSearchResult dfsResult = entry.value;
final int shardIndex = entry.index;
final SearchShardTarget searchShardTarget = dfsResult.shardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.id(), dfs);
searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(),
ActionListener.wrap(
result -> counter.onResult(shardIndex, result, searchShardTarget),
exception -> {
try {
if (context.getLogger().isDebugEnabled()) {
context.getLogger().debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.id()), exception);
}
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be on the safe side
context.sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}));
}
}
}

org/elasticsearch/action/search/FetchSearchPhase.java

@@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
/**
* This search phase merges the query results from the previous phase together and calculates the topN hits for this search.
* Then it reaches out to all relevant shards to fetch the topN hits.
*/
final class FetchSearchPhase extends SearchPhase {
private final AtomicArray<FetchSearchResult> fetchResults;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<QuerySearchResultProvider> queryResults;
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final Logger logger;
FetchSearchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
SearchPhaseController searchPhaseController,
SearchPhaseContext context) {
this(queryResults, searchPhaseController, context, (response) -> sendResponsePhase(response, context));
}
FetchSearchPhase(AtomicArray<QuerySearchResultProvider> queryResults,
SearchPhaseController searchPhaseController,
SearchPhaseContext context, Function<SearchResponse, SearchPhase> nextPhaseFactory) {
super("fetch");
if (context.getNumShards() != queryResults.length()) {
throw new IllegalStateException("number of shards must match the length of the query results but doesn't:"
+ context.getNumShards() + "!=" + queryResults.length());
}
this.fetchResults = new AtomicArray<>(queryResults.length());
this.searchPhaseController = searchPhaseController;
this.queryResults = queryResults;
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.logger = context.getLogger();
}
@Override
public void run() throws IOException {
context.execute(new ActionRunnable<SearchResponse>(context) {
@Override
public void doRun() throws IOException {
// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
// off immediately instead of forking when we send back the response to the user since there we only need
// to merge together the fetched results which is a linear operation.
innerRun();
}
@Override
public void onFailure(Exception e) {
context.onPhaseFailure(FetchSearchPhase.this, "", e);
}
});
}
private void innerRun() throws IOException {
final int numShards = context.getNumShards();
final boolean isScrollSearch = context.getRequest().scroll() != null;
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, queryResults);
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
List<AtomicArray.Entry<QuerySearchResultProvider>> queryResultsAsList = queryResults.asList();
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResultsAsList);
final boolean queryAndFetchOptimization = queryResults.length() == 1;
final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
// query AND fetch optimization
finishPhase.run();
} else {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
queryResultsAsList.stream()
.map(e -> e.value.queryResult())
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run();
} else {
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
: null;
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase, context);
for (int i = 0; i < docIdsToLoad.length; i++) {
IntArrayList entry = docIdsToLoad[i];
QuerySearchResultProvider queryResult = queryResults.get(i);
if (entry == null) { // no results for this shard ID
if (queryResult != null) {
// if we got some hits from this shard we have to release the context there
// we do this as we go since it frees up resources and passing the request on via the
// transport layer is cheap.
releaseIrrelevantSearchContext(queryResult.queryResult());
}
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().id(), i, entry,
lastEmittedDocPerShard);
executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
connection);
}
}
}
}
}
protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
ScoreDoc[] lastEmittedDocPerShard) {
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
return new ShardFetchSearchRequest(context.getRequest(), queryId, entry, lastEmittedDoc);
}
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
final CountedCollector<FetchSearchResult> counter,
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
final Transport.Connection connection) {
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
counter.onResult(shardIndex, result, shardTarget);
}
@Override
public void onFailure(Exception e) {
try {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute fetch phase",
fetchSearchRequest.id()), e);
}
counter.onFailure(shardIndex, shardTarget, e);
} finally {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context.
releaseIrrelevantSearchContext(querySearchResult);
}
}
});
}
/**
* Releases the search context of a shard if its hits are not referenced by the docIdsToLoad.
*/
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at least one hit that didn't make it to the global topDocs
if (context.getRequest().scroll() == null && queryResult.hasHits()) {
try {
Transport.Connection connection = context.getConnection(queryResult.shardTarget().getNodeId());
context.sendReleaseSearchContext(queryResult.id(), connection);
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);
}
}
}
private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
sortedDocs, reducedQueryPhase, fetchResultsArr);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
}
private static SearchPhase sendResponsePhase(SearchResponse response, SearchPhaseContext context) {
return new SearchPhase("response") {
@Override
public void run() throws IOException {
context.onResponse(response);
}
};
}
}
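Since the phase now talks only to SearchPhaseContext, it can be driven directly from a unit test. Below is a hedged sketch of such a harness, assuming a test-supplied stub context that executes submitted runnables inline, runs the phase handed to executeNextPhase, and reports getNumShards() equal to queryResults.length(); the stub, the controller instance, and the class name are hypothetical, while the constructor and the replaceable "response" phase factory are the ones shown above.

package org.elasticsearch.action.search;

import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.query.QuerySearchResultProvider;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical test helper -- lives in this package to reach the package-private constructor.
final class FetchSearchPhaseTestSketch {

    static SearchResponse runFetchPhase(SearchPhaseContext stubContext, SearchPhaseController controller,
                                        AtomicArray<QuerySearchResultProvider> queryResults) throws IOException {
        final AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
        FetchSearchPhase phase = new FetchSearchPhase(queryResults, controller, stubContext,
            // swap the default "response" phase for one that simply captures the merged response
            searchResponse -> new SearchPhase("test_capture") {
                @Override
                public void run() throws IOException {
                    responseRef.set(searchResponse);
                }
            });
        phase.run();
        // populated only if stubContext executed the submitted work and ran the captured phase inline
        return responseRef.get();
    }
}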

org/elasticsearch/action/search/InitialSearchPhase.java

@@ -0,0 +1,216 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.transport.ConnectTransportException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This is an abstract base class that encapsulates the logic to fan out to all shards in the provided {@link GroupShardsIterator}
* and collect the results. If a shard request returns a failure, this class handles the advance to the next replica of the shard until
* the shard's replica iterator is exhausted. Each shard is referenced by its position in the {@link GroupShardsIterator}, which is later
* referred to as the <tt>shardIndex</tt>.
* The fan-out and collect algorithm is traditionally used as the initial phase, which can either be a query execution or a collection of
* distributed frequencies.
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
private final SearchRequest request;
private final GroupShardsIterator shardsIts;
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) {
super(name);
this.request = request;
this.shardsIts = shardsIts;
this.logger = logger;
// we need to add 1 for the non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// its number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per-shard level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
// we process, hence we add one for the non active partition here.
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
}
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId());
onShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request),
e);
} else if (logger.isTraceEnabled()) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
onPhaseDone();
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
// trace log this exception
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(),
request,
lastShard),
e);
if (!lastShard) {
try {
performPhaseOnShard(shardIndex, shardIt, nextShard);
} catch (Exception inner) {
inner.addSuppressed(e);
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() :
shardIt.shardId(),
request,
lastShard),
e);
}
}
}
}
}
@Override
public final void run() throws IOException {
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performPhaseOnShard(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}
private void performPhaseOnShard(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// TODO upgrade this to an assert...
// no more active shards... (we should not really get here, but just for safety)
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
executePhaseOnShard(shardIt, shard, new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onShardResult(shardIndex, shard.currentNodeId(), result, shardIt);
}
@Override
public void onFailure(Exception t) {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
}
});
} catch (ConnectTransportException | IllegalArgumentException ex) {
// we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to
// the next shard. previously when using discovery nodes here we had a special case for null when a node was not connected
// at all which is not needed anymore.
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}
}
}
private void onShardResult(int shardIndex, String nodeId, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(nodeId, shardIt.shardId()));
onShardSuccess(shardIndex, result);
// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
// increment all the "future" shards to update the total ops since some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
onPhaseDone();
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
}
}
/**
* Executed once all shard results have been received and processed
* @see #onShardFailure(int, SearchShardTarget, Exception)
* @see #onShardSuccess(int, SearchPhaseResult)
*/
abstract void onPhaseDone(); // as a tribute to @kimchy aka. finishHim()
/**
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
* shard target.
* @param shardIndex the internal index for this shard. Each shard has an index / ordinal assigned that is used to reference
* its results
* @param shardTarget the shard target for this failure
* @param ex the failure reason
*/
abstract void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception ex);
/**
* Executed once for every successful shard level request.
* @param shardIndex the internal index for this shard. Each shard has an index / ordinal assigned that is used to reference
* its results
* @param result the result returned from the shard
*
*/
abstract void onShardSuccess(int shardIndex, FirstResult result);
/**
* Sends the request to the actual shard.
* @param shardIt the shards iterator
* @param shard the shard routing to send the request for
* @param listener the listener to notify on response
*/
protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener<FirstResult> listener);
}
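To show what the contract boils down to, here is a hedged sketch of a minimal concrete subclass; every name is made up except the constructor and the four abstract hooks above. It delegates the per-shard request to a caller-supplied dispatcher and merely counts outcomes, which is roughly the shape a unit test of the fan-out and collect logic would take.

package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical minimal initial phase -- a sketch, not part of this commit.
final class CountingInitialPhaseSketch extends InitialSearchPhase<SearchPhaseResult> {

    private final AtomicInteger successes = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final BiConsumer<ShardRouting, ActionListener<SearchPhaseResult>> shardDispatcher;
    private final Runnable onDone;

    CountingInitialPhaseSketch(SearchRequest request, GroupShardsIterator shardsIts, Logger logger,
                               BiConsumer<ShardRouting, ActionListener<SearchPhaseResult>> shardDispatcher,
                               Runnable onDone) {
        super("counting_sketch", request, shardsIts, logger);
        this.shardDispatcher = shardDispatcher;
        this.onDone = onDone;
    }

    @Override
    protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard,
                                       ActionListener<SearchPhaseResult> listener) {
        shardDispatcher.accept(shard, listener); // a real phase would send a transport request here
    }

    @Override
    void onShardSuccess(int shardIndex, SearchPhaseResult result) {
        successes.incrementAndGet(); // a real phase would store the result under shardIndex
    }

    @Override
    void onShardFailure(int shardIndex, SearchShardTarget shardTarget, Exception ex) {
        failures.incrementAndGet(); // a real phase would record the shard failure
    }

    @Override
    void onPhaseDone() {
        onDone.run(); // all shards have reported; a real action would move to the next phase here
    }

    int successes() { return successes.get(); }

    int failures() { return failures.get(); }
}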

org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

@@ -20,19 +20,13 @@
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;
import java.util.Map;
@@ -40,6 +34,7 @@ import java.util.concurrent.Executor;
import java.util.function.Function;
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
@@ -47,81 +42,20 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
}
@Override
protected String initialPhaseName() {
return "dfs";
}
@Override
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(connection, request, task, listener);
}
@Override
protected CheckedRunnable<Exception> getNextPhase(AtomicArray<DfsSearchResult> initialResults) {
return new DfsQueryPhase(initialResults, searchPhaseController,
(queryResults) -> new FetchPhase(queryResults, searchPhaseController));
}
private final class DfsQueryPhase implements CheckedRunnable<Exception> {
private final AtomicArray<QuerySearchResultProvider> queryResult;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> firstResults;
private final Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory;
DfsQueryPhase(AtomicArray<DfsSearchResult> firstResults,
SearchPhaseController searchPhaseController,
Function<AtomicArray<QuerySearchResultProvider>, CheckedRunnable<Exception>> nextPhaseFactory) {
this.queryResult = new AtomicArray<>(firstResults.length());
this.searchPhaseController = searchPhaseController;
this.firstResults = firstResults;
this.nextPhaseFactory = nextPhaseFactory;
}
@Override
public void run() throws Exception {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final CountedCollector<QuerySearchResultProvider> counter = new CountedCollector<>(queryResult, firstResults.asList().size(),
(successfulOps) -> {
if (successfulOps == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
} else {
executePhase("fetch", this.nextPhaseFactory.apply(queryResult), null);
}
});
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
final int shardIndex = entry.index;
Transport.Connection connection = nodeIdToConnection.apply(dfsResult.shardTarget().getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
searchTransportService.sendExecuteQuery(connection, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
counter.onResult(shardIndex, result, dfsResult.shardTarget());
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard) , getTask(), listener);
}
@Override
public void onFailure(Exception e) {
try {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.id()), e);
}
counter.onFailure(shardIndex, dfsResult.shardTarget(), e);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be on the safe side
sendReleaseSearchContext(querySearchRequest.id(), connection);
}
}
});
}
}
protected SearchPhase getNextPhase(AtomicArray<DfsSearchResult> results, SearchPhaseContext context) {
return new DfsQueryPhase(results, searchPhaseController,
(queryResults) -> new FetchSearchPhase(queryResults, searchPhaseController, context), context);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.common.CheckedRunnable;
import java.io.IOException;
import java.util.Objects;
/**
* Base class for all individual search phases like collecting distributed frequencies, fetching documents, querying shards.
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
protected SearchPhase(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}
/**
* Returns the phase's name.
*/
public String getName() {
return name;
}
}
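Since a SearchPhase is just a named CheckedRunnable, callers can define lightweight phases inline; the tests later in this commit use exactly this pattern. A minimal sketch (the phase name and body are illustrative; the snippet must live in the org.elasticsearch.action.search package and import java.io.IOException):
SearchPhase collectPhase = new SearchPhase("collect") {
    @Override
    public void run() throws IOException {
        // gather the intermediate results here and hand them to the next phase
    }
};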

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;
import java.util.concurrent.Executor;
/**
* This class provides contextual state and access to resources across multiple search phases.
*/
interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
// TODO maybe we can make this concrete later - for now we just implement this in the base class for all initial phases
/**
* Returns the total number of shards for the current search across all indices
*/
int getNumShards();
/**
* Returns a logger for this context to prevent each individual phase from creating its own logger.
*/
Logger getLogger();
/**
* Returns the currently executing search task
*/
SearchTask getTask();
/**
* Returns the currently executing search request
*/
SearchRequest getRequest();
/**
* Builds the final search response that should be sent back to the user.
* @param internalSearchResponse the internal search response
* @param scrollId an optional scroll ID if this search is a scroll search
*/
SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId);
/**
* This method will communicate a fatal phase failure back to the user. In contrast to a shard failure,
* this method immediately fails the search request and returns the failure to the issuer of the request.
* @param phase the phase that failed
* @param msg an optional message
* @param cause the cause of the phase failure
*/
void onPhaseFailure(SearchPhase phase, String msg, Throwable cause);
/**
* This method will record a shard failure for the given shard index. In contrast to a phase failure
* ({@link #onPhaseFailure(SearchPhase, String, Throwable)}), this method does not immediately fail the request back to the user;
* it only records a shard failure for the given shard index. This should be called if a shard failure happens after we
* successfully retrieved a result from that shard in a previous phase.
*/
void onShardFailure(int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e);
/**
* Returns a connection to the node if connected; otherwise a {@link org.elasticsearch.transport.ConnectTransportException} is
* thrown.
*/
Transport.Connection getConnection(String nodeId);
/**
* Returns the {@link SearchTransportService} to send shard requests to other nodes
*/
SearchTransportService getSearchTransport();
/**
* Releases a search context with the given context ID on the node the given connection is connected to.
* @see org.elasticsearch.search.query.QuerySearchResult#id()
* @see org.elasticsearch.search.fetch.FetchSearchResult#id()
*
*/
default void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
if (connection != null) {
getSearchTransport().sendFreeContext(connection, contextId, getRequest());
}
}
/**
* Builds a request for the initial search phase.
*/
ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard);
/**
* Processes the phase transition from one phase to another. This method handles all errors that happen during the initial execution
* of the next phase. If there are no successful operations in the context when this method is executed, the search is aborted and
* a response is returned to the user indicating that all shards have failed.
*/
void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase);
}
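A hedged sketch of how a phase implementation might drive this context (the class name, field names, and the work inside run() are illustrative, not part of this commit); it only uses methods declared on the interface above together with the SearchPhase base class from this commit:
package org.elasticsearch.action.search; // same package so the package-private types resolve
import java.io.IOException;
// Illustrative only: chains to the next phase on success, fails the whole request on a fatal error.
final class ExamplePhase extends SearchPhase {
    private final SearchPhaseContext context;
    private final SearchPhase nextPhase;
    ExamplePhase(SearchPhaseContext context, SearchPhase nextPhase) {
        super("example");
        this.context = context;
        this.nextPhase = nextPhase;
    }
    @Override
    public void run() throws IOException {
        try {
            // per-shard work would go here; individual problems are reported via
            // context.onShardFailure(shardIndex, shardTarget, exception) and stale
            // contexts are freed via context.sendReleaseSearchContext(id, connection)
            context.executeNextPhase(this, nextPhase);
        } catch (Exception e) {
            // a fatal problem fails the whole request back to the user
            context.onPhaseFailure(this, "example phase failed", e);
        }
    }
}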

View File

@ -22,10 +22,10 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.Transport;
@ -34,6 +34,7 @@ import java.util.concurrent.Executor;
import java.util.function.Function;
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
private final SearchPhaseController searchPhaseController;
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
@ -42,24 +43,18 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Qu
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
}
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
getSearchTransport().sendExecuteQuery(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard), getTask(), listener);
}
@Override
protected String initialPhaseName() {
return "query";
}
@Override
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) {
searchTransportService.sendExecuteQuery(connection, request, task, listener);
}
@Override
protected CheckedRunnable<Exception> getNextPhase(AtomicArray<QuerySearchResultProvider> initialResults) {
return new FetchPhase(initialResults, searchPhaseController);
protected SearchPhase getNextPhase(AtomicArray<QuerySearchResultProvider> results, SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
}
}

View File

@ -30,7 +30,7 @@ import java.util.List;
* ShardsIterators are always returned in ascending order independently of their order at construction
* time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs.
*/
public class GroupShardsIterator implements Iterable<ShardIterator> {
public final class GroupShardsIterator implements Iterable<ShardIterator> {
private final List<ShardIterator> iterators;
@ -61,12 +61,7 @@ public class GroupShardsIterator implements Iterable<ShardIterator> {
public int totalSizeWith1ForEmpty() {
int size = 0;
for (ShardIterator shard : iterators) {
int sizeActive = shard.size();
if (sizeActive == 0) {
size += 1;
} else {
size += sizeActive;
}
size += Math.max(1, shard.size());
}
return size;
}
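The replacement is behavior preserving: an empty shard iterator still contributes one operation. A tiny standalone check (illustrative, not part of the commit):
// For per-iterator sizes {3, 0, 1} both forms yield 3 + 1 + 1 = 5.
public final class TotalSizeCheck {
    public static void main(String[] args) {
        int[] sizes = {3, 0, 1};
        int oldTotal = 0;
        int newTotal = 0;
        for (int size : sizes) {
            oldTotal += (size == 0) ? 1 : size;
            newTotal += Math.max(1, size);
        }
        System.out.println(oldTotal == newTotal); // prints true
    }
}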

View File

@ -102,12 +102,6 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
return fieldStatistics;
}
public static DfsSearchResult readDfsSearchResult(StreamInput in) throws IOException, ClassNotFoundException {
DfsSearchResult result = new DfsSearchResult();
result.readFrom(in);
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -128,7 +122,6 @@ public class DfsSearchResult extends TransportResponse implements SearchPhaseRes
maxDoc = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
public class CountedCollectorTests extends ESTestCase {
public void testCollect() throws InterruptedException {
AtomicArray<SearchPhaseResult> results = new AtomicArray<>(randomIntBetween(1, 100));
List<Integer> state = new ArrayList<>();
int numResultsExpected = randomIntBetween(1, results.length());
MockSearchPhaseContext context = new MockSearchPhaseContext(results.length());
CountDownLatch latch = new CountDownLatch(1);
boolean maybeFork = randomBoolean();
Executor executor = (runnable) -> {
if (randomBoolean() && maybeFork) {
new Thread(runnable).start();
} else {
runnable.run();
}
};
CountedCollector<SearchPhaseResult> collector = new CountedCollector<>(results, numResultsExpected,
latch::countDown, context);
for (int i = 0; i < numResultsExpected; i++) {
int shardID = i;
switch (randomIntBetween(0, 2)) {
case 0:
state.add(0);
executor.execute(() -> collector.countDown());
break;
case 1:
state.add(1);
executor.execute(() -> collector.onResult(shardID, new DfsSearchResult(shardID, null), new SearchShardTarget("foo",
new Index("bar", "baz"), shardID)));
break;
case 2:
state.add(2);
executor.execute(() -> collector.onFailure(shardID, new SearchShardTarget("foo", new Index("bar", "baz"),
shardID), new RuntimeException("boom")));
break;
default:
fail("unknown state");
}
}
latch.await();
assertEquals(numResultsExpected, state.size());
for (int i = 0; i < numResultsExpected; i++) {
switch (state.get(i)) {
case 0:
assertNull(results.get(i));
break;
case 1:
assertNotNull(results.get(i));
assertEquals(i, results.get(i).id());
break;
case 2:
final int shardId = i;
assertEquals(1, context.failures.stream().filter(f -> f.shardId() == shardId).count());
break;
default:
fail("unknown state");
}
}
for (int i = numResultsExpected; i < results.length(); i++) {
assertNull("index: " + i, results.get(i));
}
}
}

View File

@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class DfsQueryPhaseTests extends ESTestCase {
public void testDfsWith2Shards() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node2", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else {
fail("no such request ID: " + request.id());
}
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
DfsQueryPhase phase = new DfsQueryPhase(results, controller,
(response) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(response);
}
}, mockSearchPhaseContext);
assertEquals("dfs_query", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertNotNull(responseRef.get().get(0));
assertNull(responseRef.get().get(0).fetchResult());
assertEquals(1, responseRef.get().get(0).queryResult().topDocs().totalHits);
assertEquals(42, responseRef.get().get(0).queryResult().topDocs().scoreDocs[0].doc);
assertNotNull(responseRef.get().get(1));
assertNull(responseRef.get().get(1).fetchResult());
assertEquals(1, responseRef.get().get(1).queryResult().topDocs().totalHits);
assertEquals(84, responseRef.get().get(1).queryResult().topDocs().scoreDocs[0].doc);
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
assertEquals(2, mockSearchPhaseContext.numSuccess.get());
}
public void testDfsWith1ShardFailed() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
listener.onFailure(new MockDirectoryWrapper.FakeIOException());
} else {
fail("no such request ID: " + request.id());
}
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
DfsQueryPhase phase = new DfsQueryPhase(results, controller,
(response) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(response);
}
}, mockSearchPhaseContext);
assertEquals("dfs_query", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertNotNull(responseRef.get().get(0));
assertNull(responseRef.get().get(0).fetchResult());
assertEquals(1, responseRef.get().get(0).queryResult().topDocs().totalHits);
assertEquals(42, responseRef.get().get(0).queryResult().topDocs().scoreDocs[0].doc);
assertNull(responseRef.get().get(1));
assertEquals(1, mockSearchPhaseContext.numSuccess.get());
assertEquals(1, mockSearchPhaseContext.failures.size());
assertTrue(mockSearchPhaseContext.failures.get(0).getCause() instanceof MockDirectoryWrapper.FakeIOException);
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(2L));
assertNull(responseRef.get().get(1));
}
public void testFailPhaseOnException() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<QuerySearchResultProvider>> responseRef = new AtomicReference<>();
results.set(0, new DfsSearchResult(1, new SearchShardTarget("node1", new Index("test", "na"), 0)));
results.set(1, new DfsSearchResult(2, new SearchShardTarget("node2", new Index("test", "na"), 0)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
ActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException());
} else {
fail("no such request ID: " + request.id());
}
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
DfsQueryPhase phase = new DfsQueryPhase(results, controller,
(response) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(response);
}
}, mockSearchPhaseContext);
assertEquals("dfs_query", phase.getName());
expectThrows(UncheckedIOException.class, () -> phase.run());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty()); // phase execution will clean up the contexts
}
}

View File

@ -0,0 +1,346 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class FetchSearchPhaseTests extends ESTestCase {
public void testShortcutQueryAndFetchOptimization() throws IOException {
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(1);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
boolean hasHits = randomBoolean();
final int numHits;
if (hasHits) {
QuerySearchResult queryResult = new QuerySearchResult();
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 1.0F), new DocValueFormat[0]);
queryResult.size(1);
FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F));
results.set(0, new QueryFetchSearchResult(queryResult, fetchResult));
numHits = 1;
} else {
numHits = 0;
}
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(numHits, responseRef.get().getHits().totalHits);
if (numHits != 0) {
assertEquals(42, responseRef.get().getHits().getAt(0).docId());
}
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
public void testFetchTwoDocument() throws IOException {
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
results.set(0, queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
results.set(1, queryResult);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
} else {
assertEquals(123, request.id());
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F));
}
listener.onResponse(fetchResult);
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().totalHits);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(42, responseRef.get().getHits().getAt(1).docId());
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(2, responseRef.get().getSuccessfulShards());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
public void testFailFetchOneDoc() throws IOException {
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
results.set(0, queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
results.set(1, queryResult);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) {
if (request.id() == 321) {
FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
listener.onResponse(fetchResult);
} else {
listener.onFailure(new MockDirectoryWrapper.FakeIOException());
}
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().totalHits);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(1, responseRef.get().getFailedShards());
assertEquals(1, responseRef.get().getSuccessfulShards());
assertEquals(1, responseRef.get().getShardFailures().length);
assertTrue(responseRef.get().getShardFailures()[0].getCause() instanceof MockDirectoryWrapper.FakeIOException);
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
}
public void testFetchDocsConcurrently() throws IOException, InterruptedException {
int resultSetSize = randomIntBetween(0, 100);
// we use at least 2 hits, otherwise this is subject to the single shard optimization and we trip an assert...
int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(numHits);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
for (int i = 0; i < numHits; i++) {
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
results.set(i, queryResult);
}
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) {
new Thread(() -> {
FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))}, 1, 100F));
listener.onResponse(fetchResult);
}).start();
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
mockSearchPhaseContext.searchTransport = searchTransportService;
CountDownLatch latch = new CountDownLatch(1);
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
latch.countDown();
}
});
assertEquals("fetch", phase.getName());
phase.run();
latch.await();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(numHits, responseRef.get().getHits().totalHits);
assertEquals(Math.min(numHits, resultSetSize), responseRef.get().getHits().getHits().length);
SearchHit[] hits = responseRef.get().getHits().getHits();
for (int i = 0; i < hits.length; i++) {
assertNotNull(hits[i]);
assertEquals("index: " + i, numHits-i, hits[i].docId());
assertEquals("index: " + i, numHits-1-i, (int)hits[i].getScore());
}
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(numHits, responseRef.get().getSuccessfulShards());
int sizeReleasedContexts = Math.max(0, numHits - resultSetSize); // all non fetched results will be freed
assertEquals(mockSearchPhaseContext.releasedSearchContexts.toString(),
sizeReleasedContexts, mockSearchPhaseContext.releasedSearchContexts.size());
}
public void testExceptionFailsPhase() throws IOException {
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
results.set(0, queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
results.set(1, queryResult);
AtomicInteger numFetches = new AtomicInteger(0);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult();
if (numFetches.incrementAndGet() == 1) {
throw new RuntimeException("BOOM");
}
if (request.id() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
} else {
assertEquals(123, request.id());
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F));
}
listener.onResponse(fetchResult);
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
}
});
assertEquals("fetch", phase.getName());
phase.run();
assertNotNull(mockSearchPhaseContext.phaseFailure.get());
assertEquals(mockSearchPhaseContext.phaseFailure.get().getMessage(), "BOOM");
assertNull(responseRef.get());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.isEmpty());
}
public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up
AtomicArray<QuerySearchResultProvider> results = new AtomicArray<>(2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = 1;
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
results.set(0, queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1));
queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
results.set(1, queryResult);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
ActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)}, 1, 2.0F));
} else {
fail("requestID 123 should not be fetched but was");
}
listener.onResponse(fetchResult);
}
};
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
}
});
assertEquals("fetch", phase.getName());
phase.run();
mockSearchPhaseContext.assertNoFailure();
assertNotNull(responseRef.get());
assertEquals(2, responseRef.get().getHits().totalHits);
assertEquals(1, responseRef.get().getHits().internalHits().length);
assertEquals(84, responseRef.get().getHits().getAt(0).docId());
assertEquals(0, responseRef.get().getFailedShards());
assertEquals(2, responseRef.get().getSuccessfulShards());
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;
import org.junit.Assert;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* SearchPhaseContext for tests
*/
public final class MockSearchPhaseContext implements SearchPhaseContext {
private static final Logger logger = Loggers.getLogger(MockSearchPhaseContext.class);
public AtomicReference<Throwable> phaseFailure = new AtomicReference<>();
final int numShards;
final AtomicInteger numSuccess;
List<ShardSearchFailure> failures = Collections.synchronizedList(new ArrayList<>());
SearchTransportService searchTransport;
Set<Long> releasedSearchContexts = new HashSet<>();
public MockSearchPhaseContext(int numShards) {
this.numShards = numShards;
numSuccess = new AtomicInteger(numShards);
}
public void assertNoFailure() {
if (phaseFailure.get() != null) {
throw new AssertionError(phaseFailure.get());
}
}
@Override
public int getNumShards() {
return numShards;
}
@Override
public Logger getLogger() {
return logger;
}
@Override
public SearchTask getTask() {
return new SearchTask(0, "n/a", "n/a", "test", null);
}
@Override
public SearchRequest getRequest() {
return new SearchRequest();
}
@Override
public SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0,
failures.toArray(new ShardSearchFailure[0]));
}
@Override
public void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
phaseFailure.set(cause);
}
@Override
public void onShardFailure(int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
failures.add(new ShardSearchFailure(e, shardTarget));
numSuccess.decrementAndGet();
}
@Override
public Transport.Connection getConnection(String nodeId) {
return null; // null is ok here for this test
}
@Override
public SearchTransportService getSearchTransport() {
Assert.assertNotNull(searchTransport);
return searchTransport;
}
@Override
public ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) {
Assert.fail("should not be called");
return null;
}
@Override
public void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase) {
try {
nextPhase.run();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public void onResponse(SearchResponse response) {
Assert.fail("should not be called");
}
@Override
public void onFailure(Exception e) {
Assert.fail("should not be called");
}
@Override
public void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
releasedSearchContexts.add(contextId);
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -96,14 +95,14 @@ public class SearchAsyncActionTests extends ESTestCase {
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>(logger, transportService, lookup::get,
aliasFilters, Collections.emptyMap(), null, null, request, responseListener, shardsIter, 0, 0, null) {
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>("test", logger, transportService,
lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) {
TestSearchResponse response = new TestSearchResponse();
@Override
protected void sendExecuteFirstPhase(Transport.Connection connection, ShardSearchTransportRequest request,
ActionListener listener) {
assertTrue("shard: " + request.shardId() + " has been queried twice", response.queried.add(request.shardId()));
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener<TestSearchPhaseResult> listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
@ -116,28 +115,20 @@ public class SearchAsyncActionTests extends ESTestCase {
}
@Override
protected CheckedRunnable<Exception> getNextPhase(AtomicArray<TestSearchPhaseResult> initialResults) {
return () -> {
for (int i = 0; i < initialResults.length(); i++) {
TestSearchPhaseResult result = initialResults.get(i);
protected SearchPhase getNextPhase(AtomicArray<TestSearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
@Override
public void run() throws IOException {
for (int i = 0; i < results.length(); i++) {
TestSearchPhaseResult result = results.get(i);
assertEquals(result.node.getId(), result.shardTarget().getNodeId());
sendReleaseSearchContext(result.id(), new MockConnection(result.node));
}
responseListener.onResponse(response);
latch.countDown();
}
};
}
@Override
protected String initialPhaseName() {
return "test";
}
@Override
protected Executor getExecutor() {
fail("no executor in this class");
return null;
}
};
asyncAction.start();
latch.await();

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class GroupShardsIteratorTests extends ESTestCase {
public void testSize() {
List<ShardIterator> list = new ArrayList<>();
Index index = new Index("foo", "na");
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true), newRouting(index, 0, true),
newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Collections.emptyList()));
list.add(new PlainShardIterator(new ShardId(index, 2), Arrays.asList(newRouting(index, 2, true))));
index = new Index("foo_1", "na");
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Arrays.asList(newRouting(index, 1, true))));
GroupShardsIterator iter = new GroupShardsIterator(list);
assertEquals(7, iter.totalSizeWith1ForEmpty());
assertEquals(5, iter.size());
assertEquals(6, iter.totalSize());
}
public void testIterate() {
List<ShardIterator> list = new ArrayList<>();
Index index = new Index("foo", "na");
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true), newRouting(index, 0, true),
newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Collections.emptyList()));
list.add(new PlainShardIterator(new ShardId(index, 2), Arrays.asList(newRouting(index, 2, true))));
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Arrays.asList(newRouting(index, 1, true))));
index = new Index("foo_2", "na");
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Arrays.asList(newRouting(index, 1, true))));
Collections.shuffle(list, random());
ArrayList<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator iter = new GroupShardsIterator(list);
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
CollectionUtil.timSort(actualIterators);
assertEquals(actualIterators, list);
}
public ShardRouting newRouting(Index index, int id, boolean started) {
ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, id), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
shardRouting = ShardRoutingHelper.initialize(shardRouting, "some node");
if (started) {
shardRouting = ShardRoutingHelper.moveToStarted(shardRouting);
}
return shardRouting;
};
}

View File

@ -314,7 +314,7 @@ public class AdjacencyMatrixIT extends ESIntegTestCase {
.execute().actionGet();
fail("SearchPhaseExecutionException should have been thrown");
} catch (SearchPhaseExecutionException ex) {
assertThat(ex.getCause().getCause().getMessage(), containsString("Number of filters is too large"));
assertThat(ex.getCause().getMessage(), containsString("Number of filters is too large"));
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchContextException;
@ -65,9 +66,11 @@ public class SearchAfterIT extends ESIntegTestCase {
fail("Should fail on search_after cannot be used with scroll.");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(SearchContextException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("`search_after` cannot be used in a scroll context."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(SearchContextException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("`search_after` cannot be used in a scroll context."));
}
}
try {
client().prepareSearch("test")
@ -79,9 +82,11 @@ public class SearchAfterIT extends ESIntegTestCase {
fail("Should fail on search_after cannot be used with from > 0.");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(SearchContextException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("`from` parameter must be set to 0 when `search_after` is used."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(SearchContextException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("`from` parameter must be set to 0 when `search_after` is used."));
}
}
try {
@ -92,9 +97,11 @@ public class SearchAfterIT extends ESIntegTestCase {
fail("Should fail on search_after on score only is disabled");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("Sort must contain at least one field."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("Sort must contain at least one field."));
}
}
try {
@ -106,9 +113,11 @@ public class SearchAfterIT extends ESIntegTestCase {
.get();
fail("Should fail on search_after size differs from sort field size");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("search_after has 1 value(s) but sort has 2."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("search_after has 1 value(s) but sort has 2."));
}
}
try {
@ -119,9 +128,11 @@ public class SearchAfterIT extends ESIntegTestCase {
.get();
fail("Should fail on search_after size differs from sort field size");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("search_after has 2 value(s) but sort has 1."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("search_after has 2 value(s) but sort has 1."));
}
}
try {
@ -133,9 +144,11 @@ public class SearchAfterIT extends ESIntegTestCase {
fail("Should fail on search_after on score only is disabled");
} catch (SearchPhaseExecutionException e) {
assertThat(e.getCause().getClass(), Matchers.equalTo(RemoteTransportException.class));
assertThat(e.getCause().getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(e.getCause().getCause().getMessage(), Matchers.equalTo("Failed to parse search_after value for field [field1]."));
assertTrue(e.shardFailures().length > 0);
for (ShardSearchFailure failure : e.shardFailures()) {
assertThat(failure.getCause().getClass(), Matchers.equalTo(IllegalArgumentException.class));
assertThat(failure.getCause().getMessage(), Matchers.equalTo("Failed to parse search_after value for field [field1]."));
}
}
}
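
Editor's note: every SearchAfterIT hunk above replaces the old nested-cause unwrapping (`e.getCause().getCause()`) with assertions over the per-shard failures carried by `SearchPhaseExecutionException`. The sketch below is not part of the commit; it only illustrates that shared pattern as a hypothetical helper (`assertShardFailures` is an invented name), and it assumes the static `assertTrue`/`assertThat` methods inherited from the test base class plus the Hamcrest `Matchers` import already present in SearchAfterIT.

    import org.elasticsearch.action.search.SearchPhaseExecutionException;
    import org.elasticsearch.action.search.ShardSearchFailure;
    import org.hamcrest.Matchers;

    // Hypothetical helper capturing the updated assertion pattern: the response must
    // report at least one shard failure, and every failure must carry the expected
    // cause type and message.
    static void assertShardFailures(SearchPhaseExecutionException e,
                                    Class<? extends Throwable> expectedCause,
                                    String expectedMessage) {
        assertTrue(e.shardFailures().length > 0);
        for (ShardSearchFailure failure : e.shardFailures()) {
            assertThat(failure.getCause().getClass(), Matchers.equalTo(expectedCause));
            assertThat(failure.getCause().getMessage(), Matchers.equalTo(expectedMessage));
        }
    }

A catch body in the tests above could then read, for example, `assertShardFailures(e, SearchContextException.class, "`search_after` cannot be used in a scroll context.");`.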

View File

@ -141,7 +141,7 @@ public class SuggestSearchIT extends ESIntegTestCase {
try {
searchSuggest("test", termSuggest);
fail(" can not suggest across multiple indices with different analysis chains");
} catch (ReduceSearchPhaseException ex) {
} catch (SearchPhaseExecutionException ex) {
assertThat(ex.getCause(), instanceOf(IllegalStateException.class));
assertThat(ex.getCause().getMessage(),
anyOf(endsWith("Suggest entries have different sizes actual [1] expected [2]"),
@ -160,7 +160,7 @@ public class SuggestSearchIT extends ESIntegTestCase {
try {
searchSuggest("test", termSuggest);
fail(" can not suggest across multiple indices with different analysis chains");
} catch (ReduceSearchPhaseException ex) {
} catch (SearchPhaseExecutionException ex) {
assertThat(ex.getCause(), instanceOf(IllegalStateException.class));
assertThat(ex.getCause().getMessage(), anyOf(endsWith("Suggest entries have different text actual [ABCD] expected [abcd]"),
endsWith("Suggest entries have different text actual [abcd] expected [ABCD]")));

View File

@ -143,7 +143,10 @@ setup:
- match: { error.root_cause.0.type: "script_exception" }
- match: { error.root_cause.0.reason: "compile error" }
- match: { error.caused_by.type: "script_exception" }
- match: { error.caused_by.reason: "compile error" }
- match: { error.caused_by.caused_by.type: "illegal_argument_exception" }
- match: { error.caused_by.caused_by.reason: "While loop has no escape." }
- match: { error.type: "search_phase_execution_exception" }
- match: { error.reason: "all shards failed" }
- match: { error.failed_shards.0.reason.caused_by.type: "illegal_argument_exception" }
- match: { error.failed_shards.0.reason.caused_by.reason: "While loop has no escape." }
- match: { error.failed_shards.0.reason.type: "script_exception" }
- match: { error.failed_shards.0.reason.reason: "compile error" }