Merge remote-tracking branch 'origin/master'

This commit is contained in:
David Pilato 2016-02-29 15:17:08 +01:00
commit 0c871df6ca
23 changed files with 1490 additions and 1637 deletions

View File

@ -207,16 +207,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportMultiSearchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportSearchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportSearchScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchDfsQueryAndFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchDfsQueryThenFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchHelper.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchQueryAndFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchQueryThenFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchScrollQueryAndFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchScrollQueryThenFetchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]type[/\\]TransportSearchTypeAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]suggest[/\\]SuggestResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]suggest[/\\]TransportSuggestAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ActionFilter.java" checks="LineLength" />

View File

@ -174,12 +174,6 @@ import org.elasticsearch.action.search.TransportClearScrollAction;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.suggest.SuggestAction;
import org.elasticsearch.action.suggest.TransportSuggestAction;
import org.elasticsearch.action.support.ActionFilter;
@ -333,16 +327,8 @@ public class ActionModule extends AbstractModule {
TransportShardMultiGetAction.class);
registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class,
TransportSearchDfsQueryThenFetchAction.class,
TransportSearchQueryThenFetchAction.class,
TransportSearchDfsQueryAndFetchAction.class,
TransportSearchQueryAndFetchAction.class
);
registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class,
TransportSearchScrollQueryThenFetchAction.class,
TransportSearchScrollQueryAndFetchAction.class
);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);

View File

@ -17,12 +17,12 @@
* under the License.
*/
package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;
/**
* Base implementation for an async action.
*/
public class AbstractAsyncAction {
abstract class AbstractAsyncAction {
private final long startTime;
@ -46,4 +46,5 @@ public class AbstractAsyncAction {
return Math.max(1, System.currentTimeMillis() - startTime);
}
abstract void start();
}

View File

@ -0,0 +1,393 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest;
abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
protected final ESLogger logger;
protected final SearchServiceTransportAction searchService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
protected final SearchPhaseController searchPhaseController;
protected final ThreadPool threadPool;
protected final ActionListener<SearchResponse> listener;
protected final GroupShardsIterator shardsIts;
protected final SearchRequest request;
protected final ClusterState clusterState;
protected final DiscoveryNodes nodes;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardList;
protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchService = searchService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.searchPhaseController = searchPhaseController;
this.threadPool = threadPool;
this.request = request;
this.listener = listener;
this.clusterState = clusterService.state();
nodes = clusterState.nodes();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(),
startTime(), request.indices());
for (String index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(),
request.indices());
shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference());
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
firstResults = new AtomicArray<>(shardsIts.size());
}
public void start() {
if (expectedSuccessfulOps == 0) {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
return;
}
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState,
shard.index().getName(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases,
startTime()), new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
}
});
}
}
}
void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shardIndex, result);
// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
successfulOps.incrementAndGet();
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
}
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
} else if (xTotalOps > expectedTotalOps) {
raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " +
"to expected [" + expectedTotalOps + "]"));
}
}
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Throwable t) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId());
addShardFailure(shardIndex, shardTarget, t);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
} else if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}]", t, shard, request);
}
}
final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("All shards failed for phase: [{}]", t, firstPhaseName());
}
// no successful ops, raise an exception
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures));
} else {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures));
}
}
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
// trace log this exception
if (logger.isTraceEnabled()) {
logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t);
}
if (!lastShard) {
try {
performFirstPhase(shardIndex, shardIt, nextShard);
} catch (Throwable t1) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t);
}
}
}
}
}
private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request,
boolean lastShard) {
if (shard != null) {
return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
} else {
return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
}
}
protected final ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
return;
}
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) {
synchronized (shardFailuresMutex) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(shardsIts.size());
}
}
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
}
}
}
private void raiseEarlyFailure(Throwable t) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
sendReleaseSearchContext(entry.value.id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
}
}
listener.onFailure(t);
}
/**
* Releases shard targets that are not used in the docsIdsToLoad.
*/
protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearchResultProvider> queryResults,
AtomicArray<IntArrayList> docIdsToLoad) {
if (docIdsToLoad == null) {
return;
}
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
}
}
}
}
}
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
if (node != null) {
searchService.sendFreeContext(node, contextId, request);
}
}
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
ScoreDoc[] lastEmittedDocPerShard) {
if (lastEmittedDocPerShard != null) {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
} else {
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
}
}
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardIndex, FirstResult result) {
firstResults.set(shardIndex, result);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null);
}
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures != null) {
shardFailures.set(shardIndex, null);
}
}
final void innerMoveToSecondPhase() throws Exception {
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
boolean hadOne = false;
for (int i = 0; i < firstResults.length(); i++) {
FirstResult result = firstResults.get(i);
if (result == null) {
continue; // failure
}
if (hadOne) {
sb.append(",");
} else {
hadOne = true;
}
sb.append(result.shardTarget());
}
logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version());
}
moveToSecondPhase();
}
protected abstract void moveToSecondPhase() throws Exception;
protected abstract String firstPhaseName();
}

View File

@ -17,14 +17,14 @@
* under the License.
*/
package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;
import java.util.Map;
/**
*
*/
public class ParsedScrollId {
class ParsedScrollId {
public static final String QUERY_THEN_FETCH_TYPE = "queryThenFetch";

View File

@ -17,9 +17,9 @@
* under the License.
*/
package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;
public class ScrollIdForNode {
class ScrollIdForNode {
private final String node;
private final long scrollId;

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
queryFetchResults = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "dfs";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
}
});
}
void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(t);
}
});
}
}

View File

@ -0,0 +1,223 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "dfs";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
@Override
public void onFailure(Throwable t) {
try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
}
});
}
void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
} else {
executeFetchPhase();
}
}
}
void executeFetchPhase() {
try {
innerExecuteFetchPhase();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
void innerExecuteFetchPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
@Override
public void onFailure(Throwable t) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
}
});
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> {
SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
}
@Override
protected String firstPhaseName() {
return "query_fetch";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener);
}
@Override
protected void moveToSecondPhase() throws Exception {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
}
});
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "query";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}
@Override
protected void moveToSecondPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
successfulOps.get(), buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
@Override
public void onFailure(Throwable t) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
}
});
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final ESLogger logger;
private final SearchPhaseController searchPhaseController;
private final SearchServiceTransportAction searchService;
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchPhaseController = searchPhaseController;
this.searchService = searchService;
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.counter = new AtomicInteger(scrollId.getContext().length);
this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length);
}
shardFailures.set(shardIndex, failure);
}
public void start() {
if (scrollId.getContext().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return;
}
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i];
DiscoveryNode node = nodes.get(target.getNode());
if (node != null) {
executePhase(i, node, target.getScrollId());
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
for (ScrollIdForNode target : scrollId.getContext()) {
DiscoveryNode node = nodes.get(target.getNode());
if (node == null) {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
}
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override
public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result.result());
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
onPhaseFailure(t, searchId, shardIndex);
}
});
}
private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures()));
} else {
finishHim();
}
}
}
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private final ESLogger logger;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
private volatile ScoreDoc[] sortedShardList;
private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.queryResults = new AtomicArray<>(scrollId.getContext().length);
this.fetchResults = new AtomicArray<>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length);
}
shardFailures.set(shardIndex, failure);
}
public void start() {
if (scrollId.getContext().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return;
}
final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length);
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i];
DiscoveryNode node = nodes.get(target.getNode());
if (node != null) {
executeQueryPhase(i, counter, node, target.getScrollId());
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
return;
}
}
}
}
}
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
@Override
public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result.queryResult());
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
onFailure(e);
}
}
}
@Override
public void onFailure(Throwable t) {
onQueryPhaseFailure(shardIndex, counter, searchId, t);
}
});
}
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures()));
} else {
try {
executeFetchPhase();
} catch (Throwable e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
}
}
}
}
private void executeFetchPhase() throws Exception {
sortedShardList = searchPhaseController.sortDocs(true, queryResults);
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());
fetchResults.set(entry.index, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
});
}
}
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.type.ScrollIdForNode;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
@ -41,7 +40,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId;
import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
/**
*/

View File

@ -20,10 +20,6 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
@ -33,13 +29,14 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
/**
@ -48,25 +45,18 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private final ClusterService clusterService;
private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction;
private final TransportSearchQueryThenFetchAction queryThenFetchAction;
private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction;
private final TransportSearchQueryAndFetchAction queryAndFetchAction;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService,
TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction,
TransportSearchQueryThenFetchAction queryThenFetchAction,
TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction,
TransportSearchQueryAndFetchAction queryAndFetchAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController,
TransportService transportService, SearchServiceTransportAction searchService,
ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
this.searchPhaseController = searchPhaseController;
this.searchService = searchService;
this.clusterService = clusterService;
this.dfsQueryThenFetchAction = dfsQueryThenFetchAction;
this.queryThenFetchAction = queryThenFetchAction;
this.dfsQueryAndFetchAction = dfsQueryAndFetchAction;
this.queryAndFetchAction = queryAndFetchAction;
}
@Override
@ -75,7 +65,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
try {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices());
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState,
searchRequest.routing(), searchRequest.indices());
int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap);
if (shardCount == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
@ -86,16 +77,28 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
} catch (Exception e) {
logger.debug("failed to optimize search type, continue as normal", e);
}
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
dfsQueryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {
queryThenFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) {
dfsQueryAndFetchAction.execute(searchRequest, listener);
} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {
queryAndFetchAction.execute(searchRequest, listener);
} else {
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
searchAsyncAction.start();
}
}

View File

@ -17,13 +17,10 @@
* under the License.
*/
package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
@ -42,17 +39,19 @@ import static java.util.Collections.emptyMap;
/**
*
*/
public abstract class TransportSearchHelper {
final class TransportSearchHelper {
public static ShardSearchTransportRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
static ShardSearchTransportRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request,
String[] filteringAliases, long nowInMillis) {
return new ShardSearchTransportRequest(request, shardRouting, numberOfShards, filteringAliases, nowInMillis);
}
public static InternalScrollSearchRequest internalScrollSearchRequest(long id, SearchScrollRequest request) {
static InternalScrollSearchRequest internalScrollSearchRequest(long id, SearchScrollRequest request) {
return new InternalScrollSearchRequest(request, id);
}
public static String buildScrollId(SearchType searchType, AtomicArray<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
static String buildScrollId(SearchType searchType, AtomicArray<? extends SearchPhaseResult> searchPhaseResults,
@Nullable Map<String, String> attributes) throws IOException {
if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) {
return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes);
} else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) {
@ -62,7 +61,8 @@ public abstract class TransportSearchHelper {
}
}
public static String buildScrollId(String type, AtomicArray<? extends SearchPhaseResult> searchPhaseResults, @Nullable Map<String, String> attributes) throws IOException {
static String buildScrollId(String type, AtomicArray<? extends SearchPhaseResult> searchPhaseResults,
@Nullable Map<String, String> attributes) throws IOException {
StringBuilder sb = new StringBuilder().append(type).append(';');
sb.append(searchPhaseResults.asList().size()).append(';');
for (AtomicArray.Entry<? extends SearchPhaseResult> entry : searchPhaseResults.asList()) {
@ -81,7 +81,7 @@ public abstract class TransportSearchHelper {
return Base64.encodeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length, Base64.URL_SAFE);
}
public static ParsedScrollId parseScrollId(String scrollId) {
static ParsedScrollId parseScrollId(String scrollId) {
CharsRefBuilder spare = new CharsRefBuilder();
try {
byte[] decode = Base64.decode(scrollId, Base64.URL_SAFE);
@ -128,5 +128,4 @@ public abstract class TransportSearchHelper {
private TransportSearchHelper() {
}
}

View File

@ -20,51 +20,60 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_AND_FETCH_TYPE;
import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_THEN_FETCH_TYPE;
import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId;
import static org.elasticsearch.action.search.ParsedScrollId.QUERY_AND_FETCH_TYPE;
import static org.elasticsearch.action.search.ParsedScrollId.QUERY_THEN_FETCH_TYPE;
import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
/**
*
*/
public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> {
private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction;
private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction;
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchScrollRequest::new);
this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction;
ClusterService clusterService, SearchServiceTransportAction searchService,
SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchScrollRequest::new);
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
@Override
protected void doExecute(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
try {
ParsedScrollId scrollId = parseScrollId(request.scrollId());
if (scrollId.getType().equals(QUERY_THEN_FETCH_TYPE)) {
queryThenFetchAction.execute(request, scrollId, listener);
} else if (scrollId.getType().equals(QUERY_AND_FETCH_TYPE)) {
queryAndFetchAction.execute(request, scrollId, listener);
} else {
throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized");
AbstractAsyncAction action;
switch (scrollId.getType()) {
case QUERY_THEN_FETCH_TYPE:
action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchService,
searchPhaseController, request, scrollId, listener);
break;
case QUERY_AND_FETCH_TYPE:
action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchService,
searchPhaseController, request, scrollId, listener);
break;
default:
throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized");
}
action.start();
} catch (Throwable e) {
listener.onFailure(e);
}

View File

@ -1,158 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction {
@Inject
public TransportSearchDfsQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
queryFetchResults = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "dfs";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
}
});
}
void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(t);
}
});
}
}
}

View File

@ -1,239 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction {
@Inject
public TransportSearchDfsQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "dfs";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}
@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
}
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
}
@Override
public void onFailure(Throwable t) {
try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
}
});
}
void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures()));
} else {
executeFetchPhase();
}
}
}
void executeFetchPhase() {
try {
innerExecuteFetchPhase();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
void innerExecuteFetchPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
@Override
public void onFailure(Throwable t) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
}
}
});
}
}
}

View File

@ -1,104 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
/**
*
*/
public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction {
@Inject
public TransportSearchQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<QueryFetchSearchResult> {
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
}
@Override
protected String firstPhaseName() {
return "query_fetch";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener);
}
@Override
protected void moveToSecondPhase() throws Exception {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
}
});
}
}
}

View File

@ -1,173 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
*/
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
@Inject
public TransportSearchQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}
@Override
protected String firstPhaseName() {
return "query";
}
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}
@Override
protected void moveToSecondPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
fetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
// in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done.
docIdsToLoad.set(shardIndex, null);
onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
}
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
@Override
public void onFailure(Throwable t) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(failure);
} finally {
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
}
}
});
}
}
}

View File

@ -1,205 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
/**
*
*/
public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent {
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
new AsyncAction(request, scrollId, listener).start();
}
private class AsyncAction extends AbstractAsyncAction {
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final AtomicInteger successfulOps;
private final AtomicInteger counter;
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.counter = new AtomicInteger(scrollId.getContext().length);
this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length);
}
shardFailures.set(shardIndex, failure);
}
public void start() {
if (scrollId.getContext().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return;
}
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i];
DiscoveryNode node = nodes.get(target.getNode());
if (node != null) {
executePhase(i, node, target.getScrollId());
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
for (ScrollIdForNode target : scrollId.getContext()) {
DiscoveryNode node = nodes.get(target.getNode());
if (node == null) {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
}
}
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override
public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result.result());
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
onPhaseFailure(t, searchId, shardIndex);
}
});
}
private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures()));
} else {
finishHim();
}
}
}
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -1,255 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest;
/**
*
*/
public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent {
private final ClusterService clusterService;
private final SearchServiceTransportAction searchService;
private final SearchPhaseController searchPhaseController;
@Inject
public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) {
super(settings);
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
new AsyncAction(request, scrollId, listener).start();
}
private class AsyncAction extends AbstractAsyncAction {
private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
private volatile ScoreDoc[] sortedShardList;
private final AtomicInteger successfulOps;
private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.request = request;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
this.queryResults = new AtomicArray<>(scrollId.getContext().length);
this.fetchResults = new AtomicArray<>(scrollId.getContext().length);
}
protected final ShardSearchFailure[] buildShardFailures() {
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(scrollId.getContext().length);
}
shardFailures.set(shardIndex, failure);
}
public void start() {
if (scrollId.getContext().length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return;
}
final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length);
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i];
DiscoveryNode node = nodes.get(target.getNode());
if (node != null) {
executeQueryPhase(i, counter, node, target.getScrollId());
} else {
if (logger.isDebugEnabled()) {
logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]");
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
return;
}
}
}
}
}
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
@Override
public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result.queryResult());
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
onFailure(e);
}
}
}
@Override
public void onFailure(Throwable t) {
onQueryPhaseFailure(shardIndex, counter, searchId, t);
}
});
}
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures()));
} else {
try {
executeFetchPhase();
} catch (Throwable e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
}
}
}
}
private void executeFetchPhase() throws Exception {
sortedShardList = searchPhaseController.sortDocs(true, queryResults);
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());
fetchResults.set(entry.index, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
});
}
}
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}
private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
}
}

View File

@ -1,406 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search.type;
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.internalSearchRequest;
/**
*
*/
public abstract class TransportSearchTypeAction extends TransportAction<SearchRequest, SearchResponse> {
protected final ClusterService clusterService;
protected final SearchServiceTransportAction searchService;
protected final SearchPhaseController searchPhaseController;
public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver, clusterService.getTaskManager());
this.clusterService = clusterService;
this.searchService = searchService;
this.searchPhaseController = searchPhaseController;
}
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
protected final ActionListener<SearchResponse> listener;
protected final GroupShardsIterator shardsIts;
protected final SearchRequest request;
protected final ClusterState clusterState;
protected final DiscoveryNodes nodes;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardList;
protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
this.request = request;
this.listener = listener;
this.clusterState = clusterService.state();
nodes = clusterState.nodes();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(), startTime(), request.indices());
for (String index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), request.indices());
shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference());
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
firstResults = new AtomicArray<>(shardsIts.size());
}
public void start() {
if (expectedSuccessfulOps == 0) {
// no search shards to search on, bail with empty response (it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), ShardSearchFailure.EMPTY_ARRAY));
return;
}
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, shard.index().getName(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
}
});
}
}
}
void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shardIndex, result);
// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
successfulOps.incrementAndGet();
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
}
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
}
} else if (xTotalOps > expectedTotalOps) {
raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]"));
}
}
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final ShardIterator shardIt, Throwable t) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId());
addShardFailure(shardIndex, shardTarget, t);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
} else if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}]", t, shard, request);
}
}
final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("All shards failed for phase: [{}]", t, firstPhaseName());
}
// no successful ops, raise an exception
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures));
} else {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures));
}
}
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
// trace log this exception
if (logger.isTraceEnabled()) {
logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t);
}
if (!lastShard) {
try {
performFirstPhase(shardIndex, shardIt, nextShard);
} catch (Throwable t1) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t);
}
}
}
}
}
private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, boolean lastShard) {
if (shard != null) {
return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
} else {
return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]";
}
}
protected final ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
List<AtomicArray.Entry<ShardSearchFailure>> entries = shardFailures.asList();
ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()];
for (int i = 0; i < failures.length; i++) {
failures[i] = entries.get(i).value;
}
return failures;
}
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
return;
}
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) {
synchronized (shardFailuresMutex) {
if (shardFailures == null) {
shardFailures = new AtomicArray<>(shardsIts.size());
}
}
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
}
}
}
private void raiseEarlyFailure(Throwable t) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
sendReleaseSearchContext(entry.value.id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
}
}
listener.onFailure(t);
}
/**
* Releases shard targets that are not used in the docsIdsToLoad.
*/
protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearchResultProvider> queryResults,
AtomicArray<IntArrayList> docIdsToLoad) {
if (docIdsToLoad == null) {
return;
}
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
}
}
}
}
}
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
if (node != null) {
searchService.sendFreeContext(node, contextId, request);
}
}
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) {
if (lastEmittedDocPerShard != null) {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
} else {
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
}
}
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<FirstResult> listener);
protected final void processFirstPhaseResult(int shardIndex, FirstResult result) {
firstResults.set(shardIndex, result);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null);
}
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures != null) {
shardFailures.set(shardIndex, null);
}
}
final void innerMoveToSecondPhase() throws Exception {
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
boolean hadOne = false;
for (int i = 0; i < firstResults.length(); i++) {
FirstResult result = firstResults.get(i);
if (result == null) {
continue; // failure
}
if (hadOne) {
sb.append(",");
} else {
hadOne = true;
}
sb.append(result.shardTarget());
}
logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version());
}
moveToSecondPhase();
}
protected abstract void moveToSecondPhase() throws Exception;
protected abstract String firstPhaseName();
}
}