From 9738ef3a90bec148abbe55dec6f6183190fa4713 Mon Sep 17 00:00:00 2001 From: javanna Date: Sun, 21 Feb 2016 21:39:49 -0800 Subject: [PATCH] Cleanup search sub transport actions and collapse o.e.action.search.type package into o.e.action.search TransportSearchTypeAction and subclasses are not actually transport actions, but just support classes useful for their inner async actions that can easily be extracted out so that we get rid of one too many level of abstraction. Same pattern can be applied to TransportSearchScrollQueryAndFetchAction & TransportSearchScrollQueryThenFetchAction which we could remove in favour of keeping only their inner classes named SearchScrollQueryAndFetchAsyncAction and SearchScrollQueryThenFetchAsyncAction. Remove org.elasticsearch.action.search.type package, collapsed remaining classes into existing org.elasticsearch.action.search package Make also ParsedScrollId ScrollIdForNode and TransportSearchHelper classes and their methods package private. Closes #11710 --- .../resources/checkstyle_suppressions.xml | 10 - .../elasticsearch/action/ActionModule.java | 18 +- .../{type => }/AbstractAsyncAction.java | 5 +- .../search/AbstractSearchAsyncAction.java | 393 +++++++++++++++++ .../search/{type => }/ParsedScrollId.java | 4 +- .../search/{type => }/ScrollIdForNode.java | 4 +- .../SearchDfsQueryAndFetchAsyncAction.java | 142 ++++++ .../SearchDfsQueryThenFetchAsyncAction.java | 223 ++++++++++ .../SearchQueryAndFetchAsyncAction.java | 84 ++++ .../SearchQueryThenFetchAsyncAction.java | 157 +++++++ .../SearchScrollQueryAndFetchAsyncAction.java | 181 ++++++++ ...SearchScrollQueryThenFetchAsyncAction.java | 226 ++++++++++ .../search/TransportClearScrollAction.java | 3 +- .../action/search/TransportSearchAction.java | 65 +-- .../{type => }/TransportSearchHelper.java | 21 +- .../search/TransportSearchScrollAction.java | 51 ++- ...TransportSearchDfsQueryAndFetchAction.java | 158 ------- ...ransportSearchDfsQueryThenFetchAction.java | 239 ----------- .../TransportSearchQueryAndFetchAction.java | 104 ----- .../TransportSearchQueryThenFetchAction.java | 173 -------- ...nsportSearchScrollQueryAndFetchAction.java | 205 --------- ...sportSearchScrollQueryThenFetchAction.java | 255 ----------- .../type/TransportSearchTypeAction.java | 406 ------------------ 23 files changed, 1490 insertions(+), 1637 deletions(-) rename core/src/main/java/org/elasticsearch/action/search/{type => }/AbstractAsyncAction.java (93%) create mode 100644 core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java rename core/src/main/java/org/elasticsearch/action/search/{type => }/ParsedScrollId.java (95%) rename core/src/main/java/org/elasticsearch/action/search/{type => }/ScrollIdForNode.java (93%) create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java rename core/src/main/java/org/elasticsearch/action/search/{type => }/TransportSearchHelper.java (83%) delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java delete mode 100644 core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index d7ea1545fc9..6b7ad9499a5 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -207,16 +207,6 @@ - - - - - - - - - - diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index cc7b34ff7bd..491202e7c7a 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -174,12 +174,6 @@ import org.elasticsearch.action.search.TransportClearScrollAction; import org.elasticsearch.action.search.TransportMultiSearchAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.suggest.SuggestAction; import org.elasticsearch.action.suggest.TransportSuggestAction; import org.elasticsearch.action.support.ActionFilter; @@ -333,16 +327,8 @@ public class ActionModule extends AbstractModule { TransportShardMultiGetAction.class); registerAction(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class); - registerAction(SearchAction.INSTANCE, TransportSearchAction.class, - TransportSearchDfsQueryThenFetchAction.class, - TransportSearchQueryThenFetchAction.class, - TransportSearchDfsQueryAndFetchAction.class, - TransportSearchQueryAndFetchAction.class - ); - registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class, - TransportSearchScrollQueryThenFetchAction.class, - TransportSearchScrollQueryAndFetchAction.class - ); + registerAction(SearchAction.INSTANCE, TransportSearchAction.class); + registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class); registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class); diff --git a/core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java similarity index 93% rename from core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java rename to core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java index 95352ebe44b..3ce14d8dacd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/AbstractAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java @@ -17,12 +17,12 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; /** * Base implementation for an async action. */ -public class AbstractAsyncAction { +abstract class AbstractAsyncAction { private final long startTime; @@ -46,4 +46,5 @@ public class AbstractAsyncAction { return Math.max(1, System.currentTimeMillis() - startTime); } + abstract void start(); } diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java new file mode 100644 index 00000000000..d4ae139ee0c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -0,0 +1,393 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest; + +abstract class AbstractSearchAsyncAction extends AbstractAsyncAction { + + protected final ESLogger logger; + protected final SearchServiceTransportAction searchService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + protected final SearchPhaseController searchPhaseController; + protected final ThreadPool threadPool; + protected final ActionListener listener; + protected final GroupShardsIterator shardsIts; + protected final SearchRequest request; + protected final ClusterState clusterState; + protected final DiscoveryNodes nodes; + protected final int expectedSuccessfulOps; + private final int expectedTotalOps; + protected final AtomicInteger successfulOps = new AtomicInteger(); + private final AtomicInteger totalOps = new AtomicInteger(); + protected final AtomicArray firstResults; + private volatile AtomicArray shardFailures; + private final Object shardFailuresMutex = new Object(); + protected volatile ScoreDoc[] sortedShardList; + + protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, + ActionListener listener) { + this.logger = logger; + this.searchService = searchService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.searchPhaseController = searchPhaseController; + this.threadPool = threadPool; + this.request = request; + this.listener = listener; + + this.clusterState = clusterService.state(); + nodes = clusterState.nodes(); + + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + + // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name + // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead + // of just for the _search api + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(), + startTime(), request.indices()); + + for (String index : concreteIndices) { + clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); + } + + Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), + request.indices()); + + shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); + expectedSuccessfulOps = shardsIts.size(); + // we need to add 1 for non active partition, since we count it in the total! + expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); + + firstResults = new AtomicArray<>(shardsIts.size()); + } + + public void start() { + if (expectedSuccessfulOps == 0) { + //no search shards to search on, bail with empty response + //(it happens with search across _all with no indices around and consistent with broadcast operations) + listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), + ShardSearchFailure.EMPTY_ARRAY)); + return; + } + int shardIndex = -1; + for (final ShardIterator shardIt : shardsIts) { + shardIndex++; + final ShardRouting shard = shardIt.nextOrNull(); + if (shard != null) { + performFirstPhase(shardIndex, shardIt, shard); + } else { + // really, no shards active in this group + onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } + } + } + + void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { + if (shard == null) { + // no more active shards... (we should not really get here, but just for safety) + onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } else { + final DiscoveryNode node = nodes.get(shard.currentNodeId()); + if (node == null) { + onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); + } else { + String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, + shard.index().getName(), request.indices()); + sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, + startTime()), new ActionListener() { + @Override + public void onResponse(FirstResult result) { + onFirstPhaseResult(shardIndex, shard, result, shardIt); + } + + @Override + public void onFailure(Throwable t) { + onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t); + } + }); + } + } + } + + void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) { + result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id())); + processFirstPhaseResult(shardIndex, result); + // we need to increment successful ops first before we compare the exit condition otherwise if we + // are fast we could concurrently update totalOps but then preempt one of the threads which can + // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. + successfulOps.incrementAndGet(); + // increment all the "future" shards to update the total ops since we some may work and some may not... + // and when that happens, we break on total ops, so we must maintain them + final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); + if (xTotalOps == expectedTotalOps) { + try { + innerMoveToSecondPhase(); + } catch (Throwable e) { + if (logger.isDebugEnabled()) { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); + } + raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); + } + } else if (xTotalOps > expectedTotalOps) { + raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared " + + "to expected [" + expectedTotalOps + "]")); + } + } + + void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, + final ShardIterator shardIt, Throwable t) { + // we always add the shard failure for a specific shard instance + // we do make sure to clean it on a successful response from a shard + SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId()); + addShardFailure(shardIndex, shardTarget, t); + + if (totalOps.incrementAndGet() == expectedTotalOps) { + if (logger.isDebugEnabled()) { + if (t != null && !TransportActions.isShardNotAvailableException(t)) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } else if (logger.isTraceEnabled()) { + logger.trace("{}: Failed to execute [{}]", t, shard, request); + } + } + final ShardSearchFailure[] shardSearchFailures = buildShardFailures(); + if (successfulOps.get() == 0) { + if (logger.isDebugEnabled()) { + logger.debug("All shards failed for phase: [{}]", t, firstPhaseName()); + } + + // no successful ops, raise an exception + raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures)); + } else { + try { + innerMoveToSecondPhase(); + } catch (Throwable e) { + raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures)); + } + } + } else { + final ShardRouting nextShard = shardIt.nextOrNull(); + final boolean lastShard = nextShard == null; + // trace log this exception + if (logger.isTraceEnabled()) { + logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t); + } + if (!lastShard) { + try { + performFirstPhase(shardIndex, shardIt, nextShard); + } catch (Throwable t1) { + onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1); + } + } else { + // no more shards active, add a failure + if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception + if (t != null && !TransportActions.isShardNotAvailableException(t)) { + logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t); + } + } + } + } + } + + private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, + boolean lastShard) { + if (shard != null) { + return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; + } else { + return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; + } + } + + protected final ShardSearchFailure[] buildShardFailures() { + AtomicArray shardFailures = this.shardFailures; + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) { + // we don't aggregate shard failures on non active shards (but do keep the header counts right) + if (TransportActions.isShardNotAvailableException(t)) { + return; + } + + // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures) + if (shardFailures == null) { + synchronized (shardFailuresMutex) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(shardsIts.size()); + } + } + } + ShardSearchFailure failure = shardFailures.get(shardIndex); + if (failure == null) { + shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); + } else { + // the failure is already present, try and not override it with an exception that is less meaningless + // for example, getting illegal shard state + if (TransportActions.isReadOverrideException(t)) { + shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); + } + } + } + + private void raiseEarlyFailure(Throwable t) { + for (AtomicArray.Entry entry : firstResults.asList()) { + try { + DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); + sendReleaseSearchContext(entry.value.id(), node); + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + listener.onFailure(t); + } + + /** + * Releases shard targets that are not used in the docsIdsToLoad. + */ + protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, + AtomicArray docIdsToLoad) { + if (docIdsToLoad == null) { + return; + } + // we only release search context that we did not fetch from if we are not scrolling + if (request.scroll() == null) { + for (AtomicArray.Entry entry : queryResults.asList()) { + final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); + if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches + && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs + try { + DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); + sendReleaseSearchContext(entry.value.queryResult().id(), node); + } catch (Throwable t1) { + logger.trace("failed to release context", t1); + } + } + } + } + } + + protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { + if (node != null) { + searchService.sendFreeContext(node, contextId, request); + } + } + + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, + ScoreDoc[] lastEmittedDocPerShard) { + if (lastEmittedDocPerShard != null) { + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); + } else { + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); + } + } + + protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener); + + protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { + firstResults.set(shardIndex, result); + + if (logger.isTraceEnabled()) { + logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); + } + + // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level + // so its ok concurrency wise to miss potentially the shard failures being created because of another failure + // in the #addShardFailure, because by definition, it will happen on *another* shardIndex + AtomicArray shardFailures = this.shardFailures; + if (shardFailures != null) { + shardFailures.set(shardIndex, null); + } + } + + final void innerMoveToSecondPhase() throws Exception { + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder(); + boolean hadOne = false; + for (int i = 0; i < firstResults.length(); i++) { + FirstResult result = firstResults.get(i); + if (result == null) { + continue; // failure + } + if (hadOne) { + sb.append(","); + } else { + hadOne = true; + } + sb.append(result.shardTarget()); + } + + logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version()); + } + moveToSecondPhase(); + } + + protected abstract void moveToSecondPhase() throws Exception; + + protected abstract String firstPhaseName(); +} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java b/core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java similarity index 95% rename from core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java rename to core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java index c98adecc3a9..ee0ba95b2b1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/ParsedScrollId.java +++ b/core/src/main/java/org/elasticsearch/action/search/ParsedScrollId.java @@ -17,14 +17,14 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; import java.util.Map; /** * */ -public class ParsedScrollId { +class ParsedScrollId { public static final String QUERY_THEN_FETCH_TYPE = "queryThenFetch"; diff --git a/core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java similarity index 93% rename from core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java rename to core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java index 38c79c91519..488132fdda2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/type/ScrollIdForNode.java +++ b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java @@ -17,9 +17,9 @@ * under the License. */ -package org.elasticsearch.action.search.type; +package org.elasticsearch.action.search; -public class ScrollIdForNode { +class ScrollIdForNode { private final String node; private final long scrollId; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..b04b18f735b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.dfs.DfsSearchResult; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { + + private final AtomicArray queryFetchResults; + + SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + queryFetchResults = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "dfs"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteDfs(node, request, listener); + } + + @Override + protected void moveToSecondPhase() { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); + + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); + } + } + + void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, + final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { + searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { + @Override + public void onResponse(QueryFetchSearchResult result) { + result.shardTarget(dfsResult.shardTarget()); + queryFetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + try { + onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected execution) + // and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } + } + }); + } + + void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(t); + } + }); + + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..76337334caa --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -0,0 +1,223 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.search.dfs.DfsSearchResult; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchRequest; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + + final AtomicArray queryResults; + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; + + SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + queryResults = new AtomicArray<>(firstResults.length()); + fetchResults = new AtomicArray<>(firstResults.length()); + docIdsToLoad = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "dfs"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteDfs(node, request, listener); + } + + @Override + protected void moveToSecondPhase() { + final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); + final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); + for (final AtomicArray.Entry entry : firstResults.asList()) { + DfsSearchResult dfsResult = entry.value; + DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); + } + } + + void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, + final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { + searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { + @Override + public void onResponse(QuerySearchResult result) { + result.shardTarget(dfsResult.shardTarget()); + queryResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + executeFetchPhase(); + } + } + + @Override + public void onFailure(Throwable t) { + try { + onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); + } finally { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + sendReleaseSearchContext(querySearchRequest.id(), node); + } + } + }); + } + + void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); + } + this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); + } else { + executeFetchPhase(); + } + } + } + + void executeFetchPhase() { + try { + innerExecuteFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); + } + } + + void innerExecuteFetchPhase() throws Exception { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( + request, sortedShardList, firstResults.length() + ); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResult queryResult = queryResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } + + void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, + final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(shardTarget); + fetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. + docIdsToLoad.set(shardIndex, null); + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + } + }); + } + + void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, + SearchShardTarget shardTarget, AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, + fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); + } + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..5187e77f0e7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -0,0 +1,84 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; + +class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { + + SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + } + + @Override + protected String firstPhaseName() { + return "query_fetch"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteFetch(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + firstResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } + + @Override + public void onFailure(Throwable t) { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..84f93590f23 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + + final AtomicArray fetchResults; + final AtomicArray docIdsToLoad; + + SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, + SearchPhaseController searchPhaseController, ThreadPool threadPool, + SearchRequest request, ActionListener listener) { + super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + fetchResults = new AtomicArray<>(firstResults.length()); + docIdsToLoad = new AtomicArray<>(firstResults.length()); + } + + @Override + protected String firstPhaseName() { + return "query"; + } + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, + ActionListener listener) { + searchService.sendExecuteQuery(node, request, listener); + } + + @Override + protected void moveToSecondPhase() throws Exception { + boolean useScroll = request.scroll() != null; + sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( + request, sortedShardList, firstResults.length() + ); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (AtomicArray.Entry entry : docIdsToLoad.asList()) { + QuerySearchResultProvider queryResult = firstResults.get(entry.index); + DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); + } + } + + void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, + final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { + searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(shardTarget); + fetchResults.set(shardIndex, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + // the search context might not be cleared on the node where the fetch was executed for example + // because the action was rejected by the thread pool. in this case we need to send a dedicated + // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared + // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. + docIdsToLoad.set(shardIndex, null); + onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); + } + }); + } + + void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, + AtomicInteger counter) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); + } + this.addShardFailure(shardIndex, shardTarget, t); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + private void finishHim() { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + @Override + public void doRun() throws IOException { + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, + successfulOps.get(), buildTookInMillis(), buildShardFailures())); + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + + @Override + public void onFailure(Throwable t) { + try { + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); + if (logger.isDebugEnabled()) { + logger.debug("failed to reduce search", failure); + } + super.onFailure(failure); + } finally { + releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); + } + } + }); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java new file mode 100644 index 00000000000..e8fe59cc447 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.QueryFetchSearchResult; +import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; +import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; + +class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { + + private final ESLogger logger; + private final SearchPhaseController searchPhaseController; + private final SearchServiceTransportAction searchService; + private final SearchScrollRequest request; + private final ActionListener listener; + private final ParsedScrollId scrollId; + private final DiscoveryNodes nodes; + private volatile AtomicArray shardFailures; + private final AtomicArray queryFetchResults; + private final AtomicInteger successfulOps; + private final AtomicInteger counter; + + SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + this.logger = logger; + this.searchPhaseController = searchPhaseController; + this.searchService = searchService; + this.request = request; + this.listener = listener; + this.scrollId = scrollId; + this.nodes = clusterService.state().nodes(); + this.successfulOps = new AtomicInteger(scrollId.getContext().length); + this.counter = new AtomicInteger(scrollId.getContext().length); + + this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); + } + + protected final ShardSearchFailure[] buildShardFailures() { + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(scrollId.getContext().length); + } + shardFailures.set(shardIndex, failure); + } + + public void start() { + if (scrollId.getContext().length == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); + return; + } + + ScrollIdForNode[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + ScrollIdForNode target = context[i]; + DiscoveryNode node = nodes.get(target.getNode()); + if (node != null) { + executePhase(i, node, target.getScrollId()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + } + + for (ScrollIdForNode target : scrollId.getContext()) { + DiscoveryNode node = nodes.get(target.getNode()); + if (node == null) { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + } + } + + void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { + InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); + searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { + @Override + public void onResponse(ScrollQueryFetchSearchResult result) { + queryFetchResults.set(shardIndex, result.result()); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + onPhaseFailure(t, searchId, shardIndex); + } + }); + } + + private void onPhaseFailure(Throwable t, long searchId, int shardIndex) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures())); + } else { + finishHim(); + } + } + } + + private void finishHim() { + try { + innerFinishHim(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); + } + } + + private void innerFinishHim() throws Exception { + ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + queryFetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = request.scrollId(); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java new file mode 100644 index 00000000000..0efff74524d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -0,0 +1,226 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import com.carrotsearch.hppc.IntArrayList; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.ShardFetchRequest; +import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.ScrollQuerySearchResult; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; + +class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { + + private final ESLogger logger; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; + private final SearchScrollRequest request; + private final ActionListener listener; + private final ParsedScrollId scrollId; + private final DiscoveryNodes nodes; + private volatile AtomicArray shardFailures; + final AtomicArray queryResults; + final AtomicArray fetchResults; + private volatile ScoreDoc[] sortedShardList; + private final AtomicInteger successfulOps; + + SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, + SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + this.logger = logger; + this.searchService = searchService; + this.searchPhaseController = searchPhaseController; + this.request = request; + this.listener = listener; + this.scrollId = scrollId; + this.nodes = clusterService.state().nodes(); + this.successfulOps = new AtomicInteger(scrollId.getContext().length); + this.queryResults = new AtomicArray<>(scrollId.getContext().length); + this.fetchResults = new AtomicArray<>(scrollId.getContext().length); + } + + protected final ShardSearchFailure[] buildShardFailures() { + if (shardFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + List> entries = shardFailures.asList(); + ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; + for (int i = 0; i < failures.length; i++) { + failures[i] = entries.get(i).value; + } + return failures; + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new AtomicArray<>(scrollId.getContext().length); + } + shardFailures.set(shardIndex, failure); + } + + public void start() { + if (scrollId.getContext().length == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); + return; + } + final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); + + ScrollIdForNode[] context = scrollId.getContext(); + for (int i = 0; i < context.length; i++) { + ScrollIdForNode target = context[i]; + DiscoveryNode node = nodes.get(target.getNode()); + if (node != null) { + executeQueryPhase(i, counter, node, target.getScrollId()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + try { + executeFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); + return; + } + } + } + } + } + + private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { + InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); + searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { + @Override + public void onResponse(ScrollQuerySearchResult result) { + queryResults.set(shardIndex, result.queryResult()); + if (counter.decrementAndGet() == 0) { + try { + executeFetchPhase(); + } catch (Throwable e) { + onFailure(e); + } + } + } + + @Override + public void onFailure(Throwable t) { + onQueryPhaseFailure(shardIndex, counter, searchId, t); + } + }); + } + + void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Failed to execute query phase", t, searchId); + } + addShardFailure(shardIndex, new ShardSearchFailure(t)); + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + if (successfulOps.get() == 0) { + listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures())); + } else { + try { + executeFetchPhase(); + } catch (Throwable e) { + listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); + } + } + } + } + + private void executeFetchPhase() throws Exception { + sortedShardList = searchPhaseController.sortDocs(true, queryResults); + AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + finishHim(); + return; + } + + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + IntArrayList docIds = entry.value; + final QuerySearchResult querySearchResult = queryResults.get(entry.index); + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; + ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); + DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); + searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { + @Override + public void onResponse(FetchSearchResult result) { + result.shardTarget(querySearchResult.shardTarget()); + fetchResults.set(entry.index, result); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + + @Override + public void onFailure(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to execute fetch phase", t); + } + successfulOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + finishHim(); + } + } + }); + } + } + + private void finishHim() { + try { + innerFinishHim(); + } catch (Throwable e) { + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); + } + } + + private void innerFinishHim() { + InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + String scrollId = null; + if (request.scroll() != null) { + scrollId = request.scrollId(); + } + listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), + buildTookInMillis(), buildShardFailures())); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 8786480c254..a43f9302f3a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.ScrollIdForNode; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterService; @@ -41,7 +40,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; /** */ diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 07584772f66..8e08350b694 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -20,10 +20,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterService; @@ -33,13 +29,14 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Map; import java.util.Set; -import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; /** @@ -48,25 +45,18 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; public class TransportSearchAction extends HandledTransportAction { private final ClusterService clusterService; - private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; - private final TransportSearchQueryThenFetchAction queryThenFetchAction; - private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction; - private final TransportSearchQueryAndFetchAction queryAndFetchAction; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchAction(Settings settings, ThreadPool threadPool, - TransportService transportService, ClusterService clusterService, - TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction, - TransportSearchQueryThenFetchAction queryThenFetchAction, - TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction, - TransportSearchQueryAndFetchAction queryAndFetchAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController, + TransportService transportService, SearchServiceTransportAction searchService, + ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver + indexNameExpressionResolver) { super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new); + this.searchPhaseController = searchPhaseController; + this.searchService = searchService; this.clusterService = clusterService; - this.dfsQueryThenFetchAction = dfsQueryThenFetchAction; - this.queryThenFetchAction = queryThenFetchAction; - this.dfsQueryAndFetchAction = dfsQueryAndFetchAction; - this.queryAndFetchAction = queryAndFetchAction; } @Override @@ -75,7 +65,8 @@ public class TransportSearchAction extends HandledTransportAction> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); + Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, + searchRequest.routing(), searchRequest.indices()); int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap); if (shardCount == 1) { // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard @@ -86,16 +77,28 @@ public class TransportSearchAction extends HandledTransportAction searchPhaseResults, @Nullable Map attributes) throws IOException { + static String buildScrollId(SearchType searchType, AtomicArray searchPhaseResults, + @Nullable Map attributes) throws IOException { if (searchType == SearchType.DFS_QUERY_THEN_FETCH || searchType == SearchType.QUERY_THEN_FETCH) { return buildScrollId(ParsedScrollId.QUERY_THEN_FETCH_TYPE, searchPhaseResults, attributes); } else if (searchType == SearchType.QUERY_AND_FETCH || searchType == SearchType.DFS_QUERY_AND_FETCH) { @@ -62,7 +61,8 @@ public abstract class TransportSearchHelper { } } - public static String buildScrollId(String type, AtomicArray searchPhaseResults, @Nullable Map attributes) throws IOException { + static String buildScrollId(String type, AtomicArray searchPhaseResults, + @Nullable Map attributes) throws IOException { StringBuilder sb = new StringBuilder().append(type).append(';'); sb.append(searchPhaseResults.asList().size()).append(';'); for (AtomicArray.Entry entry : searchPhaseResults.asList()) { @@ -81,7 +81,7 @@ public abstract class TransportSearchHelper { return Base64.encodeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length, Base64.URL_SAFE); } - public static ParsedScrollId parseScrollId(String scrollId) { + static ParsedScrollId parseScrollId(String scrollId) { CharsRefBuilder spare = new CharsRefBuilder(); try { byte[] decode = Base64.decode(scrollId, Base64.URL_SAFE); @@ -128,5 +128,4 @@ public abstract class TransportSearchHelper { private TransportSearchHelper() { } - } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java index 445dec114ee..5f4be211eb7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java @@ -20,51 +20,60 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.type.ParsedScrollId; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction; -import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_AND_FETCH_TYPE; -import static org.elasticsearch.action.search.type.ParsedScrollId.QUERY_THEN_FETCH_TYPE; -import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; +import static org.elasticsearch.action.search.ParsedScrollId.QUERY_AND_FETCH_TYPE; +import static org.elasticsearch.action.search.ParsedScrollId.QUERY_THEN_FETCH_TYPE; +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; /** * */ public class TransportSearchScrollAction extends HandledTransportAction { - private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction; - private final TransportSearchScrollQueryAndFetchAction queryAndFetchAction; + private final ClusterService clusterService; + private final SearchServiceTransportAction searchService; + private final SearchPhaseController searchPhaseController; @Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService, - TransportSearchScrollQueryThenFetchAction queryThenFetchAction, - TransportSearchScrollQueryAndFetchAction queryAndFetchAction, - ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchScrollRequest::new); - this.queryThenFetchAction = queryThenFetchAction; - this.queryAndFetchAction = queryAndFetchAction; + ClusterService clusterService, SearchServiceTransportAction searchService, + SearchPhaseController searchPhaseController, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + SearchScrollRequest::new); + this.clusterService = clusterService; + this.searchService = searchService; + this.searchPhaseController = searchPhaseController; } @Override protected void doExecute(SearchScrollRequest request, ActionListener listener) { try { ParsedScrollId scrollId = parseScrollId(request.scrollId()); - if (scrollId.getType().equals(QUERY_THEN_FETCH_TYPE)) { - queryThenFetchAction.execute(request, scrollId, listener); - } else if (scrollId.getType().equals(QUERY_AND_FETCH_TYPE)) { - queryAndFetchAction.execute(request, scrollId, listener); - } else { - throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized"); + AbstractAsyncAction action; + switch (scrollId.getType()) { + case QUERY_THEN_FETCH_TYPE: + action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchService, + searchPhaseController, request, scrollId, listener); + break; + case QUERY_AND_FETCH_TYPE: + action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchService, + searchPhaseController, request, scrollId, listener); + break; + default: + throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized"); } + action.start(); } catch (Throwable e) { listener.onFailure(e); } diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java deleted file mode 100644 index 6d22264815b..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.dfs.AggregatedDfs; -import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchRequest; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchDfsQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - private final AtomicArray queryFetchResults; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - queryFetchResults = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "dfs"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteDfs(node, request, listener); - } - - @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - - void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { - searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { - @Override - public void onResponse(QueryFetchSearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryFetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - try { - onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected execution) - // and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), node); - } - } - }); - } - - void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, - queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Throwable t) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(t); - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java deleted file mode 100644 index 31128cea961..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.dfs.AggregatedDfs; -import org.elasticsearch.search.dfs.DfsSearchResult; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchRequest; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchDfsQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - final AtomicArray queryResults; - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - queryResults = new AtomicArray<>(firstResults.length()); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "dfs"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteDfs(node, request, listener); - } - - @Override - protected void moveToSecondPhase() { - final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); - final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - - void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { - searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { - @Override - public void onResponse(QuerySearchResult result) { - result.shardTarget(dfsResult.shardTarget()); - queryResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - executeFetchPhase(); - } - } - - @Override - public void onFailure(Throwable t) { - try { - onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter); - } finally { - // the query might not have been executed at all (for example because thread pool rejected execution) - // and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - sendReleaseSearchContext(querySearchRequest.id(), node); - } - } - }); - } - - void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); - } - this.addShardFailure(shardIndex, dfsResult.shardTarget(), t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", buildShardFailures())); - } else { - executeFetchPhase(); - } - } - } - - void executeFetchPhase() { - try { - innerExecuteFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures())); - } - } - - void innerExecuteFetchPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - this.addShardFailure(shardIndex, shardTarget, t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, - fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } - - @Override - public void onFailure(Throwable t) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); - } - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java deleted file mode 100644 index 0e1e8db5519..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; - -/** - * - */ -public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - } - - @Override - protected String firstPhaseName() { - return "query_fetch"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteFetch(node, request, listener); - } - - @Override - protected void moveToSecondPhase() throws Exception { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, - firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - } - - @Override - public void onFailure(Throwable t) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java deleted file mode 100644 index c63287d9956..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.threadpool.ThreadPool; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * - */ -public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction { - - @Inject - public TransportSearchQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, threadPool, clusterService, searchService, searchPhaseController, actionFilters, indexNameExpressionResolver); - } - - @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { - new AsyncAction(searchRequest, listener).start(); - } - - private class AsyncAction extends BaseAsyncAction { - - final AtomicArray fetchResults; - final AtomicArray docIdsToLoad; - - private AsyncAction(SearchRequest request, ActionListener listener) { - super(request, listener); - fetchResults = new AtomicArray<>(firstResults.length()); - docIdsToLoad = new AtomicArray<>(firstResults.length()); - } - - @Override - protected String firstPhaseName() { - return "query"; - } - - @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchService.sendExecuteQuery(node, request, listener); - } - - @Override - protected void moveToSecondPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResultProvider queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - // the search context might not be cleared on the node where the fetch was executed for example - // because the action was rejected by the thread pool. in this case we need to send a dedicated - // request to clear the search context. by setting docIdsToLoad to null, the context will be cleared - // in TransportSearchTypeAction.releaseIrrelevantSearchContexts() after the search request is done. - docIdsToLoad.set(shardIndex, null); - onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter); - } - }); - } - - void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); - } - this.addShardFailure(shardIndex, shardTarget, t); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, - fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - - @Override - public void onFailure(Throwable t) { - try { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures()); - if (logger.isDebugEnabled()) { - logger.debug("failed to reduce search", failure); - } - super.onFailure(failure); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); - } - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java deleted file mode 100644 index b718baaa294..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; -import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; - -/** - * - */ -public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent { - - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; - - @Inject - public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - new AsyncAction(request, scrollId, listener).start(); - } - - private class AsyncAction extends AbstractAsyncAction { - - private final SearchScrollRequest request; - private final ActionListener listener; - private final ParsedScrollId scrollId; - private final DiscoveryNodes nodes; - - private volatile AtomicArray shardFailures; - private final AtomicArray queryFetchResults; - - private final AtomicInteger successfulOps; - private final AtomicInteger counter; - - private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - this.request = request; - this.listener = listener; - this.scrollId = scrollId; - this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.getContext().length); - this.counter = new AtomicInteger(scrollId.getContext().length); - - this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); - } - - protected final ShardSearchFailure[] buildShardFailures() { - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - // we do our best to return the shard failures, but its ok if its not fully concurrently safe - // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(scrollId.getContext().length); - } - shardFailures.set(shardIndex, failure); - } - - public void start() { - if (scrollId.getContext().length == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); - return; - } - - ScrollIdForNode[] context = scrollId.getContext(); - for (int i = 0; i < context.length; i++) { - ScrollIdForNode target = context[i]; - DiscoveryNode node = nodes.get(target.getNode()); - if (node != null) { - executePhase(i, node, target.getScrollId()); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - } - - for (ScrollIdForNode target : scrollId.getContext()) { - DiscoveryNode node = nodes.get(target.getNode()); - if (node == null) { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - } - } - - void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { - InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { - @Override - public void onResponse(ScrollQueryFetchSearchResult result) { - queryFetchResults.set(shardIndex, result.result()); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - onPhaseFailure(t, searchId, shardIndex); - } - }); - } - - private void onPhaseFailure(Throwable t, long searchId, int shardIndex) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures())); - } else { - finishHim(); - } - } - } - - private void finishHim() { - try { - innerFinishHim(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); - } - } - - private void innerFinishHim() throws Exception { - ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, - queryFetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = request.scrollId(); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java deleted file mode 100644 index 93a28b29aa1..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchRequest; -import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.ScrollQuerySearchResult; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalScrollSearchRequest; - -/** - * - */ -public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent { - - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; - - @Inject - public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { - super(settings); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - public void execute(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - new AsyncAction(request, scrollId, listener).start(); - } - - private class AsyncAction extends AbstractAsyncAction { - - private final SearchScrollRequest request; - - private final ActionListener listener; - - private final ParsedScrollId scrollId; - - private final DiscoveryNodes nodes; - - private volatile AtomicArray shardFailures; - final AtomicArray queryResults; - final AtomicArray fetchResults; - - private volatile ScoreDoc[] sortedShardList; - - private final AtomicInteger successfulOps; - - private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { - this.request = request; - this.listener = listener; - this.scrollId = scrollId; - this.nodes = clusterService.state().nodes(); - this.successfulOps = new AtomicInteger(scrollId.getContext().length); - - this.queryResults = new AtomicArray<>(scrollId.getContext().length); - this.fetchResults = new AtomicArray<>(scrollId.getContext().length); - } - - protected final ShardSearchFailure[] buildShardFailures() { - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - // we do our best to return the shard failures, but its ok if its not fully concurrently safe - // we simply try and return as much as possible - protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(scrollId.getContext().length); - } - shardFailures.set(shardIndex, failure); - } - - public void start() { - if (scrollId.getContext().length == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); - return; - } - final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - - ScrollIdForNode[] context = scrollId.getContext(); - for (int i = 0; i < context.length; i++) { - ScrollIdForNode target = context[i]; - DiscoveryNode node = nodes.get(target.getNode()); - if (node != null) { - executeQueryPhase(i, counter, node, target.getScrollId()); - } else { - if (logger.isDebugEnabled()) { - logger.debug("Node [" + target.getNode() + "] not available for scroll request [" + scrollId.getSource() + "]"); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - try { - executeFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); - return; - } - } - } - } - } - - private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { - InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { - @Override - public void onResponse(ScrollQuerySearchResult result) { - queryResults.set(shardIndex, result.queryResult()); - if (counter.decrementAndGet() == 0) { - try { - executeFetchPhase(); - } catch (Throwable e) { - onFailure(e); - } - } - } - - @Override - public void onFailure(Throwable t) { - onQueryPhaseFailure(shardIndex, counter, searchId, t); - } - }); - } - - void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Failed to execute query phase", t, searchId); - } - addShardFailure(shardIndex, new ShardSearchFailure(t)); - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - if (successfulOps.get() == 0) { - listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures())); - } else { - try { - executeFetchPhase(); - } catch (Throwable e) { - listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY)); - } - } - } - } - - private void executeFetchPhase() throws Exception { - sortedShardList = searchPhaseController.sortDocs(true, queryResults); - AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - - if (docIdsToLoad.asList().isEmpty()) { - finishHim(); - return; - } - - - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - IntArrayList docIds = entry.value; - final QuerySearchResult querySearchResult = queryResults.get(entry.index); - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); - searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { - @Override - public void onResponse(FetchSearchResult result) { - result.shardTarget(querySearchResult.shardTarget()); - fetchResults.set(entry.index, result); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - - @Override - public void onFailure(Throwable t) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to execute fetch phase", t); - } - successfulOps.decrementAndGet(); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } - }); - } - } - - private void finishHim() { - try { - innerFinishHim(); - } catch (Throwable e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); - } - } - - private void innerFinishHim() { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = request.scrollId(); - } - listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), - buildTookInMillis(), buildShardFailures())); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java deleted file mode 100644 index 042534a2e7b..00000000000 --- a/core/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search.type; - -import com.carrotsearch.hppc.IntArrayList; -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.SearchPhaseResult; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.type.TransportSearchHelper.internalSearchRequest; - -/** - * - */ -public abstract class TransportSearchTypeAction extends TransportAction { - - protected final ClusterService clusterService; - - protected final SearchServiceTransportAction searchService; - - protected final SearchPhaseController searchPhaseController; - - public TransportSearchTypeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SearchAction.NAME, threadPool, actionFilters, indexNameExpressionResolver, clusterService.getTaskManager()); - this.clusterService = clusterService; - this.searchService = searchService; - this.searchPhaseController = searchPhaseController; - } - - protected abstract class BaseAsyncAction extends AbstractAsyncAction { - - protected final ActionListener listener; - - protected final GroupShardsIterator shardsIts; - - protected final SearchRequest request; - - protected final ClusterState clusterState; - protected final DiscoveryNodes nodes; - - protected final int expectedSuccessfulOps; - private final int expectedTotalOps; - - protected final AtomicInteger successfulOps = new AtomicInteger(); - private final AtomicInteger totalOps = new AtomicInteger(); - - protected final AtomicArray firstResults; - private volatile AtomicArray shardFailures; - private final Object shardFailuresMutex = new Object(); - protected volatile ScoreDoc[] sortedShardList; - - protected BaseAsyncAction(SearchRequest request, ActionListener listener) { - this.request = request; - this.listener = listener; - - this.clusterState = clusterService.state(); - nodes = clusterState.nodes(); - - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - - // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name - // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead - // of just for the _search api - String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request.indicesOptions(), startTime(), request.indices()); - - for (String index : concreteIndices) { - clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); - } - - Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), request.indices()); - - shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); - expectedSuccessfulOps = shardsIts.size(); - // we need to add 1 for non active partition, since we count it in the total! - expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - - firstResults = new AtomicArray<>(shardsIts.size()); - } - - public void start() { - if (expectedSuccessfulOps == 0) { - // no search shards to search on, bail with empty response (it happens with search across _all with no indices around and consistent with broadcast operations) - listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), ShardSearchFailure.EMPTY_ARRAY)); - return; - } - int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.nextOrNull(); - if (shard != null) { - performFirstPhase(shardIndex, shardIt, shard); - } else { - // really, no shards active in this group - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } - } - } - - void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { - if (shard == null) { - // no more active shards... (we should not really get here, but just for safety) - onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } else { - final DiscoveryNode node = nodes.get(shard.currentNodeId()); - if (node == null) { - onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); - } else { - String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, shard.index().getName(), request.indices()); - sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener() { - @Override - public void onResponse(FirstResult result) { - onFirstPhaseResult(shardIndex, shard, result, shardIt); - } - - @Override - public void onFailure(Throwable t) { - onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t); - } - }); - } - } - } - - void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) { - result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id())); - processFirstPhaseResult(shardIndex, result); - // we need to increment successful ops first before we compare the exit condition otherwise if we - // are fast we could concurrently update totalOps but then preempt one of the threads which can - // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc. - successfulOps.incrementAndGet(); - // increment all the "future" shards to update the total ops since we some may work and some may not... - // and when that happens, we break on total ops, so we must maintain them - final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1); - if (xTotalOps == expectedTotalOps) { - try { - innerMoveToSecondPhase(); - } catch (Throwable e) { - if (logger.isDebugEnabled()) { - logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e); - } - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures())); - } - } else if (xTotalOps > expectedTotalOps) { - raiseEarlyFailure(new IllegalStateException("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]")); - } - } - - void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final ShardIterator shardIt, Throwable t) { - // we always add the shard failure for a specific shard instance - // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId()); - addShardFailure(shardIndex, shardTarget, t); - - if (totalOps.incrementAndGet() == expectedTotalOps) { - if (logger.isDebugEnabled()) { - if (t != null && !TransportActions.isShardNotAvailableException(t)) { - if (shard != null) { - logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); - } else { - logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); - } - } else if (logger.isTraceEnabled()) { - logger.trace("{}: Failed to execute [{}]", t, shard, request); - } - } - final ShardSearchFailure[] shardSearchFailures = buildShardFailures(); - if (successfulOps.get() == 0) { - if (logger.isDebugEnabled()) { - logger.debug("All shards failed for phase: [{}]", t, firstPhaseName()); - } - - // no successful ops, raise an exception - raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures)); - } else { - try { - innerMoveToSecondPhase(); - } catch (Throwable e) { - raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures)); - } - } - } else { - final ShardRouting nextShard = shardIt.nextOrNull(); - final boolean lastShard = nextShard == null; - // trace log this exception - if (logger.isTraceEnabled()) { - logger.trace(executionFailureMsg(shard, shardIt, request, lastShard), t); - } - if (!lastShard) { - try { - performFirstPhase(shardIndex, shardIt, nextShard); - } catch (Throwable t1) { - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1); - } - } else { - // no more shards active, add a failure - if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception - if (t != null && !TransportActions.isShardNotAvailableException(t)) { - logger.debug(executionFailureMsg(shard, shardIt, request, lastShard), t); - } - } - } - } - } - - private String executionFailureMsg(@Nullable ShardRouting shard, final ShardIterator shardIt, SearchRequest request, boolean lastShard) { - if (shard != null) { - return shard.shortSummary() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; - } else { - return shardIt.shardId() + ": Failed to execute [" + request + "] lastShard [" + lastShard + "]"; - } - } - - protected final ShardSearchFailure[] buildShardFailures() { - AtomicArray shardFailures = this.shardFailures; - if (shardFailures == null) { - return ShardSearchFailure.EMPTY_ARRAY; - } - List> entries = shardFailures.asList(); - ShardSearchFailure[] failures = new ShardSearchFailure[entries.size()]; - for (int i = 0; i < failures.length; i++) { - failures[i] = entries.get(i).value; - } - return failures; - } - - protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) { - // we don't aggregate shard failures on non active shards (but do keep the header counts right) - if (TransportActions.isShardNotAvailableException(t)) { - return; - } - - // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures) - if (shardFailures == null) { - synchronized (shardFailuresMutex) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(shardsIts.size()); - } - } - } - ShardSearchFailure failure = shardFailures.get(shardIndex); - if (failure == null) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } else { - // the failure is already present, try and not override it with an exception that is less meaningless - // for example, getting illegal shard state - if (TransportActions.isReadOverrideException(t)) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } - } - } - - private void raiseEarlyFailure(Throwable t) { - for (AtomicArray.Entry entry : firstResults.asList()) { - try { - DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); - sendReleaseSearchContext(entry.value.id(), node); - } catch (Throwable t1) { - logger.trace("failed to release context", t1); - } - } - listener.onFailure(t); - } - - /** - * Releases shard targets that are not used in the docsIdsToLoad. - */ - protected void releaseIrrelevantSearchContexts(AtomicArray queryResults, - AtomicArray docIdsToLoad) { - if (docIdsToLoad == null) { - return; - } - // we only release search context that we did not fetch from if we are not scrolling - if (request.scroll() == null) { - for (AtomicArray.Entry entry : queryResults.asList()) { - final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); - if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches - && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs - try { - DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); - sendReleaseSearchContext(entry.value.queryResult().id(), node); - } catch (Throwable t1) { - logger.trace("failed to release context", t1); - } - } - } - } - } - - protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { - if (node != null) { - searchService.sendFreeContext(node, contextId, request); - } - } - - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, ScoreDoc[] lastEmittedDocPerShard) { - if (lastEmittedDocPerShard != null) { - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); - } else { - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); - } - } - - protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener); - - protected final void processFirstPhaseResult(int shardIndex, FirstResult result) { - firstResults.set(shardIndex, result); - - if (logger.isTraceEnabled()) { - logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); - } - - // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level - // so its ok concurrency wise to miss potentially the shard failures being created because of another failure - // in the #addShardFailure, because by definition, it will happen on *another* shardIndex - AtomicArray shardFailures = this.shardFailures; - if (shardFailures != null) { - shardFailures.set(shardIndex, null); - } - } - - final void innerMoveToSecondPhase() throws Exception { - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - boolean hadOne = false; - for (int i = 0; i < firstResults.length(); i++) { - FirstResult result = firstResults.get(i); - if (result == null) { - continue; // failure - } - if (hadOne) { - sb.append(","); - } else { - hadOne = true; - } - sb.append(result.shardTarget()); - } - - logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version()); - } - moveToSecondPhase(); - } - - protected abstract void moveToSecondPhase() throws Exception; - - protected abstract String firstPhaseName(); - } -}