Make AbstractSearchAsyncAction more testable and add a basic test case (#20890)

`AbstractSearchAsyncAction` has so far only been exercised by integration tests. The infrastructure is rather critical and should be covered at the unit-test level as well. This change takes the first step.

commit 61fd1cd582
parent fdceb64072
Simon Willnauer authored 2016-10-13 16:07:31 +02:00, committed by GitHub
9 changed files with 368 additions and 124 deletions
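
In outline, the change removes `AbstractSearchAsyncAction`'s constructor-time dependencies on `ClusterService`, `IndexNameExpressionResolver` and `ThreadPool`: `TransportSearchAction` now resolves indices, aliases, routing and shard iterators up front and hands the results in as plain values and functions. A self-contained sketch of the pattern (illustrative names, not the actual signatures):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executor;
    import java.util.function.Function;

    class InjectionSketch {
        // Stand-in for org.elasticsearch.cluster.node.DiscoveryNode.
        static final class Node {
            final String id;
            Node(String id) { this.id = id; }
        }

        static void wire() {
            Map<String, Node> nodes = new HashMap<>();
            nodes.put("node_1", new Node("node_1"));
            // Instead of reading ClusterState inside the action, the caller supplies:
            Function<String, Node> nodeLookup = nodes::get;  // replaces clusterState.nodes().get(...)
            Executor executor = Runnable::run;               // replaces the ThreadPool lookup in tests
            // ...and passes them, together with precomputed shard iterators, filtering
            // aliases, start time and cluster state version, to the action's constructor.
        }
    }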

File: org/elasticsearch/action/search/AbstractAsyncAction.java

@@ -26,8 +26,10 @@ abstract class AbstractAsyncAction {
private final long startTime;
protected AbstractAsyncAction() {
this.startTime = System.currentTimeMillis();
protected AbstractAsyncAction() { this(System.currentTimeMillis());}
protected AbstractAsyncAction(long startTime) {
this.startTime = startTime;
}
/**
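
That two-constructor hunk is the entire time-handling change: production keeps the wall-clock default, while a test can pin the start time. Restated as a runnable sketch (illustrative class name):

    abstract class ClockedAction {
        private final long startTime;

        protected ClockedAction() { this(System.currentTimeMillis()); } // production path
        protected ClockedAction(long startTime) { this.startTime = startTime; } // test path

        protected final long startTime() { return startTime; }
    }

A test constructing the action with, say, `0` can then make deterministic assertions about anything derived from the start time.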

File: org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -27,15 +27,10 @@ import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
@@ -45,12 +40,12 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest;
@@ -58,73 +53,45 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected final Logger logger;
protected final SearchTransportService searchTransportService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
protected final SearchPhaseController searchPhaseController;
protected final ThreadPool threadPool;
private final Executor executor;
protected final ActionListener<SearchResponse> listener;
protected final GroupShardsIterator shardsIts;
private final GroupShardsIterator shardsIts;
protected final SearchRequest request;
protected final ClusterState clusterState;
protected final DiscoveryNodes nodes;
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();
protected final AtomicArray<FirstResult> firstResults;
private final Map<String, String[]> perIndexFilteringAliases;
private final long clusterStateVersion;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardDocs;
protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
ActionListener<SearchResponse> listener) {
protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, String[]> perIndexFilteringAliases, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion) {
super(startTime);
this.logger = logger;
this.searchTransportService = searchTransportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.searchPhaseController = searchPhaseController;
this.threadPool = threadPool;
this.executor = executor;
this.request = request;
this.listener = listener;
this.clusterState = clusterService.state();
nodes = clusterState.nodes();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request.indicesOptions(),
startTime(), request.indices());
for (String index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(),
request.indices());
shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference());
final int shardCount = shardsIts.size();
failIfOverShardCountLimit(clusterService, shardCount);
expectedSuccessfulOps = shardCount;
this.perIndexFilteringAliases = perIndexFilteringAliases;
this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
this.clusterStateVersion = clusterStateVersion;
this.shardsIts = shardsIts;
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
firstResults = new AtomicArray<>(shardsIts.size());
}
private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
final long shardCountLimit = clusterService.getClusterSettings().get(TransportSearchAction.SHARD_COUNT_LIMIT_SETTING);
if (shardCount > shardCountLimit) {
throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of "
+ shardCountLimit + ". This limit exists because querying many shards at the same time can make the "
+ "job of the coordinating node very CPU and/or memory intensive. It is usually a better idea to "
+ "have a smaller number of larger shards. Update [" + TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey()
+ "] to a greater value if you really want to query that many shards at the same time.");
}
}
public void start() {
if (expectedSuccessfulOps == 0) {
@@ -152,12 +119,11 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final DiscoveryNode node = nodes.get(shard.currentNodeId());
final DiscoveryNode node = nodeIdToDiscoveryNode.apply(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState,
shard.index().getName(), request.indices());
String[] filteringAliases = perIndexFilteringAliases.get(shard.index().getName());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases,
startTime()), new ActionListener<FirstResult>() {
@Override
@@ -319,7 +285,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void raiseEarlyFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.shardTarget().nodeId());
sendReleaseSearchContext(entry.value.id(), node);
} catch (Exception inner) {
inner.addSuppressed(e);
@@ -344,7 +310,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (queryResult.hasHits()
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.queryResult().shardTarget().nodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
} catch (Exception e) {
logger.trace("failed to release context", e);
@@ -402,7 +368,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
sb.append(result.shardTarget());
}
logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version());
logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion);
}
moveToSecondPhase();
}
@@ -410,4 +376,9 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected abstract void moveToSecondPhase() throws Exception;
protected abstract String firstPhaseName();
protected Executor getExecutor() {
return executor;
}
}
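
The payoff of the slimmed-down constructor is visible in the new unit test at the bottom of this diff: every argument can be produced without a running cluster. A condensed view of that wiring (`TestAction` stands in for the test's anonymous subclass):

    Map<String, DiscoveryNode> lookup = new HashMap<>();
    lookup.put(primaryNode.getId(), primaryNode);
    // nodeIdToDiscoveryNode    -> lookup::get, a plain Function<String, DiscoveryNode>
    // perIndexFilteringAliases -> Collections.emptyMap() when aliases are irrelevant
    // executor                 -> may even be null when getExecutor() is overridden
    // shardsIts, startTime, clusterStateVersion -> hand-built iterators and constants
    new TestAction(logger, transportService, lookup::get, Collections.emptyMap(),
            null, request, responseListener, shardsIter, 0, 0).start();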

File: org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java

@@ -24,9 +24,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -34,21 +33,25 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
private final SearchPhaseController searchPhaseController;
SearchDfsQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, String[]> perIndexFilteringAliases, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
this.searchPhaseController = searchPhaseController;
queryFetchResults = new AtomicArray<>(firstResults.length());
}
@@ -70,7 +73,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
@@ -115,7 +118,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
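
Note the substitution that recurs in this and the remaining subclasses: `threadPool.executor(ThreadPool.Names.SEARCH)` becomes `getExecutor()`. Production behaviour is unchanged, since `TransportSearchAction` (further down) passes that very pool in; a unit test is free to supply something simpler. For example:

    Executor searchPool = threadPool.executor(ThreadPool.Names.SEARCH); // what production wires in
    Executor sameThread = Runnable::run;                                // a typical unit-test substitute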

File: org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

@@ -26,9 +26,8 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.AggregatedDfs;
@@ -39,23 +38,28 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, String[]> perIndexFilteringAliases, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
this.searchPhaseController = searchPhaseController;
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
@@ -78,7 +82,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
@@ -149,7 +153,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
@@ -192,7 +196,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null;

File: org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java

@@ -22,24 +22,29 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> {
private final SearchPhaseController searchPhaseController;
SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
Function<String, DiscoveryNode> nodeIdToDiscoveryNode, Map<String, String[]> perIndexFilteringAliases,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
this.searchPhaseController = searchPhaseController;
}
@Override
@@ -55,7 +60,7 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
@Override
protected void moveToSecondPhase() throws Exception {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null;

File: org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

@@ -26,9 +26,8 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult;
@@ -36,21 +35,27 @@ import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
private final SearchPhaseController searchPhaseController;
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, DiscoveryNode> nodeIdToDiscoveryNode, Map<String, String[]> perIndexFilteringAliases,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor, request, listener,
shardsIts, startTime, clusterStateVersion);
this.searchPhaseController = searchPhaseController;
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
}
@@ -82,7 +87,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
@@ -125,7 +130,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null;

File: org/elasticsearch/action/search/TransportSearchAction.java

@@ -23,7 +23,11 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
@@ -37,8 +41,12 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@@ -67,14 +75,33 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
@Override
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// pure paranoia if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, searchRequest.indices());
Map<String, String[]> filteringAliasLookup = new HashMap<>();
for (String index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
filteringAliasLookup.put(index, indexNameExpressionResolver.filteringAliases(clusterState,
index, searchRequest.indices()));
}
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
failIfOverShardCountLimit(clusterService, shardIterators.size());
// optimize search type for cases where there is only one shard group to search on
try {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, searchRequest);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState,
searchRequest.routing(), searchRequest.indices());
int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap);
if (shardCount == 1) {
if (shardIterators.size() == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);
}
@@ -95,27 +122,37 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
logger.debug("failed to optimize search type, continue as normal", e);
}
searchAsyncAction(searchRequest, listener).start();
searchAsyncAction(searchRequest, shardIterators, startTimeInMillis, clusterState,
Collections.unmodifiableMap(filteringAliasLookup), listener).start();
}
private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, GroupShardsIterator shardIterators, long startTime,
ClusterState state, Map<String, String[]> filteringAliasLookup,
ActionListener<SearchResponse> listener) {
final Function<String, DiscoveryNode> nodesLookup = state.nodes()::get;
final long clusterStateVersion = state.version();
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup,
filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
break;
case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
break;
case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup,
filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
clusterStateVersion);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
@@ -123,4 +160,15 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return searchAsyncAction;
}
private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
if (shardCount > shardCountLimit) {
throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of "
+ shardCountLimit + ". This limit exists because querying many shards at the same time can make the "
+ "job of the coordinating node very CPU and/or memory intensive. It is usually a better idea to "
+ "have a smaller number of larger shards. Update [" + SHARD_COUNT_LIMIT_SETTING.getKey()
+ "] to a greater value if you really want to query that many shards at the same time.");
}
}
}
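
With shard resolution hoisted into `doExecute`, the single-shard optimization no longer needs a dedicated counting call: the iterator computed for the fan-out already knows its size. The same lines with explanatory comments added:

    if (shardIterators.size() == 1) {
        // one shard group: QUERY_AND_FETCH completes in a single round trip, so
        // neither a DFS pre-phase nor a separate fetch phase can improve on it
        searchRequest.searchType(QUERY_AND_FETCH);
    }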

File: org/elasticsearch/cluster/routing/OperationRouting.java

@@ -68,11 +68,6 @@ public class OperationRouting extends AbstractComponent {
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
}
public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
return shards.size();
}
public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing, @Nullable String preference) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
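
`searchShardsCount` could be deleted because it re-ran the same `computeTargetedShards` computation as `searchShards`; a caller holding the iterator already has the count. A rough equivalent of the removed helper, using the surviving method (assuming no preference is needed):

    int shardCount = operationRouting.searchShards(clusterState, concreteIndices, routingMap, null).size();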

File: org/elasticsearch/action/search/SearchAsyncActionTests.java (new file)

@@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class SearchAsyncActionTests extends ESTestCase {
public void testFanOutAndCollect() throws InterruptedException {
SearchRequest request = new SearchRequest();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
response.set((TestSearchResponse) searchResponse);
}
@Override
public void onFailure(Exception e) {
logger.warn("test failed", e);
fail(e.getMessage());
}
};
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); // note: same id as the primary, so the single-entry node lookup below also resolves shards routed to the replica
Map<DiscoveryNode, Set<Long>> nodeToContextMap = new HashMap<>();
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) {
@Override
public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest request) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(node));
assertTrue(nodeToContextMap.get(node).remove(contextId));
}
};
Map<String, DiscoveryNode> lookup = new HashMap<>();
lookup.put(primaryNode.getId(), primaryNode);
AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>(logger, transportService, lookup::get,
Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0) {
TestSearchResponse response = new TestSearchResponse();
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<TestSearchPhaseResult> listener) {
assertTrue("shard: " + request.shardId() + " has been queried twice", response.queried.add(request.shardId()));
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), node);
Set<Long> ids = nodeToContextMap.computeIfAbsent(node, (n) -> new HashSet<>());
ids.add(testSearchPhaseResult.id);
if (randomBoolean()) {
listener.onResponse(testSearchPhaseResult);
} else {
new Thread(() -> listener.onResponse(testSearchPhaseResult)).start();
}
}
@Override
protected void moveToSecondPhase() throws Exception {
for (int i = 0; i < firstResults.length(); i++) {
TestSearchPhaseResult result = firstResults.get(i);
assertEquals(result.node.getId(), result.shardTarget().getNodeId());
sendReleaseSearchContext(result.id(), result.node);
}
responseListener.onResponse(response);
latch.countDown();
}
@Override
protected String firstPhaseName() {
return "test";
}
@Override
protected Executor getExecutor() {
fail("no executor in this class");
return null;
}
};
asyncAction.start();
latch.await();
assertNotNull(response.get());
assertFalse(nodeToContextMap.isEmpty());
assertTrue(nodeToContextMap.containsKey(primaryNode));
assertEquals(shardsIter.size(), numFreedContext.get());
assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty());
}
private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode,
DiscoveryNode replicaNode) {
ArrayList<ShardIterator> list = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
ArrayList<ShardRouting> started = new ArrayList<>();
ArrayList<ShardRouting> initializing = new ArrayList<>();
ArrayList<ShardRouting> unassigned = new ArrayList<>();
ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
routing = routing.initialize(primaryNode.getId(), i + "p", 0);
routing = routing.moveToStarted(); // ShardRouting is immutable: capture the started copy
started.add(routing);
if (doReplicas) {
routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), false,
RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
if (replicaNode != null) {
routing = routing.initialize(replicaNode.getId(), i + "r", 0);
if (randomBoolean()) {
routing = routing.moveToStarted(); // immutable here as well
started.add(routing);
} else {
initializing.add(routing);
}
} else {
unassigned.add(routing); // unused yet
}
}
Collections.shuffle(started, random());
started.addAll(initializing);
list.add(new PlainShardIterator(new ShardId(new Index(index, "_na_"), i), started));
}
return new GroupShardsIterator(list);
}
public static class TestSearchResponse extends SearchResponse {
public final Set<ShardId> queried = new HashSet<>();
}
public static class TestSearchPhaseResult implements SearchPhaseResult {
final long id;
final DiscoveryNode node;
SearchShardTarget shardTarget;
public TestSearchPhaseResult(long id, DiscoveryNode node) {
this.id = id;
this.node = node;
}
@Override
public long id() {
return id;
}
@Override
public SearchShardTarget shardTarget() {
return this.shardTarget;
}
@Override
public void shardTarget(SearchShardTarget shardTarget) {
this.shardTarget = shardTarget;
}
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
}
}