Limit the number of concurrent requests per node (#31206)

With `max_concurrent_shard_requests` we used to throttle / limit
the number of concurrent shard requests a high level search request
can execute per node. This had several problems since it limited the
number on a global level based on the number of nodes. This change
now throttles the number of concurrent requests per node while still
allowing concurrency across multiple nodes.

Closes #31192
This commit is contained in:
Simon Willnauer 2018-06-11 08:49:18 +02:00 committed by GitHub
parent 85c26d682a
commit f825a530b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 145 additions and 85 deletions

View File

@ -84,3 +84,9 @@ for a particular index with the index setting `index.max_regex_length`.
Search requests with extra content after the main object will no longer be accepted
by the `_search` endpoint. A parsing exception will be thrown instead.
==== Semantics changed for `max_concurrent_shard_requests`
`max_concurrent_shard_requests` used to limit the total number of concurrent shard
requests a single high level search request can execute. In 7.0 this changed to be the
max number of concurrent shard requests per node. The default is now `5`.

View File

@ -175,8 +175,8 @@
},
"max_concurrent_shard_requests" : {
"type" : "number",
"description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
"default" : "The default grows with the number of nodes in the cluster but is at most 256."
"description" : "The number of concurrent shard requests per node this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
"default" : "The default is 5."
},
"pre_filter_shard_size" : {
"type" : "number",

View File

@ -79,9 +79,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
super(name, request, shardsIts, logger, maxConcurrentRequestsPerNode, executor);
this.timeProvider = timeProvider;
this.logger = logger;
this.searchTransportService = searchTransportService;

View File

@ -131,9 +131,7 @@ final class ExpandSearchPhase extends SearchPhase {
if (orig.allowPartialSearchResults() != null){
groupRequest.allowPartialSearchResults(orig.allowPartialSearchResults());
}
if (orig.isMaxConcurrentShardRequestsSet()) {
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
}
groupRequest.setMaxConcurrentShardRequests(orig.getMaxConcurrentShardRequests());
return groupRequest;
}

View File

@ -30,9 +30,11 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
@ -52,12 +54,13 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
private final int maxConcurrentShardRequests;
private final int maxConcurrentRequestsPerNode;
private final Executor executor;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger,
int maxConcurrentShardRequests, Executor executor) {
int maxConcurrentRequestsPerNode, Executor executor) {
super(name);
this.request = request;
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
@ -77,7 +80,9 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
// we process hence we add one for the non active partition here.
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size());
this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size());
// in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
this.executor = executor;
}
@ -109,7 +114,6 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
if (!lastShard) {
performPhaseOnShard(shardIndex, shardIt, nextShard);
} else {
maybeExecuteNext(); // move to the next execution if needed
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
@ -123,15 +127,12 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
@Override
public final void run() throws IOException {
public final void run() {
for (final SearchShardIterator iterator : toSkipShardsIts) {
assert iterator.skip();
skipShard(iterator);
}
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
@ -152,7 +153,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
for (int index = 0; index < maxConcurrentShardRequests; index++) {
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
@ -160,14 +161,6 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
/**
 * Claims the next unclaimed slot from {@code shardExecutionIndex} and, if that slot still refers to an
 * existing shard iterator, starts the phase on that iterator's next shard copy.
 */
private void maybeExecuteNext() {
    final int claimed = shardExecutionIndex.getAndIncrement();
    if (claimed >= shardsIts.size()) {
        // every shard iterator has already been handed out, nothing left to start
        return;
    }
    final SearchShardIterator nextIterator = shardsIts.get(claimed);
    performPhaseOnShard(claimed, nextIterator, nextIterator.nextOrNull());
}
private void maybeFork(final Thread thread, final Runnable runnable) {
if (thread == Thread.currentThread()) {
@ -197,6 +190,59 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
});
}
/**
 * Bounds how many runnables may be in flight at once. Up to {@code permits} runnables are executed
 * immediately on the thread that calls {@link #tryRun(Runnable)}; any further runnable is parked in a
 * FIFO queue until {@link #finishAndRunNext()} frees a permit.
 */
private static final class PendingExecutions {
    private final int permits;
    private int permitsTaken = 0;
    private ArrayDeque<Runnable> queue = new ArrayDeque<>();

    PendingExecutions(int permits) {
        assert permits > 0 : "not enough permits: " + permits;
        this.permits = permits;
    }

    /**
     * Gives back one permit and then runs the oldest queued runnable, if there is one and a permit
     * is available for it.
     */
    void finishAndRunNext() {
        releasePermit();
        tryRun(null);
    }

    /**
     * Runs {@code runnable} on the calling thread if a permit is free, otherwise queues it.
     * Passing {@code null} means "run a queued runnable if possible".
     */
    void tryRun(Runnable runnable) {
        final Runnable next = tryQueue(runnable);
        if (next == null) {
            return;
        }
        // executed outside of the monitor so the lock is never held while the task runs
        next.run();
    }

    /** Returns a single permit; holds the monitor only for the counter update. */
    private synchronized void releasePermit() {
        permitsTaken--;
        assert permitsTaken >= 0 : "illegal taken permits: " + permitsTaken;
    }

    /**
     * Decides what to execute next. Returns the runnable to execute (with a permit already accounted
     * for) or {@code null} if nothing should run right now.
     */
    private synchronized Runnable tryQueue(Runnable runnable) {
        if (permitsTaken >= permits) {
            // no capacity left: remember the work for later, nothing to run now
            if (runnable != null) {
                queue.add(runnable);
            }
            return null;
        }
        // prefer the supplied runnable; only fall back to the queue when none was supplied
        final Runnable candidate = runnable != null ? runnable : queue.poll();
        if (candidate != null) {
            permitsTaken++;
        }
        return candidate;
    }
}
/**
 * Called once a shard request has completed. When throttling is active (a non-null
 * {@code pendingExecutions}), hands {@code finishAndRunNext} to {@code maybeFork} together with the
 * thread the request was started on. Without throttling there is nothing to do.
 */
private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) {
    if (pendingExecutions == null) {
        assert throttleConcurrentRequests == false;
        return;
    }
    assert throttleConcurrentRequests;
    maybeFork(originalThread, pendingExecutions::finishAndRunNext);
}
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
@ -205,29 +251,54 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
* could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
final Thread thread = Thread.currentThread();
if (shard == null) {
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
maybeFork(thread, () -> onShardResult(result, shardIt));
}
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
: null;
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
try {
onShardResult(result, shardIt);
} finally {
executeNext(pendingExecutions, thread);
}
}
@Override
public void onFailure(Exception t) {
maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
}
});
} catch (final Exception e) {
try {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
});
} catch (final Exception e) {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might run in to
* nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
}
};
if (pendingExecutions == null) {
r.run();
} else {
assert throttleConcurrentRequests;
pendingExecutions.tryRun(r);
}
}
}
@ -257,8 +328,6 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
} else if (shardsIt.skip() == false) {
maybeExecuteNext();
}
}
@ -376,5 +445,4 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
assert iterator.skip();
successfulShardExecution(iterator);
}
}

View File

@ -75,8 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private Boolean requestCache;
private Boolean allowPartialSearchResults;
private Scroll scroll;
private int batchedReduceSize = 512;
@ -140,7 +140,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
}
}
@Override
@ -165,7 +165,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
}
}
@Override
@ -362,7 +362,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public Boolean requestCache() {
return this.requestCache;
}
/**
* Sets if this request should allow partial results. (If method is not called,
* will default to the cluster level setting).
@ -374,8 +374,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}
}
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
@ -397,18 +397,18 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
/**
* Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Returns the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public int getMaxConcurrentShardRequests() {
return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests;
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
}
/**
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
if (maxConcurrentShardRequests < 1) {
@ -510,7 +510,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults);
}

View File

@ -500,7 +500,7 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
request.requestCache(requestCache);
return this;
}
/**
* Sets if this request should allow partial results. (If method is not called,
@ -509,7 +509,7 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
public SearchRequestBuilder setAllowPartialSearchResults(boolean allowPartialSearchResults) {
request.allowPartialSearchResults(allowPartialSearchResults);
return this;
}
}
/**
* Should the query be profiled. Defaults to <code>false</code>
@ -549,9 +549,9 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
}
/**
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most {@code 256}.
* Sets the number of shard requests that should be executed concurrently on a single node. This value should be used as a
* protection mechanism to reduce the number of shard requests fired per high level search request. Searches that hit the entire
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}.
*/
public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -191,8 +190,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
.getDataNodes().size(), SearchResponse.Clusters.EMPTY);
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
@ -200,11 +198,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+ clusterState.getNodes().getDataNodes().size();
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
clusters);
}, listener::onFailure));
}
@ -280,7 +276,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
SearchResponse.Clusters clusters) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
@ -340,15 +336,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
return searchTransportService.getConnection(clusterName, discoveryNode);
};
if (searchRequest.isMaxConcurrentShardRequestsSet() == false) {
/*
* We try to set a default of max concurrent shard requests based on the node count but upper-bound it by 256 by default to keep
* it sane. A single search request that fans out to lots of shards should not hit a cluster too hard while 256 is already a
* lot.
*/
// we use nodeCount * 5 as we used to default this to the default number of shard which used to be 5.
searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount * 5));
}
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();

View File

@ -174,7 +174,8 @@ public class SearchAsyncActionTests extends ESTestCase {
}
};
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
// for the sake of this test we place the replica on the same node, i.e., this is not a mistake since we limit per node now
DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
@ -242,7 +243,7 @@ public class SearchAsyncActionTests extends ESTestCase {
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
@Override
public void run() throws IOException {
public void run() {
latch.countDown();
}
};