diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java
index de58b190642..16362365250 100644
--- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java
+++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java
@@ -50,6 +50,8 @@ abstract class InitialSearchPhase extends
     private final Logger logger;
     private final int expectedTotalOps;
     private final AtomicInteger totalOps = new AtomicInteger();
+    private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
+    private final int maxConcurrentShardRequests;
 
     InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) {
         super(name);
@@ -61,6 +63,7 @@ abstract class InitialSearchPhase extends
         // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
         // we process hence we add one for the non active partition here.
         this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
+        maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size());
     }
 
     private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
@@ -105,6 +108,7 @@ abstract class InitialSearchPhase extends
                 onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
             }
         } else {
+            maybeExecuteNext(); // move to the next execution if needed
             // no more shards active, add a failure
             if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
                 if (e != null && !TransportActions.isShardNotAvailableException(e)) {
@@ -124,23 +128,25 @@ abstract class InitialSearchPhase extends
 
     @Override
     public final void run() throws IOException {
-        int shardIndex = -1;
-        for (final SearchShardIterator shardIt : shardsIts) {
-            shardIndex++;
-            final ShardRouting shard = shardIt.nextOrNull();
-            if (shard != null) {
-                performPhaseOnShard(shardIndex, shardIt, shard);
-            } else {
-                // really, no shards active in this group
-                onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
-            }
+        boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
+        assert success;
+        for (int i = 0; i < maxConcurrentShardRequests; i++) {
+            SearchShardIterator shardRoutings = shardsIts.get(i);
+            performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull());
         }
     }
 
+    private void maybeExecuteNext() {
+        final int index = shardExecutionIndex.getAndIncrement();
+        if (index < shardsIts.size()) {
+            SearchShardIterator shardRoutings = shardsIts.get(index);
+            performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
+        }
+    }
+
+
     private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
         if (shard == null) {
-            // TODO upgrade this to an assert...
-            // no more active shards... (we should not really get here, but just for safety)
             onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
         } else {
             try {
@@ -166,6 +172,7 @@ abstract class InitialSearchPhase extends
     }
 
     private void onShardResult(FirstResult result, ShardIterator shardIt) {
+        maybeExecuteNext();
        assert result.getShardIndex() != -1 : "shard index is not set";
        assert result.getSearchShardTarget() != null : "search shard target must not be null";
        onShardSuccess(result);
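The InitialSearchPhase change above is the heart of the throttle: run() only starts the first maxConcurrentShardRequests shard requests, and every shard response or failure calls maybeExecuteNext() to claim the next pending shard. A minimal stand-alone sketch of that pattern, with illustrative names rather than code from this change:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Minimal sketch of the bounded fan-out used above: the counter is seeded with the limit,
// the first `limit` items are started eagerly, and each completion claims exactly one further
// index via getAndIncrement(), so at most `limit` items are in flight and no lock is needed.
class BoundedFanOut<T> {
    private final AtomicInteger executionIndex = new AtomicInteger();
    private final List<T> items;
    private final int limit;
    private final Consumer<T> startItem;

    BoundedFanOut(List<T> items, int maxConcurrent, Consumer<T> startItem) {
        this.items = items;
        this.limit = Math.min(maxConcurrent, items.size());
        this.startItem = startItem;
    }

    void start() {
        boolean seeded = executionIndex.compareAndSet(0, limit);
        assert seeded : "start() must only be called once";
        for (int i = 0; i < limit; i++) {
            startItem.accept(items.get(i));
        }
    }

    // called once per completed item, on the success and the failure path alike
    void onItemDone() {
        final int index = executionIndex.getAndIncrement();
        if (index < items.size()) {
            startItem.accept(items.get(index));
        }
    }
}
```

Seeding the counter with the limit, rather than starting it at zero, is what lets onItemDone() hand out indices limit, limit + 1, and so on without ever re-issuing one of the initially started items.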
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
index 01a3e94620a..02cde220b35 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.search;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
@@ -74,6 +75,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
 
     private int batchedReduceSize = 512;
 
+    private int maxConcurrentShardRequests = 0;
+
     private String[] types = Strings.EMPTY_ARRAY;
 
     public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
@@ -302,6 +305,34 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
         return batchedReduceSize;
     }
 
+    /**
+     * Returns the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
+     * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
+     * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256.
+     */
+    public int getMaxConcurrentShardRequests() {
+        return maxConcurrentShardRequests == 0 ? 256 : maxConcurrentShardRequests;
+    }
+
+    /**
+     * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
+     * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
+     * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256.
+     */
+    public void setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
+        if (maxConcurrentShardRequests < 1) {
+            throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
+        }
+        this.maxConcurrentShardRequests = maxConcurrentShardRequests;
+    }
+
+    /**
+     * Returns true iff maxConcurrentShardRequests is set.
+     */
+    boolean isMaxConcurrentShardRequestsSet() {
+        return maxConcurrentShardRequests != 0;
+    }
+
     /**
      * @return true if the request only has suggest
      */
@@ -349,6 +380,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
         indicesOptions = IndicesOptions.readIndicesOptions(in);
         requestCache = in.readOptionalBoolean();
         batchedReduceSize = in.readVInt();
+        if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+            maxConcurrentShardRequests = in.readVInt();
+        }
     }
 
     @Override
@@ -367,6 +401,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
         indicesOptions.writeIndicesOptions(out);
         out.writeOptionalBoolean(requestCache);
         out.writeVInt(batchedReduceSize);
+        if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+            out.writeVInt(maxConcurrentShardRequests);
+        }
     }
 
     @Override
@@ -386,13 +423,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
             Objects.equals(requestCache, that.requestCache) &&
             Objects.equals(scroll, that.scroll) &&
             Arrays.equals(types, that.types) &&
+            Objects.equals(batchedReduceSize, that.batchedReduceSize) &&
+            Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
             Objects.equals(indicesOptions, that.indicesOptions);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
-            scroll, Arrays.hashCode(types), indicesOptions);
+            scroll, Arrays.hashCode(types), indicesOptions, maxConcurrentShardRequests);
     }
 
     @Override
@@ -406,6 +445,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
                 ", preference='" + preference + '\'' +
                 ", requestCache=" + requestCache +
                 ", scroll=" + scroll +
+                ", maxConcurrentShardRequests=" + maxConcurrentShardRequests +
+                ", batchedReduceSize=" + batchedReduceSize +
                 ", source=" + source + '}';
     }
 }
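As added above, an unset value is stored as 0, the getter falls back to 256, and the setter rejects anything below 1. A small sketch of that request-level contract (the index name is just an example):

```java
import org.elasticsearch.action.search.SearchRequest;

// Exercises the accessors added in this change; prints 256 (unset fallback), then 64.
class MaxConcurrentShardRequestsContract {
    public static void main(String[] args) {
        SearchRequest request = new SearchRequest("logs-*");
        System.out.println(request.getMaxConcurrentShardRequests()); // 256: fallback while unset

        request.setMaxConcurrentShardRequests(64);
        System.out.println(request.getMaxConcurrentShardRequests()); // 64: explicit values win

        try {
            request.setMaxConcurrentShardRequests(0); // values below 1 are rejected
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

Note that the coordinating node can still lower the effective default based on the number of nodes in the cluster, per the TransportSearchAction and RestSearchAction changes below; only an explicitly set value is taken as-is.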
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
index 8b9879b2fa2..49e25f67493 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
@@ -525,4 +525,14 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, SearchRequestBuilder> {
+
+    /**
+     * Sets the number of shard requests that should be executed concurrently. This value should be used as a protection mechanism to
+     * reduce the number of shard requests fired per high level search request. Searches that hit the entire cluster can be throttled
+     * with this number to reduce the cluster load. The default grows with the number of nodes in the cluster but is at most 256.
+     */
+    public SearchRequestBuilder setMaxConcurrentShardRequests(int maxConcurrentShardRequests) {
+        this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
+        return this;
+    }
 }
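For completeness, a hedged usage sketch of the new builder method; `client` and the index pattern are placeholders, and only setMaxConcurrentShardRequests(...) comes from this change:

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

// Caps the shard-level fan-out of a single search without touching any cluster settings.
class ThrottledSearchExample {
    static SearchResponse throttledSearch(Client client) {
        return client.prepareSearch("logs-*")
            .setQuery(QueryBuilders.matchQuery("message", "timeout"))
            .setMaxConcurrentShardRequests(32) // at most 32 shard requests in flight for this search
            .get();
    }
}
```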
diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index e765c0443f5..2c3900e9951 100644
--- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -184,7 +185,8 @@ public class TransportSearchAction extends HandledTransportAction
-                (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener);
+                (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
+                    .getDataNodes().size());
         } else {
             remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
                 searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
@@ -192,8 +194,10 @@ public class TransportSearchAction extends HandledTransportAction
             Map remoteAliasFilters = new HashMap<>();
             BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices,
                 remoteShardIterators, remoteAliasFilters);
+            int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+                + clusterState.getNodes().getDataNodes().size();
             executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, remoteShardIterators,
-                clusterNodeLookup, clusterState, remoteAliasFilters, listener);
+                clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved);
         }, listener::onFailure));
     }
 }
@@ -247,7 +251,7 @@ public class TransportSearchAction extends HandledTransportAction
                                Map remoteClusterIndices, List remoteShardIterators,
                                BiFunction remoteConnections, ClusterState clusterState,
-                               Map remoteAliasMap, ActionListener listener) {
+                               Map remoteAliasMap, ActionListener listener, int nodeCount) {
 
         clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
 
         // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
@@ -300,7 +304,15 @@ public class TransportSearchAction extends HandledTransportAction implements
     public Iterator iterator() {
         return iterators.iterator();
     }
+
+    public ShardIt get(int index) {
+        return iterators.get(index);
+    }
 }
diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
index 3f04603c2da..b871446ba88 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
@@ -99,6 +99,14 @@ public class RestSearchAction extends BaseRestHandler {
         final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
         searchRequest.setBatchedReduceSize(batchedReduceSize);
+        if (request.hasParam("max_concurrent_shard_requests")) {
+            // only set if we have the parameter since we auto adjust the max concurrency on the coordinator
+            // based on the number of nodes in the cluster
+            final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
+                searchRequest.getMaxConcurrentShardRequests());
+            searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
+        }
+
         // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
         // from the REST layer. these modes are an internal optimization and should
         // not be specified explicitly by the user.
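The REST handler above deliberately leaves the value unset so the coordinating node can derive a default from the node count it now receives via executeSearch(...). The defaulting logic itself is not shown above, so the sketch below only illustrates the documented behaviour (the default grows with the number of data nodes and is capped at 256); the factor of five shards per node is an assumption made for this sketch, not taken from the change:

```java
// Illustrative arithmetic only: a node-count-based default with a 256 ceiling.
// ASSUMED_SHARDS_PER_NODE is a made-up factor; the real coordinator applies its own default
// and only does so when the request left maxConcurrentShardRequests unset.
final class DefaultMaxConcurrentShardRequests {
    private static final int CEILING = 256;
    private static final int ASSUMED_SHARDS_PER_NODE = 5;

    static int defaultFor(int dataNodeCount) {
        return Math.max(1, Math.min(CEILING, dataNodeCount * ASSUMED_SHARDS_PER_NODE));
    }

    public static void main(String[] args) {
        System.out.println(defaultFor(1));   // 5   -> a single-node cluster is barely fanned out
        System.out.println(defaultFor(10));  // 50
        System.out.println(defaultFor(100)); // 256 -> the ceiling kicks in
    }
}
```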
diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
index 878cb7e6126..b0bf4cc6626 100644
--- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java
@@ -48,14 +48,113 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class SearchAsyncActionTests extends ESTestCase {
 
+    public void testLimitConcurrentShardRequests() throws InterruptedException {
+        SearchRequest request = new SearchRequest();
+        int numConcurrent = randomIntBetween(1, 5);
+        request.setMaxConcurrentShardRequests(numConcurrent);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference response = new AtomicReference<>();
+        ActionListener responseListener = new ActionListener() {
+            @Override
+            public void onResponse(SearchResponse searchResponse) {
+                response.set((TestSearchResponse) searchResponse);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.warn("test failed", e);
+                fail(e.getMessage());
+            }
+        };
+        DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
+        DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        AtomicInteger contextIdGenerator = new AtomicInteger(0);
+        GroupShardsIterator shardsIter = getShardsIter("idx",
+            new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
+            10, randomBoolean(), primaryNode, replicaNode);
+        SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null);
+        Map lookup = new HashMap<>();
+        Map seenShard = new ConcurrentHashMap<>();
+        lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
+        lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
+        Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
+        CountDownLatch awaitInitialRequests = new CountDownLatch(1);
+        AtomicInteger numRequests = new AtomicInteger(0);
+        AtomicInteger numResponses = new AtomicInteger(0);
+        AbstractSearchAsyncAction asyncAction =
+            new AbstractSearchAsyncAction(
+                "test",
+                logger,
+                transportService,
+                (cluster, node) -> {
+                    assert cluster == null : "cluster was not null: " + cluster;
+                    return lookup.get(node); },
+                aliasFilters,
+                Collections.emptyMap(),
+                null,
+                request,
+                responseListener,
+                shardsIter,
+                new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
+                0,
+                null,
+                new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
+
+                @Override
+                protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
+                                                   SearchActionListener listener) {
+                    seenShard.computeIfAbsent(shard.shardId(), (i) -> {
+                        numRequests.incrementAndGet(); // only count this once per replica
+                        return Boolean.TRUE;
+                    });
+
+                    new Thread(() -> {
+                        try {
+                            awaitInitialRequests.await();
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                        Transport.Connection connection = getConnection(null, shard.currentNodeId());
+                        TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
+                            connection.getNode());
+                        if (numResponses.getAndIncrement() > 0 && randomBoolean()) { // at least one response otherwise the entire
+                            // request fails
+                            listener.onFailure(new RuntimeException());
+                        } else {
+                            listener.onResponse(testSearchPhaseResult);
+                        }
+
+                    }).start();
+                }
+
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() throws IOException {
+                            latch.countDown();
+                        }
+                    };
+                }
+            };
+        asyncAction.start();
+        assertEquals(numConcurrent, numRequests.get());
+        awaitInitialRequests.countDown();
+        latch.await();
+        assertEquals(10, numRequests.get());
+    }
+
     public void testFanOutAndCollect() throws InterruptedException {
         SearchRequest request = new SearchRequest();
+        request.setMaxConcurrentShardRequests(randomIntBetween(1, 100));
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference response = new AtomicReference<>();
         ActionListener responseListener = new ActionListener() {
diff --git a/docs/reference/search/search.asciidoc b/docs/reference/search/search.asciidoc
index 41ba6e5c87a..816cf54c529 100644
--- a/docs/reference/search/search.asciidoc
+++ b/docs/reference/search/search.asciidoc
@@ -67,3 +67,10 @@ CPU and memory wise. It is usually a better idea to organize data in such a
 way that there are fewer larger shards. In case you would like to configure a soft limit,
 you can update the `action.search.shard_count.limit` cluster setting in order to reject
 search requests that hit too many shards.
+
+The search's `max_concurrent_shard_requests` request parameter can be used to control
+the maximum number of concurrent shard requests the search API will execute for this request.
+This parameter should be used to protect a single request from overloading a cluster, for example
+a default request will hit all indices in a cluster, which could cause shard request rejections
+if the number of shards per node is high. The default is based on the number of data nodes in
+the cluster but is at most `256`.
\ No newline at end of file
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
index 5b4b9b681d5..92086d26a27 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json
@@ -163,6 +163,11 @@
         "type" : "number",
         "description" : "The number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.",
         "default" : 512
+      },
+      "max_concurrent_shard_requests" : {
+        "type" : "number",
+        "description" : "The number of shard requests this search executes concurrently. This value should be used to limit the impact of a single search request on the cluster by limiting the number of concurrent shard requests.",
+        "default" : "The default grows with the number of nodes in the cluster but is at most 256."
+      }
     }
   },
diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
index a7d9a72e6b7..7076a80f29b 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java
@@ -37,6 +37,7 @@ public class RandomizingClient extends FilterClient {
     private final SearchType defaultSearchType;
     private final String defaultPreference;
     private final int batchedReduceSize;
+    private final int maxConcurrentShardRequests;
 
 
     public RandomizingClient(Client client, Random random) {
@@ -55,13 +56,21 @@ public class RandomizingClient extends FilterClient {
             defaultPreference = null;
         }
         this.batchedReduceSize = 2 + random.nextInt(10);
-
+        if (random.nextBoolean()) {
+            this.maxConcurrentShardRequests = 1 + random.nextInt(1 << random.nextInt(8));
+        } else {
+            this.maxConcurrentShardRequests = -1; // randomly use the default
+        }
     }
 
     @Override
     public SearchRequestBuilder prepareSearch(String... indices) {
-        return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference)
-            .setBatchedReduceSize(batchedReduceSize);
+        SearchRequestBuilder searchRequestBuilder = in.prepareSearch(indices).setSearchType(defaultSearchType)
+            .setPreference(defaultPreference).setBatchedReduceSize(batchedReduceSize);
+        if (maxConcurrentShardRequests != -1) {
+            searchRequestBuilder.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
+        }
+        return searchRequestBuilder;
     }
 
     @Override
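The RandomizingClient change draws the randomized limit as 1 + random.nextInt(1 << random.nextInt(8)). A quick stand-alone check of what that expression produces: values between 1 and 128, biased towards small limits, so randomized tests cover both heavily throttled and effectively unthrottled searches. The class below is purely a demonstration, not part of the change:

```java
import java.util.Random;

// Samples the expression used in RandomizingClient and reports the observed range,
// which should converge to [1, 128]: nextInt(8) picks a power of two up to 128, and
// 1 + nextInt(powerOfTwo) then lands in [1, powerOfTwo].
class MaxConcurrentRandomizationRange {
    public static void main(String[] args) {
        Random random = new Random();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 1_000_000; i++) {
            int value = 1 + random.nextInt(1 << random.nextInt(8));
            min = Math.min(min, value);
            max = Math.max(max, value);
        }
        System.out.println("observed range: [" + min + ", " + max + "]");
    }
}
```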