diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index d7ea1545fc9..6b7ad9499a5 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -207,16 +207,6 @@ - - - - - - - - - - diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index cc7b34ff7bd..491202e7c7a 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -174,12 +174,6 @@ import org.elasticsearch.action.search.TransportClearScrollAction; import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.suggest.SuggestAction; import org.elasticsearch.action.suggest.TransportSuggestAction; import org.elasticsearch.action.support.ActionFilter; @@ -333,16 +327,8 @@ public class ActionModule extends AbstractModule { TransportShardMultiGetAction.class); registerAction(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class); - registerAction(SearchAction.INSTANCE, TransportSearchAction.class, - TransportSearchDfsQueryThenFetchAction.class, - TransportSearchQueryThenFetchAction.class, - TransportSearchDfsQueryAndFetchAction.class, - TransportSearchQueryAndFetchAction.class - ); - registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class, - TransportSearchScrollQueryThenFetchAction.class, - TransportSearchScrollQueryAndFetchAction.class - ); + registerAction(SearchAction.INSTANCE, TransportSearchAction.class); + registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class); registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java similarity index 93% rename from core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java rename to core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java index 95352ebe44b..3ce14d8dacd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; /** * Base implementation for an async action. */ -public class AbstractAsyncAction { +abstract class AbstractAsyncAction { private final long startTime; @@ -46,4 +46,5 @@ public class AbstractAsyncAction { return Math.max(1, System.currentTimeMillis() - startTime); } + abstract void start(); } diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java new file mode 100644 index 00000000000..d4ae139ee0c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -0,0 +1,393 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest; + +abstract class AbstractSearchAsyncAction extends AbstractAsyncAction { + + protected final ESLogger logger; + protected final SearchServiceTransportAction searchService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + protected final SearchPhaseController searchPhaseController; + protected final ThreadPool threadPool; + protected final ActionListener listener; + protected final GroupShardsIterator shardsIts; + protected final SearchRequest request; + protected final ClusterState clusterState; + protected final DiscoveryNodes nodes; + protected final int expectedSuccessfulOps; + private final int expectedTotalOps; + protected final AtomicInteger successfulOps = new AtomicInteger(); + private final AtomicInteger totalOps = new AtomicInteger(); + protected final AtomicArray firstResults; + private volatile AtomicArray shardFailures; + private final Object shardFailuresMutex = new Object(); + protected volatile ScoreDoc[] sortedShardList; + + protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, + ActionListener listener) { + this.logger = logger; + this.searchService = searchService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.searchPhaseController = searchPhaseController; + this.threadPool = threadPool; + this.request = request; + this.listener = listener; + + this.clusterState = clusterService.state(); + nodes = clusterState.nodes(); + + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + + // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name + // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead + // of just for the _search api + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(), + startTime(), request.indices()); + + for (String index : concreteIndices) { + clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); + } + + Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), + request.indices()); + + shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); + expectedSuccessfulOps = shardsIts.size(); + // we need to add 1 for non active partition, since we count it in the total! + expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); + + firstResults = new AtomicArray<>(shardsIts.size()); + } + + public void start() { + if (expectedSuccessfulOps == 0) { + //no search shards to search on, bail with empty response + //(it happens with search across _all with no indices around and consistent with broadcast operations) + listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), + ShardSearchFailure.EMPTY_ARRAY)); + return; + } + int shardIndex = -1; + for (final ShardIterator shardIt : shardsIts) { + shardIndex++; + final ShardRouting shard = shardIt.nextOrNull(); + if (shard != null) { + performFirstPhase(shardIndex, shardIt, shard); + } else { + // really, no shards active in this group + onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } + } + } + + void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { + if (shard == null) { + // no more active shards... (we should not really get here, but just for safety) + onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } else { + final DiscoveryNode node = nodes.get(shard.currentNodeId()); + if (node == null) { + onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } else { + String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, + shard.index().getName(), request.indices()); + sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, + startTime()), new ActionListener() { + @Override + public void onResponse(FirstResult result) { + onFirstPhaseResult(shardIndex, shard, result, shardIt); + } + + @Override + public void onFailure(Throwable t) { + onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t); + } + }); + } + } + } + + void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) { + result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id())); + processFirstPhaseResult(shardIndex, result); + // we need to increment successful ops first before we compare the exit condition otherwise if we + // are fast we could concurrently update totalOps but then preempt one of the threads which can + // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. + successfulOps.incrementAndGet(); + // increment all the "future" shards to update the total ops since we some may work and some may not... + // and when that happens, we break on total ops, so we must maintain them + final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); + if (xTotalOps == expectedTotalOps) { + try { + innerMoveToSecondPhase(); + } catch (Throwable e) { + if (logger.isDebugEnabled()) { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); + } + raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); + } + } else if (xTotalOps > expectedTotalOps) { + raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + + "to expected [" + expectedTotalOps + "]")); + } + } + + void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, + final ShardIterator shardIt, Throwable t) { + // we always add the shard failure for a specific shard instance + // we do make sure to clean it on a successful response from a shard + SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId()); + addShardFailure(shardIndex, shardTarget, t); + + if (totalOps.incrementAndGet() == expectedTotalOps) { + if (logger.isDebugEnabled()) { + if (t != null && !TransportActions.isShardNotAvailableException(t)) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } else if (logger.isTraceEnabled()) { + logger.trace("{}: Failed to execute [{}]", t, shard, request); + } + } + final ShardSearchFailure[] shardSearchFailures = buildShardFailures(); + if (successfulOps.get() == 0) { + if (logger.isDebugEnabled()) { + logger.debug("All shards failed for phase: [{}]", t, firstPhaseName()); + } + + // no successful ops, raise an exception + raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures)); + } else { + try { + innerMoveToSecondPhase(); + } catch (Throwable e) { + raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures)); + } + } + } else { + final ShardRouting nextShard = shardIt.nextOrNull(); + final boolean lastShard = nextShard == null; + // trace log this exception + if (logger.isTraceEnabled()) { + logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t); + } + if (!lastShard) { + try { + performFirstPhase(shardIndex, shardIt, nextShard); + } catch (Throwable t1) { + onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1); + } + } else { + // no more shards active, add a failure + if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception + if (t != null && !TransportActions.isShardNotAvailableException(t)) { + logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t); + } + } + } + } + } + + private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, + boolean lastShard) { + if (shard != null) { + return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; + } else { + return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; + } + } + + protected final ShardSearchFailure[] buildShardFailures() { + AtomicArray shardFailures = this.shardFailures; + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) { + // we don't aggregate shard failures on non active shards (but do keep the header counts right) + if (TransportActions.isShardNotAvailableException(t)) { + return; + } + + // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures) + if (shardFailures == null) { + synchronized (shardFailuresMutex) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(shardsIts.size()); + } + } + } + ShardSearchFailure failure = shardFailures.get(shardIndex); + if (failure == null) { + shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); + } else { + // the failure is already present, try and not override it with an exception that is less meaningless + // for example, getting illegal shard state + if (TransportActions.isReadOverrideException(t)) { + shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); + } + } + } + + private void raiseEarlyFailure(Throwable t) { + for (AtomicArray.Entry entry : firstResults.asList()) { + try { + DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); + sendReleaseSearchContext(entry.value.id(), node); + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + listener.onFailure(t); + } + + /** + * Releases shard targets that are not used in the docsIdsToLoad. + */ + protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, + AtomicArray docIdsToLoad) { + if (docIdsToLoad == null) { + return; + } + // we only release search context that we did not fetch from if we are not scrolling + if (request.scroll() == null) { + for (AtomicArray.Entry entry : queryResults.asList()) { + final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); + if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches + && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs + try { + DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); + sendReleaseSearchContext(entry.value.queryResult().id(), node); + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + } + } + } + + protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { + if (node != null) { + searchService.sendFreeContext(node, contextId, request); + } + } + + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, + ScoreDoc[] lastEmittedDocPerShard) { + if (lastEmittedDocPerShard != null) { + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); + } else { + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); + } + } + + protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener); + + protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { + firstResults.set(shardIndex, result); + + if (logger.isTraceEnabled()) { + logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); + } + + // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level + // so its ok concurrency wise to miss potentially the shard failures being created because of another failure + // in the #addShardFailure, because by definition, it will happen on *another* shardIndex + AtomicArray shardFailures = this.shardFailures; + if (shardFailures != null) { + shardFailures.set(shardIndex, null); + } + } + + final void innerMoveToSecondPhase() throws Exception { + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder(); + boolean hadOne = false; + for (int i = 0; i < firstResults.length(); i++) { + FirstResult result = firstResults.get(i); + if (result == null) { + continue; // failure + } + if (hadOne) { + sb.append(","); + } else { + hadOne = true; + } + sb.append(result.shardTarget()); + } + + logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version()); + } + moveToSecondPhase(); + } + + protected abstract void moveToSecondPhase() throws Exception; + + protected abstract String firstPhaseName(); +} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java b/core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java similarity index 95% rename from core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java rename to core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java index c98adecc3a9..ee0ba95b2b1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java +++ b/core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java @@ -17,14 +17,14 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; import java.util.Map; /** * */ -public class ParsedScrollId { +class ParsedScrollId { public static final String QUERY_THEN_FETCH_TYPE = "queryThenFetch"; diff --git a/core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java similarity index 93% rename from core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java rename to core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java index 38c79c91519..488132fdda2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java +++ b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java @@ -17,9 +17,9 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; -public class ScrollIdForNode { +class ScrollIdForNode { private final String node; private final long scrollId; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..b04b18f735b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.dfs.DfsSearchResult; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { + + private final AtomicArray queryFetchResults; + + SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + queryFetchResults = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "dfs"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteDfs(node, request, listener); + } + + @Override + protected void moveToSecondPhase() { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); + + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + } + } + + void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, + final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { + searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { + @Override + public void onResponse(QueryFetchSearchResult result) { + result.shardTarget(dfsResult.shardTarget()); + queryFetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + try { + onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected execution) + // and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } + } + }); + } + + void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(t); + } + }); + + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..76337334caa --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -0,0 +1,223 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.dfs.DfsSearchResult; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + + final AtomicArray queryResults; + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; + + SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + queryResults = new AtomicArray<>(firstResults.length()); + fetchResults = new AtomicArray<>(firstResults.length()); + docIdsToLoad = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "dfs"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteDfs(node, request, listener); + } + + @Override + protected void moveToSecondPhase() { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + } + } + + void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, + final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { + searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { + @Override + public void onResponse(QuerySearchResult result) { + result.shardTarget(dfsResult.shardTarget()); + queryResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + executeFetchPhase(); + } + } + + @Override + public void onFailure(Throwable t) { + try { + onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } + } + }); + } + + void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); + } else { + executeFetchPhase(); + } + } + } + + void executeFetchPhase() { + try { + innerExecuteFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); + } + } + + void innerExecuteFetchPhase() throws Exception { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( + request, sortedShardList, firstResults.length() + ); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = queryResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } + + void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, + final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(shardTarget); + fetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. + docIdsToLoad.set(shardIndex, null); + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + } + }); + } + + void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, + SearchShardTarget shardTarget, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, + fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..5187e77f0e7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; + +class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { + + SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + } + + @Override + protected String firstPhaseName() { + return "query_fetch"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteFetch(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + firstResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..84f93590f23 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; + + SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + fetchResults = new AtomicArray<>(firstResults.length()); + docIdsToLoad = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "query"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteQuery(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( + request, sortedShardList, firstResults.length() + ); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResultProvider queryResult = firstResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } + + void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, + final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(shardTarget); + fetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. + docIdsToLoad.set(shardIndex, null); + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + } + }); + } + + void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, + successfulOps.get(), buildTookInMillis(), buildShardFailures())); + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..e8fe59cc447 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; + +class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { + + private final ESLogger logger; + private final SearchPhaseController searchPhaseController; + private final SearchServiceTransportAction searchService; + private final SearchScrollRequest request; + private final ActionListener listener; + private final ParsedScrollId scrollId; + private final DiscoveryNodes nodes; + private volatile AtomicArray shardFailures; + private final AtomicArray queryFetchResults; + private final AtomicInteger successfulOps; + private final AtomicInteger counter; + + SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + this.logger = logger; + this.searchPhaseController = searchPhaseController; + this.searchService = searchService; + this.request = request; + this.listener = listener; + this.scrollId = scrollId; + this.nodes = clusterService.state().nodes(); + this.successfulOps = new AtomicInteger(scrollId.getContext().length); + this.counter = new AtomicInteger(scrollId.getContext().length); + + this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); + } + + protected final ShardSearchFailure[] buildShardFailures() { + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(scrollId.getContext().length); + } + shardFailures.set(shardIndex, failure); + } + + public void start() { + if (scrollId.getContext().length == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); + return; + } + + ScrollIdForNode[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + ScrollIdForNode target = context[i]; + DiscoveryNode node = nodes.get(target.getNode()); + if (node != null) { + executePhase(i, node, target.getScrollId()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + } + + for (ScrollIdForNode target : scrollId.getContext()) { + DiscoveryNode node = nodes.get(target.getNode()); + if (node == null) { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + } + } + + void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { + InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); + searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { + @Override + public void onResponse(ScrollQueryFetchSearchResult result) { + queryFetchResults.set(shardIndex, result.result()); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + onPhaseFailure(t, searchId, shardIndex); + } + }); + } + + private void onPhaseFailure(Throwable t, long searchId, int shardIndex) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures())); + } else { + finishHim(); + } + } + } + + private void finishHim() { + try { + innerFinishHim(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); + } + } + + private void innerFinishHim() throws Exception { + ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = request.scrollId(); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..0efff74524d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchRequest; +import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.ScrollQuerySearchResult; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; + +class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { + + private final ESLogger logger; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; + private final SearchScrollRequest request; + private final ActionListener listener; + private final ParsedScrollId scrollId; + private final DiscoveryNodes nodes; + private volatile AtomicArray shardFailures; + final AtomicArray queryResults; + final AtomicArray fetchResults; + private volatile ScoreDoc[] sortedShardList; + private final AtomicInteger successfulOps; + + SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + this.logger = logger; + this.searchService = searchService; + this.searchPhaseController = searchPhaseController; + this.request = request; + this.listener = listener; + this.scrollId = scrollId; + this.nodes = clusterService.state().nodes(); + this.successfulOps = new AtomicInteger(scrollId.getContext().length); + this.queryResults = new AtomicArray<>(scrollId.getContext().length); + this.fetchResults = new AtomicArray<>(scrollId.getContext().length); + } + + protected final ShardSearchFailure[] buildShardFailures() { + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(scrollId.getContext().length); + } + shardFailures.set(shardIndex, failure); + } + + public void start() { + if (scrollId.getContext().length == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); + return; + } + final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); + + ScrollIdForNode[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + ScrollIdForNode target = context[i]; + DiscoveryNode node = nodes.get(target.getNode()); + if (node != null) { + executeQueryPhase(i, counter, node, target.getScrollId()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + try { + executeFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); + return; + } + } + } + } + } + + private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { + InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); + searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { + @Override + public void onResponse(ScrollQuerySearchResult result) { + queryResults.set(shardIndex, result.queryResult()); + if (counter.decrementAndGet() == 0) { + try { + executeFetchPhase(); + } catch (Throwable e) { + onFailure(e); + } + } + } + + @Override + public void onFailure(Throwable t) { + onQueryPhaseFailure(shardIndex, counter, searchId, t); + } + }); + } + + void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures())); + } else { + try { + executeFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); + } + } + } + } + + private void executeFetchPhase() throws Exception { + sortedShardList = searchPhaseController.sortDocs(true, queryResults); + AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + IntArrayList docIds = entry.value; + final QuerySearchResult querySearchResult = queryResults.get(entry.index); + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; + ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); + DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); + searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(querySearchResult.shardTarget()); + fetchResults.set(entry.index, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to execute fetch phase", t); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + }); + } + } + + private void finishHim() { + try { + innerFinishHim(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); + } + } + + private void innerFinishHim() { + InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = request.scrollId(); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 8786480c254..a43f9302f3a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.ScrollIdForNode; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterService; @@ -41,7 +40,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; /** */ diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 07584772f66..8e08350b694 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -20,10 +20,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterService; @@ -33,13 +29,14 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Map; import java.util.Set; -import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; /** @@ -48,25 +45,18 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; public class TransportSearchAction extends HandledTransportAction { private final ClusterService clusterService; - private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; - private final TransportSearchQueryThenFetchAction queryThenFetchAction; - private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction; - private final TransportSearchQueryAndFetchAction queryAndFetchAction; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchAction(Settings settings, ThreadPool threadPool, - TransportService transportService, ClusterService clusterService, - TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction, - TransportSearchQueryThenFetchAction queryThenFetchAction, - TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction, - TransportSearchQueryAndFetchAction queryAndFetchAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController, + TransportService transportService, SearchServiceTransportAction searchService, + ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver + indexNameExpressionResolver) { super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new); + this.searchPhaseController = searchPhaseController; + this.searchService = searchService; this.clusterService = clusterService; - this.dfsQueryThenFetchAction = dfsQueryThenFetchAction; - this.queryThenFetchAction = queryThenFetchAction; - this.dfsQueryAndFetchAction = dfsQueryAndFetchAction; - this.queryAndFetchAction = queryAndFetchAction; } @Override @@ -75,7 +65,8 @@ public class TransportSearchAction extends HandledTransportAction> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); + Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, + searchRequest.routing(), searchRequest.indices()); int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap); if (shardCount == 1) { // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard @@ -86,16 +77,28 @@ public class TransportSearchAction extends HandledTransportAction searchPhaseResults, @Nullable Map attributes) throws IOException { + static String buildScrollId(SearchType searchType, AtomicArray searchPhaseResults, + @Nullable Map attributes) throws IOException { if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) { return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes); } else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) { @@ -62,7 +61,8 @@ public abstract class TransportSearchHelper { } } - public static String buildScrollId(String type, AtomicArray searchPhaseResults, @Nullable Map attributes) throws IOException { + static String buildScrollId(String type, AtomicArray searchPhaseResults, + @Nullable Map attributes) throws IOException { StringBuilder sb = new StringBuilder().append(type).append(';'); sb.append(searchPhaseResults.asList().size()).append(';'); for (AtomicArray.Entry entry : searchPhaseResults.asList()) { @@ -81,7 +81,7 @@ public abstract class TransportSearchHelper { return Base64.encodeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length, Base64.URL_SAFE); } - public static ParsedScrollId parseScrollId(String scrollId) { + static ParsedScrollId parseScrollId(String scrollId) { CharsRefBuilder spare = new CharsRefBuilder(); try { byte[] decode = Base64.decode(scrollId, Base64.URL_SAFE); @@ -128,5 +128,4 @@ public abstract class TransportSearchHelper { private TransportSearchHelper() { } - } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java index 445dec114ee..5f4be211eb7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java @@ -20,51 +20,60 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.ParsedScrollId; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_AND_FETCH_TYPE; -import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_THEN_FETCH_TYPE; -import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; +import static org.elasticsearch.action.search.ParsedScrollId.QUERY_AND_FETCH_TYPE; +import static org.elasticsearch.action.search.ParsedScrollId.QUERY_THEN_FETCH_TYPE; +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; /** * */ public class TransportSearchScrollAction extends HandledTransportAction { - private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction; - private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction; + private final ClusterService clusterService; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; @Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService, - TransportSearchScrollQueryThenFetchAction queryThenFetchAction, - TransportSearchScrollQueryAndFetchAction queryAndFetchAction, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchScrollRequest::new); - this.queryThenFetchAction = queryThenFetchAction; - this.queryAndFetchAction = queryAndFetchAction; + ClusterService clusterService, SearchServiceTransportAction searchService, + SearchPhaseController searchPhaseController, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + SearchScrollRequest::new); + this.clusterService = clusterService; + this.searchService = searchService; + this.searchPhaseController = searchPhaseController; } @Override protected void doExecute(SearchScrollRequest request, ActionListener listener) { try { ParsedScrollId scrollId = parseScrollId(request.scrollId()); - if (scrollId.getType().equals(QUERY_THEN_FETCH_TYPE)) { - queryThenFetchAction.execute(request, scrollId, listener); - } else if (scrollId.getType().equals(QUERY_AND_FETCH_TYPE)) { - queryAndFetchAction.execute(request, scrollId, listener); - } else { - throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized"); + AbstractAsyncAction action; + switch (scrollId.getType()) { + case QUERY_THEN_FETCH_TYPE: + action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchService, + searchPhaseController, request, scrollId, listener); + break; + case QUERY_AND_FETCH_TYPE: + action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchService, + searchPhaseController, request, scrollId, listener); + break; + default: + throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized"); } + action.start(); } catch (Throwable e) { listener.onFailure(e); } diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java deleted file mode 100644 index 6d22264815b..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.dfs.AggregatedDfs; -import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchRequest; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchDfsQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - private final AtomicArray queryFetchResults; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - queryFetchResults = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "dfs"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteDfs(node, request, listener); - } - - @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - - void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { - searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { - @Override - public void onResponse(QueryFetchSearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryFetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - try { - onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected execution) - // and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), node); - } - } - }); - } - - void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, - queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Throwable t) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(t); - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java deleted file mode 100644 index 31128cea961..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.dfs.AggregatedDfs; -import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchRequest; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchDfsQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - final AtomicArray queryResults; - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - queryResults = new AtomicArray<>(firstResults.length()); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "dfs"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteDfs(node, request, listener); - } - - @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - - void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { - searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { - @Override - public void onResponse(QuerySearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } - } - - @Override - public void onFailure(Throwable t) { - try { - onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected execution) - // and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), node); - } - } - }); - } - - void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); - } else { - executeFetchPhase(); - } - } - } - - void executeFetchPhase() { - try { - innerExecuteFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); - } - } - - void innerExecuteFetchPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - this.addShardFailure(shardIndex, shardTarget, t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, - fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } - - @Override - public void onFailure(Throwable t) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java deleted file mode 100644 index 0e1e8db5519..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; - -/** - * - */ -public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - } - - @Override - protected String firstPhaseName() { - return "query_fetch"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteFetch(node, request, listener); - } - - @Override - protected void moveToSecondPhase() throws Exception { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, - firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Throwable t) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java deleted file mode 100644 index c63287d9956..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "query"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteQuery(node, request, listener); - } - - @Override - protected void moveToSecondPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResultProvider queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - this.addShardFailure(shardIndex, shardTarget, t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, - fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - - @Override - public void onFailure(Throwable t) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java deleted file mode 100644 index b718baaa294..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; - -/** - * - */ -public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent { - - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; - - @Inject - public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - new AsyncAction(request, scrollId, listener).start(); - } - - private class AsyncAction extends AbstractAsyncAction { - - private final SearchScrollRequest request; - private final ActionListener listener; - private final ParsedScrollId scrollId; - private final DiscoveryNodes nodes; - - private volatile AtomicArray shardFailures; - private final AtomicArray queryFetchResults; - - private final AtomicInteger successfulOps; - private final AtomicInteger counter; - - private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - this.request = request; - this.listener = listener; - this.scrollId = scrollId; - this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.getContext().length); - this.counter = new AtomicInteger(scrollId.getContext().length); - - this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); - } - - protected final ShardSearchFailure[] buildShardFailures() { - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - // we do our best to return the shard failures, but its ok if its not fully concurrently safe - // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(scrollId.getContext().length); - } - shardFailures.set(shardIndex, failure); - } - - public void start() { - if (scrollId.getContext().length == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); - return; - } - - ScrollIdForNode[] context = scrollId.getContext(); - for (int i = 0; i < context.length; i++) { - ScrollIdForNode target = context[i]; - DiscoveryNode node = nodes.get(target.getNode()); - if (node != null) { - executePhase(i, node, target.getScrollId()); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - } - - for (ScrollIdForNode target : scrollId.getContext()) { - DiscoveryNode node = nodes.get(target.getNode()); - if (node == null) { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - } - } - - void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { - InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { - @Override - public void onResponse(ScrollQueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result.result()); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - onPhaseFailure(t, searchId, shardIndex); - } - }); - } - - private void onPhaseFailure(Throwable t, long searchId, int shardIndex) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures())); - } else { - finishHim(); - } - } - } - - private void finishHim() { - try { - innerFinishHim(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); - } - } - - private void innerFinishHim() throws Exception { - ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, - queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = request.scrollId(); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java deleted file mode 100644 index 93a28b29aa1..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchRequest; -import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.ScrollQuerySearchResult; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; - -/** - * - */ -public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent { - - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; - - @Inject - public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - new AsyncAction(request, scrollId, listener).start(); - } - - private class AsyncAction extends AbstractAsyncAction { - - private final SearchScrollRequest request; - - private final ActionListener listener; - - private final ParsedScrollId scrollId; - - private final DiscoveryNodes nodes; - - private volatile AtomicArray shardFailures; - final AtomicArray queryResults; - final AtomicArray fetchResults; - - private volatile ScoreDoc[] sortedShardList; - - private final AtomicInteger successfulOps; - - private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - this.request = request; - this.listener = listener; - this.scrollId = scrollId; - this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.getContext().length); - - this.queryResults = new AtomicArray<>(scrollId.getContext().length); - this.fetchResults = new AtomicArray<>(scrollId.getContext().length); - } - - protected final ShardSearchFailure[] buildShardFailures() { - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - // we do our best to return the shard failures, but its ok if its not fully concurrently safe - // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(scrollId.getContext().length); - } - shardFailures.set(shardIndex, failure); - } - - public void start() { - if (scrollId.getContext().length == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); - return; - } - final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - - ScrollIdForNode[] context = scrollId.getContext(); - for (int i = 0; i < context.length; i++) { - ScrollIdForNode target = context[i]; - DiscoveryNode node = nodes.get(target.getNode()); - if (node != null) { - executeQueryPhase(i, counter, node, target.getScrollId()); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - try { - executeFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); - return; - } - } - } - } - } - - private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { - InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { - @Override - public void onResponse(ScrollQuerySearchResult result) { - queryResults.set(shardIndex, result.queryResult()); - if (counter.decrementAndGet() == 0) { - try { - executeFetchPhase(); - } catch (Throwable e) { - onFailure(e); - } - } - } - - @Override - public void onFailure(Throwable t) { - onQueryPhaseFailure(shardIndex, counter, searchId, t); - } - }); - } - - void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures())); - } else { - try { - executeFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); - } - } - } - } - - private void executeFetchPhase() throws Exception { - sortedShardList = searchPhaseController.sortDocs(true, queryResults); - AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - IntArrayList docIds = entry.value; - final QuerySearchResult querySearchResult = queryResults.get(entry.index); - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); - searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(querySearchResult.shardTarget()); - fetchResults.set(entry.index, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to execute fetch phase", t); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - }); - } - } - - private void finishHim() { - try { - innerFinishHim(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); - } - } - - private void innerFinishHim() { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = request.scrollId(); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java deleted file mode 100644 index 042534a2e7b..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalSearchRequest; - -/** - * - */ -public abstract class TransportSearchTypeAction extends TransportAction { - - protected final ClusterService clusterService; - - protected final SearchServiceTransportAction searchService; - - protected final SearchPhaseController searchPhaseController; - - public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver, clusterService.getTaskManager()); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - protected abstract class BaseAsyncAction extends AbstractAsyncAction { - - protected final ActionListener listener; - - protected final GroupShardsIterator shardsIts; - - protected final SearchRequest request; - - protected final ClusterState clusterState; - protected final DiscoveryNodes nodes; - - protected final int expectedSuccessfulOps; - private final int expectedTotalOps; - - protected final AtomicInteger successfulOps = new AtomicInteger(); - private final AtomicInteger totalOps = new AtomicInteger(); - - protected final AtomicArray firstResults; - private volatile AtomicArray shardFailures; - private final Object shardFailuresMutex = new Object(); - protected volatile ScoreDoc[] sortedShardList; - - protected BaseAsyncAction(SearchRequest request, ActionListener listener) { - this.request = request; - this.listener = listener; - - this.clusterState = clusterService.state(); - nodes = clusterState.nodes(); - - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - - // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name - // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead - // of just for the _search api - String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(), startTime(), request.indices()); - - for (String index : concreteIndices) { - clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); - } - - Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), request.indices()); - - shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); - expectedSuccessfulOps = shardsIts.size(); - // we need to add 1 for non active partition, since we count it in the total! - expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - - firstResults = new AtomicArray<>(shardsIts.size()); - } - - public void start() { - if (expectedSuccessfulOps == 0) { - // no search shards to search on, bail with empty response (it happens with search across _all with no indices around and consistent with broadcast operations) - listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), ShardSearchFailure.EMPTY_ARRAY)); - return; - } - int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.nextOrNull(); - if (shard != null) { - performFirstPhase(shardIndex, shardIt, shard); - } else { - // really, no shards active in this group - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } - } - } - - void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { - if (shard == null) { - // no more active shards... (we should not really get here, but just for safety) - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } else { - final DiscoveryNode node = nodes.get(shard.currentNodeId()); - if (node == null) { - onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } else { - String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, shard.index().getName(), request.indices()); - sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener() { - @Override - public void onResponse(FirstResult result) { - onFirstPhaseResult(shardIndex, shard, result, shardIt); - } - - @Override - public void onFailure(Throwable t) { - onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t); - } - }); - } - } - } - - void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) { - result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id())); - processFirstPhaseResult(shardIndex, result); - // we need to increment successful ops first before we compare the exit condition otherwise if we - // are fast we could concurrently update totalOps but then preempt one of the threads which can - // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. - successfulOps.incrementAndGet(); - // increment all the "future" shards to update the total ops since we some may work and some may not... - // and when that happens, we break on total ops, so we must maintain them - final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); - if (xTotalOps == expectedTotalOps) { - try { - innerMoveToSecondPhase(); - } catch (Throwable e) { - if (logger.isDebugEnabled()) { - logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); - } - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); - } - } else if (xTotalOps > expectedTotalOps) { - raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]")); - } - } - - void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final ShardIterator shardIt, Throwable t) { - // we always add the shard failure for a specific shard instance - // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId()); - addShardFailure(shardIndex, shardTarget, t); - - if (totalOps.incrementAndGet() == expectedTotalOps) { - if (logger.isDebugEnabled()) { - if (t != null && !TransportActions.isShardNotAvailableException(t)) { - if (shard != null) { - logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); - } else { - logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); - } - } else if (logger.isTraceEnabled()) { - logger.trace("{}: Failed to execute [{}]", t, shard, request); - } - } - final ShardSearchFailure[] shardSearchFailures = buildShardFailures(); - if (successfulOps.get() == 0) { - if (logger.isDebugEnabled()) { - logger.debug("All shards failed for phase: [{}]", t, firstPhaseName()); - } - - // no successful ops, raise an exception - raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures)); - } else { - try { - innerMoveToSecondPhase(); - } catch (Throwable e) { - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures)); - } - } - } else { - final ShardRouting nextShard = shardIt.nextOrNull(); - final boolean lastShard = nextShard == null; - // trace log this exception - if (logger.isTraceEnabled()) { - logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t); - } - if (!lastShard) { - try { - performFirstPhase(shardIndex, shardIt, nextShard); - } catch (Throwable t1) { - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1); - } - } else { - // no more shards active, add a failure - if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception - if (t != null && !TransportActions.isShardNotAvailableException(t)) { - logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t); - } - } - } - } - } - - private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, boolean lastShard) { - if (shard != null) { - return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; - } else { - return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; - } - } - - protected final ShardSearchFailure[] buildShardFailures() { - AtomicArray shardFailures = this.shardFailures; - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) { - // we don't aggregate shard failures on non active shards (but do keep the header counts right) - if (TransportActions.isShardNotAvailableException(t)) { - return; - } - - // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures) - if (shardFailures == null) { - synchronized (shardFailuresMutex) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(shardsIts.size()); - } - } - } - ShardSearchFailure failure = shardFailures.get(shardIndex); - if (failure == null) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } else { - // the failure is already present, try and not override it with an exception that is less meaningless - // for example, getting illegal shard state - if (TransportActions.isReadOverrideException(t)) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } - } - } - - private void raiseEarlyFailure(Throwable t) { - for (AtomicArray.Entry entry : firstResults.asList()) { - try { - DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); - sendReleaseSearchContext(entry.value.id(), node); - } catch (Throwable t1) { - logger.trace("failed to release context", t1); - } - } - listener.onFailure(t); - } - - /** - * Releases shard targets that are not used in the docsIdsToLoad. - */ - protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, - AtomicArray docIdsToLoad) { - if (docIdsToLoad == null) { - return; - } - // we only release search context that we did not fetch from if we are not scrolling - if (request.scroll() == null) { - for (AtomicArray.Entry entry : queryResults.asList()) { - final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); - if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches - && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs - try { - DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); - sendReleaseSearchContext(entry.value.queryResult().id(), node); - } catch (Throwable t1) { - logger.trace("failed to release context", t1); - } - } - } - } - } - - protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { - if (node != null) { - searchService.sendFreeContext(node, contextId, request); - } - } - - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, ScoreDoc[] lastEmittedDocPerShard) { - if (lastEmittedDocPerShard != null) { - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); - } else { - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); - } - } - - protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener); - - protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { - firstResults.set(shardIndex, result); - - if (logger.isTraceEnabled()) { - logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); - } - - // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level - // so its ok concurrency wise to miss potentially the shard failures being created because of another failure - // in the #addShardFailure, because by definition, it will happen on *another* shardIndex - AtomicArray shardFailures = this.shardFailures; - if (shardFailures != null) { - shardFailures.set(shardIndex, null); - } - } - - final void innerMoveToSecondPhase() throws Exception { - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - boolean hadOne = false; - for (int i = 0; i < firstResults.length(); i++) { - FirstResult result = firstResults.get(i); - if (result == null) { - continue; // failure - } - if (hadOne) { - sb.append(","); - } else { - hadOne = true; - } - sb.append(result.shardTarget()); - } - - logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version()); - } - moveToSecondPhase(); - } - - protected abstract void moveToSecondPhase() throws Exception; - - protected abstract String firstPhaseName(); - } -}