Limit the number of concurrent shard requests per search request (#25632)
This is a protection mechanism to prevent a single search request from hitting a large number of shards in the cluster concurrently. If a search is executed against all indices in the cluster this can easily overload the cluster causing rejections etc. which is not necessarily desirable. Instead this PR adds a per request limit of `max_concurrent_shard_requests` that throttles the number of concurrent initial phase requests to `256` by default. This limit can be increased per request and protects single search requests from overloading the cluster. Subsequent PRs can introduces addiontional improvemetns ie. limiting this on a `_msearch` level, making defaults a factor of the number of nodes or sort shards iters such that we gain the best concurrency across nodes.
This commit is contained in:
parent
bd7ddfa175
commit
98c91a3bd0
|
@ -50,6 +50,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
private final int expectedTotalOps;
|
private final int expectedTotalOps;
|
||||||
private final AtomicInteger totalOps = new AtomicInteger();
|
private final AtomicInteger totalOps = new AtomicInteger();
|
||||||
|
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
|
||||||
|
private final int maxConcurrentShardRequests;
|
||||||
|
|
||||||
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger) {
|
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger) {
|
||||||
super(name);
|
super(name);
|
||||||
|
@ -61,6 +63,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
|
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
|
||||||
// we process hence we add one for the non active partition here.
|
// we process hence we add one for the non active partition here.
|
||||||
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
|
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
|
||||||
|
maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
|
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
|
||||||
|
@ -105,6 +108,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
|
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
maybeExecuteNext(); // move to the next execution if needed
|
||||||
// no more shards active, add a failure
|
// no more shards active, add a failure
|
||||||
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
|
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
|
||||||
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
|
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
|
||||||
|
@ -124,23 +128,25 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void run() throws IOException {
|
public final void run() throws IOException {
|
||||||
int shardIndex = -1;
|
boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
|
||||||
for (final SearchShardIterator shardIt : shardsIts) {
|
assert success;
|
||||||
shardIndex++;
|
for (int i = 0; i < maxConcurrentShardRequests; i++) {
|
||||||
final ShardRouting shard = shardIt.nextOrNull();
|
SearchShardIterator shardRoutings = shardsIts.get(i);
|
||||||
if (shard != null) {
|
performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull());
|
||||||
performPhaseOnShard(shardIndex, shardIt, shard);
|
|
||||||
} else {
|
|
||||||
// really, no shards active in this group
|
|
||||||
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void maybeExecuteNext() {
|
||||||
|
final int index = shardExecutionIndex.getAndIncrement();
|
||||||
|
if (index < shardsIts.size()) {
|
||||||
|
SearchShardIterator shardRoutings = shardsIts.get(index);
|
||||||
|
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
|
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
|
||||||
if (shard == null) {
|
if (shard == null) {
|
||||||
// TODO upgrade this to an assert...
|
|
||||||
// no more active shards... (we should not really get here, but just for safety)
|
|
||||||
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
|
@ -166,6 +172,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onShardResult(FirstResult result, ShardIterator shardIt) {
|
private void onShardResult(FirstResult result, ShardIterator shardIt) {
|
||||||
|
maybeExecuteNext();
|
||||||
assert result.getShardIndex() != -1 : "shard index is not set";
|
assert result.getShardIndex() != -1 : "shard index is not set";
|
||||||
assert result.getSearchShardTarget() != null : "search shard target must not be null";
|
assert result.getSearchShardTarget() != null : "search shard target must not be null";
|
||||||
onShardSuccess(result);
|
onShardSuccess(result);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.search;
|
package org.elasticsearch.action.search;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.IndicesRequest;
|
import org.elasticsearch.action.IndicesRequest;
|
||||||
|
@ -74,6 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
|
|
||||||
private int batchedReduceSize = 512;
|
private int batchedReduceSize = 512;
|
||||||
|
|
||||||
|
private int maxConcurrentShardRequests = 0;
|
||||||
|
|
||||||
private String[] types = Strings.EMPTY_ARRAY;
|
private String[] types = Strings.EMPTY_ARRAY;
|
||||||
|
|
||||||
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
|
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
|
||||||
|
@ -302,6 +305,34 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
return batchedReduceSize;
|
return batchedReduceSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
|
||||||
|
* reduce the number of shard reqeusts fired per high level search request. Searches that hit the entire cluster can be throttled
|
||||||
|
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most <tt>256</tt>.
|
||||||
|
*/
|
||||||
|
public int getMaxConcurrentShardRequests() {
|
||||||
|
return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
|
||||||
|
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
|
||||||
|
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most <tt>256</tt>.
|
||||||
|
*/
|
||||||
|
public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
|
||||||
|
if (maxConcurrentShardRequests < 1) {
|
||||||
|
throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
|
||||||
|
}
|
||||||
|
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
|
||||||
|
*/
|
||||||
|
boolean isMaxConcurrentShardRequestsSet() {
|
||||||
|
return maxConcurrentShardRequests != 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if the request only has suggest
|
* @return true if the request only has suggest
|
||||||
*/
|
*/
|
||||||
|
@ -349,6 +380,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||||
requestCache = in.readOptionalBoolean();
|
requestCache = in.readOptionalBoolean();
|
||||||
batchedReduceSize = in.readVInt();
|
batchedReduceSize = in.readVInt();
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||||
|
maxConcurrentShardRequests = in.readVInt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -367,6 +401,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
indicesOptions.writeIndicesOptions(out);
|
indicesOptions.writeIndicesOptions(out);
|
||||||
out.writeOptionalBoolean(requestCache);
|
out.writeOptionalBoolean(requestCache);
|
||||||
out.writeVInt(batchedReduceSize);
|
out.writeVInt(batchedReduceSize);
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
|
||||||
|
out.writeVInt(maxConcurrentShardRequests);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -386,13 +423,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
Objects.equals(requestCache, that.requestCache) &&
|
Objects.equals(requestCache, that.requestCache) &&
|
||||||
Objects.equals(scroll, that.scroll) &&
|
Objects.equals(scroll, that.scroll) &&
|
||||||
Arrays.equals(types, that.types) &&
|
Arrays.equals(types, that.types) &&
|
||||||
|
Objects.equals(batchedReduceSize, that.batchedReduceSize) &&
|
||||||
|
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
|
||||||
Objects.equals(indicesOptions, that.indicesOptions);
|
Objects.equals(indicesOptions, that.indicesOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
|
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
|
||||||
scroll, Arrays.hashCode(types), indicesOptions);
|
scroll, Arrays.hashCode(types), indicesOptions, maxConcurrentShardRequests);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -406,6 +445,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
", preference='" + preference + '\'' +
|
", preference='" + preference + '\'' +
|
||||||
", requestCache=" + requestCache +
|
", requestCache=" + requestCache +
|
||||||
", scroll=" + scroll +
|
", scroll=" + scroll +
|
||||||
|
", maxConcurrentShardRequests=" + maxConcurrentShardRequests +
|
||||||
|
", batchedReduceSize=" + batchedReduceSize +
|
||||||
", source=" + source + '}';
|
", source=" + source + '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -525,4 +525,14 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
|
||||||
this.request.setBatchedReduceSize(batchedReduceSize);
|
this.request.setBatchedReduceSize(batchedReduceSize);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
|
||||||
|
* reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
|
||||||
|
* with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most <tt>256</tt>.
|
||||||
|
*/
|
||||||
|
public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
|
||||||
|
this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.HandledTransportAction;
|
import org.elasticsearch.action.support.HandledTransportAction;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
@ -184,7 +185,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
|
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
|
||||||
if (remoteClusterIndices.isEmpty()) {
|
if (remoteClusterIndices.isEmpty()) {
|
||||||
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
|
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
|
||||||
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener);
|
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
|
||||||
|
.getDataNodes().size());
|
||||||
} else {
|
} else {
|
||||||
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
|
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
|
||||||
remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
|
remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
|
||||||
|
@ -192,8 +194,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
|
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
|
||||||
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
|
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
|
||||||
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
|
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
|
||||||
|
int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
|
||||||
|
+ clusterState.getNodes().getDataNodes().size();
|
||||||
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, remoteShardIterators,
|
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, remoteShardIterators,
|
||||||
clusterNodeLookup, clusterState, remoteAliasFilters, listener);
|
clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled);
|
||||||
}, listener::onFailure));
|
}, listener::onFailure));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -247,7 +251,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
|
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
|
||||||
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
|
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
|
||||||
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
|
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
|
||||||
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener) {
|
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount) {
|
||||||
|
|
||||||
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
|
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
|
||||||
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
|
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
|
||||||
|
@ -300,7 +304,15 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
}
|
}
|
||||||
return searchTransportService.getConnection(clusterName, discoveryNode);
|
return searchTransportService.getConnection(clusterName, discoveryNode);
|
||||||
};
|
};
|
||||||
|
if (searchRequest.isMaxConcurrentShardRequestsSet() == false) {
|
||||||
|
// we try to set a default of max concurrent shard requests based on
|
||||||
|
// the node count but upper-bound it by 256 by default to keep it sane. A single
|
||||||
|
// search request that fans out lots of shards should hit a cluster too hard while 256 is already a lot
|
||||||
|
// we multiply is by the default number of shards such that a single request in a cluster of 1 would hit all shards of a
|
||||||
|
// default index.
|
||||||
|
searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount
|
||||||
|
* IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getDefault(Settings.EMPTY)));
|
||||||
|
}
|
||||||
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
|
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
|
||||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
|
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,4 +78,8 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
|
||||||
public Iterator<ShardIt> iterator() {
|
public Iterator<ShardIt> iterator() {
|
||||||
return iterators.iterator();
|
return iterators.iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ShardIt get(int index) {
|
||||||
|
return iterators.get(index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,14 @@ public class RestSearchAction extends BaseRestHandler {
|
||||||
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
|
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
|
||||||
searchRequest.setBatchedReduceSize(batchedReduceSize);
|
searchRequest.setBatchedReduceSize(batchedReduceSize);
|
||||||
|
|
||||||
|
if (request.hasParam("max_concurrent_shard_requests")) {
|
||||||
|
// only set if we have the parameter since we auto adjust the max concurrency on the coordinator
|
||||||
|
// based on the number of nodes in the cluster
|
||||||
|
final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
|
||||||
|
searchRequest.getMaxConcurrentShardRequests());
|
||||||
|
searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
|
||||||
|
}
|
||||||
|
|
||||||
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
|
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
|
||||||
// from the REST layer. these modes are an internal optimization and should
|
// from the REST layer. these modes are an internal optimization and should
|
||||||
// not be specified explicitly by the user.
|
// not be specified explicitly by the user.
|
||||||
|
|
|
@ -48,14 +48,113 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
public class SearchAsyncActionTests extends ESTestCase {
|
public class SearchAsyncActionTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testLimitConcurrentShardRequests() throws InterruptedException {
|
||||||
|
SearchRequest request = new SearchRequest();
|
||||||
|
int numConcurrent = randomIntBetween(1, 5);
|
||||||
|
request.setMaxConcurrentShardRequests(numConcurrent);
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
|
||||||
|
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(SearchResponse searchResponse) {
|
||||||
|
response.set((TestSearchResponse) searchResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
logger.warn("test failed", e);
|
||||||
|
fail(e.getMessage());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||||
|
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||||
|
|
||||||
|
AtomicInteger contextIdGenerator = new AtomicInteger(0);
|
||||||
|
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
|
||||||
|
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
|
||||||
|
10, randomBoolean(), primaryNode, replicaNode);
|
||||||
|
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null);
|
||||||
|
Map<String, Transport.Connection> lookup = new HashMap<>();
|
||||||
|
Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
|
||||||
|
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
|
||||||
|
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
|
||||||
|
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
|
||||||
|
CountDownLatch awaitInitialRequests = new CountDownLatch(1);
|
||||||
|
AtomicInteger numRequests = new AtomicInteger(0);
|
||||||
|
AtomicInteger numResponses = new AtomicInteger(0);
|
||||||
|
AbstractSearchAsyncAction asyncAction =
|
||||||
|
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
|
||||||
|
"test",
|
||||||
|
logger,
|
||||||
|
transportService,
|
||||||
|
(cluster, node) -> {
|
||||||
|
assert cluster == null : "cluster was not null: " + cluster;
|
||||||
|
return lookup.get(node); },
|
||||||
|
aliasFilters,
|
||||||
|
Collections.emptyMap(),
|
||||||
|
null,
|
||||||
|
request,
|
||||||
|
responseListener,
|
||||||
|
shardsIter,
|
||||||
|
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
|
||||||
|
SearchActionListener<TestSearchPhaseResult> listener) {
|
||||||
|
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
|
||||||
|
numRequests.incrementAndGet(); // only count this once per replica
|
||||||
|
return Boolean.TRUE;
|
||||||
|
});
|
||||||
|
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
awaitInitialRequests.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new AssertionError(e);
|
||||||
|
}
|
||||||
|
Transport.Connection connection = getConnection(null, shard.currentNodeId());
|
||||||
|
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
|
||||||
|
connection.getNode());
|
||||||
|
if (numResponses.getAndIncrement() > 0 && randomBoolean()) { // at least one response otherwise the entire
|
||||||
|
// request fails
|
||||||
|
listener.onFailure(new RuntimeException());
|
||||||
|
} else {
|
||||||
|
listener.onResponse(testSearchPhaseResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
|
||||||
|
return new SearchPhase("test") {
|
||||||
|
@Override
|
||||||
|
public void run() throws IOException {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
asyncAction.start();
|
||||||
|
assertEquals(numConcurrent, numRequests.get());
|
||||||
|
awaitInitialRequests.countDown();
|
||||||
|
latch.await();
|
||||||
|
assertEquals(10, numRequests.get());
|
||||||
|
}
|
||||||
|
|
||||||
public void testFanOutAndCollect() throws InterruptedException {
|
public void testFanOutAndCollect() throws InterruptedException {
|
||||||
SearchRequest request = new SearchRequest();
|
SearchRequest request = new SearchRequest();
|
||||||
|
request.setMaxConcurrentShardRequests(randomIntBetween(1, 100));
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
|
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
|
||||||
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
|
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
|
||||||
|
|
|
@ -67,3 +67,10 @@ CPU and memory wise. It is usually a better idea to organize data in such a way
|
||||||
that there are fewer larger shards. In case you would like to configure a soft
|
that there are fewer larger shards. In case you would like to configure a soft
|
||||||
limit, you can update the `action.search.shard_count.limit` cluster setting in order
|
limit, you can update the `action.search.shard_count.limit` cluster setting in order
|
||||||
to reject search requests that hit too many shards.
|
to reject search requests that hit too many shards.
|
||||||
|
|
||||||
|
The search's `max_concurrent_shard_requests` request parameter can be used to control
|
||||||
|
the maximum number of concurrent shard requests the search API will execute for this request.
|
||||||
|
This parameter should be used to protect a singe request from overloading a cluster ie. a default
|
||||||
|
request will hit all indices in a cluster which could cause shard request rejections if the
|
||||||
|
number of shards per node is high. This default is based on the number of data nodes in
|
||||||
|
the cluster but at most `256`.
|
|
@ -163,6 +163,11 @@
|
||||||
"type" : "number",
|
"type" : "number",
|
||||||
"description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
|
"description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
|
||||||
"default" : 512
|
"default" : 512
|
||||||
|
},
|
||||||
|
"max_concurrent_shard_requests" : {
|
||||||
|
"type" : "number",
|
||||||
|
"description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
|
||||||
|
"default" : "The default grows with the number of nodes in the cluster but is at most 256."
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -37,6 +37,7 @@ public class RandomizingClient extends FilterClient {
|
||||||
private final SearchType defaultSearchType;
|
private final SearchType defaultSearchType;
|
||||||
private final String defaultPreference;
|
private final String defaultPreference;
|
||||||
private final int batchedReduceSize;
|
private final int batchedReduceSize;
|
||||||
|
private final int maxConcurrentShardRequests;
|
||||||
|
|
||||||
|
|
||||||
public RandomizingClient(Client client, Random random) {
|
public RandomizingClient(Client client, Random random) {
|
||||||
|
@ -55,13 +56,21 @@ public class RandomizingClient extends FilterClient {
|
||||||
defaultPreference = null;
|
defaultPreference = null;
|
||||||
}
|
}
|
||||||
this.batchedReduceSize = 2 + random.nextInt(10);
|
this.batchedReduceSize = 2 + random.nextInt(10);
|
||||||
|
if (random.nextBoolean()) {
|
||||||
|
this.maxConcurrentShardRequests = 1 + random.nextInt(1 << random.nextInt(8));
|
||||||
|
} else {
|
||||||
|
this.maxConcurrentShardRequests = -1; // randomly use the default
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SearchRequestBuilder prepareSearch(String... indices) {
|
public SearchRequestBuilder prepareSearch(String... indices) {
|
||||||
return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference)
|
SearchRequestBuilder searchRequestBuilder = in.prepareSearch(indices).setSearchType(defaultSearchType)
|
||||||
.setBatchedReduceSize(batchedReduceSize);
|
.setPreference(defaultPreference).setBatchedReduceSize(batchedReduceSize);
|
||||||
|
if (maxConcurrentShardRequests != -1) {
|
||||||
|
searchRequestBuilder.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
|
||||||
|
}
|
||||||
|
return searchRequestBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue