From 3b96055b23cd4e5e744430a7ef72c53c633c467b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Jun 2016 14:47:42 +0200 Subject: [PATCH] msearch: Cap the number of searches the msearch api will concurrently execute By default the number of searches msearch executes is capped by the number of data nodes multiplied by the default size of the search thread pool. This default can be overridden by using the newly added `max_concurrent_searches` parameter. Previously, the msearch api would execute all searches concurrently. If many large msearch requests were executed, this could lead to some searches being rejected while other searches in the msearch request would succeed. The goal of this change is to avoid exhausting the search thread pool. Closes #17926 --- .../action/search/MultiSearchRequest.java | 22 +++ .../search/MultiSearchRequestBuilder.java | 8 + .../search/TransportMultiSearchAction.java | 116 +++++++++++---- .../action/search/RestMultiSearchAction.java | 3 + .../elasticsearch/threadpool/ThreadPool.java | 6 +- .../search/MultiSearchRequestTests.java | 10 +- .../TransportMultiSearchActionTests.java | 137 ++++++++++++++++++ ...eMultiSearchIT.java => MultiSearchIT.java} | 32 +++- docs/reference/search/multi-search.asciidoc | 4 + .../resources/rest-api-spec/api/msearch.json | 4 + 10 files changed, 308 insertions(+), 34 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java rename core/src/test/java/org/elasticsearch/search/msearch/{SimpleMultiSearchIT.java => MultiSearchIT.java} (70%) diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index a3236e9653f..08a1ec5b3de 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -38,6 +38,7 @@ import static 
org.elasticsearch.action.ValidateActions.addValidationError; */ public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest { + private int maxConcurrentSearchRequests = 0; private List requests = new ArrayList<>(); private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -60,6 +61,25 @@ public class MultiSearchRequest extends ActionRequest implem return this; } + /** + * Returns how many of the search requests specified in this multi search request are allowed to run concurrently. + */ + public int maxConcurrentSearchRequests() { + return maxConcurrentSearchRequests; + } + + /** + * Sets how many of the search requests specified in this multi search request are allowed to run concurrently. + */ + public MultiSearchRequest maxConcurrentSearchRequests(int maxConcurrentSearchRequests) { + if (maxConcurrentSearchRequests < 1) { + throw new IllegalArgumentException("maxConcurrentSearchRequests must be positive"); + } + + this.maxConcurrentSearchRequests = maxConcurrentSearchRequests; + return this; + } + public List requests() { return this.requests; } @@ -100,6 +120,7 @@ public class MultiSearchRequest extends ActionRequest implem @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + maxConcurrentSearchRequests = in.readVInt(); int size = in.readVInt(); for (int i = 0; i < size; i++) { SearchRequest request = new SearchRequest(); @@ -111,6 +132,7 @@ public class MultiSearchRequest extends ActionRequest implem @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeVInt(maxConcurrentSearchRequests); out.writeVInt(requests.size()); for (SearchRequest request : requests) { request.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java index a0d1e4fb5c5..6cebb73fb4f 100644 --- 
a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java @@ -71,4 +71,12 @@ public class MultiSearchRequestBuilder extends ActionRequestBuilder { + private final int availableProcessors; private final ClusterService clusterService; - private final TransportSearchAction searchAction; + private final TransportAction searchAction; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, TransportSearchAction searchAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ClusterService clusterService, TransportSearchAction searchAction, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; + this.availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + } + + // For testing only: + TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, + ClusterService clusterService, TransportAction searchAction, + IndexNameExpressionResolver indexNameExpressionResolver, int availableProcessors) { + super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new); + this.clusterService = clusterService; + this.searchAction = searchAction; + this.availableProcessors = availableProcessors; } @Override - protected void doExecute(final MultiSearchRequest request, final ActionListener listener) { + protected void doExecute(MultiSearchRequest request, ActionListener listener) { ClusterState clusterState = clusterService.state(); 
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - final AtomicArray responses = new AtomicArray<>(request.requests().size()); - final AtomicInteger counter = new AtomicInteger(responses.length()); - for (int i = 0; i < responses.length(); i++) { - final int index = i; - searchAction.execute(request.requests().get(i), new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - responses.set(index, new MultiSearchResponse.Item(searchResponse, null)); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + int maxConcurrentSearches = request.maxConcurrentSearchRequests(); + if (maxConcurrentSearches == 0) { + maxConcurrentSearches = defaultMaxConcurrentSearches(availableProcessors, clusterState); + } - @Override - public void onFailure(Throwable e) { - responses.set(index, new MultiSearchResponse.Item(null, e)); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + Queue searchRequestSlots = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < request.requests().size(); i++) { + SearchRequest searchRequest = request.requests().get(i); + searchRequestSlots.add(new SearchRequestSlot(searchRequest, i)); + } - private void finishHim() { + int numRequests = request.requests().size(); + final AtomicArray responses = new AtomicArray<>(numRequests); + final AtomicInteger responseCounter = new AtomicInteger(numRequests); + int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches); + for (int i = 0; i < numConcurrentSearches; i++) { + executeSearch(searchRequestSlots, responses, responseCounter, listener); + } + } + + /* + * This is not perfect and makes a big assumption, that all nodes have the same thread pool size / have the number + * of processors and that shard of the indices the search requests go to are more or less evenly distributed across + * all nodes in the cluster. 
But I think it is a good enough default for most cases; if not, then the default should be + * overridden in the request itself. + */ + static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState state) { + int numDateNodes = state.getNodes().getDataNodes().size(); + // availableProcessors will never be larger than 32, so the default search thread pool size will never be larger than 49, + // but we don't know about other search requests that are being executed, so let's cap at 10 per node + int defaultSearchThreadPoolSize = Math.min(ThreadPool.searchThreadPoolSize(availableProcessors), 10); + return Math.max(1, numDateNodes * defaultSearchThreadPoolSize); + } + + void executeSearch(Queue requests, AtomicArray responses, + AtomicInteger responseCounter, ActionListener listener) { + SearchRequestSlot request = requests.poll(); + if (request == null) { + // No more requests left to start; we're now just waiting for the running requests to complete + return; + } + searchAction.execute(request.request, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + responses.set(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); + handleResponse(); + } + + @Override + public void onFailure(Throwable e) { + responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e)); + handleResponse(); + } + + private void handleResponse() { + if (responseCounter.decrementAndGet() == 0) { listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + } else { + executeSearch(requests, responses, responseCounter, listener); } - }); + } + }); + } + + final static class SearchRequestSlot { + + final SearchRequest request; + final int responseSlot; + + SearchRequestSlot(SearchRequest request, int responseSlot) { + this.request = request; + this.responseSlot = responseSlot; } } } diff --git 
a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 8c76525c857..2050360a9d5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -91,6 +91,9 @@ public class RestMultiSearchAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + if (request.hasParam("max_concurrent_searches")) { + multiSearchRequest.maxConcurrentSearchRequests(request.paramAsInt("max_concurrent_searches", 0)); + } String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] types = Strings.splitStringByCommaToArray(request.param("type")); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f8cb882b083..3f6a65c82c5 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -167,7 +167,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); - builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, ((availableProcessors * 3) / 2) + 1, 1000)); + builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); builders.put(Names.MANAGEMENT, new 
ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side @@ -389,6 +389,10 @@ public class ThreadPool extends AbstractComponent implements Closeable { return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE); } + public static int searchThreadPoolSize(int availableProcessors) { + return ((availableProcessors * 3) / 2) + 1; + } + class LoggingRunnable implements Runnable { private final Runnable runnable; diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 2df6f798b8e..2ae23774bdf 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -22,8 +22,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -37,7 +35,6 @@ import org.elasticsearch.test.StreamsUtils; import java.io.IOException; -import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -167,6 +164,13 @@ public class MultiSearchRequestTests extends ESTestCase { builder.string()); } + public void testMaxConcurrentSearchRequests() { + MultiSearchRequest request = new MultiSearchRequest(); + 
request.maxConcurrentSearchRequests(randomIntBetween(1, Integer.MAX_VALUE)); + expectThrows(IllegalArgumentException.class, () -> + request.maxConcurrentSearchRequests(randomIntBetween(Integer.MIN_VALUE, 0))); + } + private IndicesQueriesRegistry registry() { IndicesQueriesRegistry registry = new IndicesQueriesRegistry(); QueryParser parser = MatchAllQueryBuilder::fromXContent; diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java new file mode 100644 index 00000000000..d751424ef72 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportMultiSearchActionTests extends ESTestCase { + + public void testBatchExecute() throws Exception { + // Initialize dependencies of TransportMultiSearchAction + Settings settings = Settings.builder() + .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + TaskManager taskManager = mock(TaskManager.class); + TransportService transportService = mock(TransportService.class); 
+ when(transportService.getTaskManager()).thenReturn(taskManager); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + + // Keep track of the number of concurrent searches started by multi search api, + // and if there are more searches than is allowed create an error and remember that. + int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 20); + AtomicInteger counter = new AtomicInteger(); + AtomicReference errorHolder = new AtomicReference<>(); + TransportAction searchAction = new TransportAction + (Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { + @Override + protected void doExecute(SearchRequest request, ActionListener listener) { + int currentConcurrentSearches = counter.incrementAndGet(); + if (currentConcurrentSearches > maxAllowedConcurrentSearches) { + errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); + } + threadPool.executor(ThreadPool.Names.GENERIC).execute( + () -> { + try { + Thread.sleep(scaledRandomIntBetween(10, 1000)); + } catch (InterruptedException e) { + } + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + } + ); + } + }; + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); + + // Execute the multi search api and fail if we find an error after executing: + try { + int numSearchRequests = randomIntBetween(16, 128); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches); + for (int i = 0; i < numSearchRequests; i++) { + multiSearchRequest.add(new SearchRequest()); + } + + 
MultiSearchResponse response = action.execute(multiSearchRequest).actionGet(); + assertThat(response.getResponses().length, equalTo(numSearchRequests)); + assertThat(errorHolder.get(), nullValue()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } + + public void testDefaultMaxConcurrentSearches() { + int numDataNodes = randomIntBetween(1, 10); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (int i = 0; i < numDataNodes; i++) { + builder.put(new DiscoveryNode("_id" + i, new LocalTransportAddress("_id" + i), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT)); + } + builder.put(new DiscoveryNode("master", new LocalTransportAddress("mater"), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); + builder.put(new DiscoveryNode("ingest", new LocalTransportAddress("ingest"), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); + + ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(builder).build(); + int result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); + assertThat(result, equalTo(10 * numDataNodes)); + + state = ClusterState.builder(new ClusterName("_name")).build(); + result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); + assertThat(result, equalTo(1)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java b/core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java similarity index 70% rename from core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java rename to core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java index d3ee811be23..72306fd99d4 100644 --- a/core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java +++ b/core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java @@ -19,6 +19,7 @@ package 
org.elasticsearch.search.msearch; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; @@ -29,9 +30,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; import static org.hamcrest.Matchers.equalTo; -/** - */ -public class SimpleMultiSearchIT extends ESIntegTestCase { +public class MultiSearchIT extends ESIntegTestCase { + public void testSimpleMultiSearch() { createIndex("test"); ensureGreen(); @@ -54,4 +54,30 @@ public class SimpleMultiSearchIT extends ESIntegTestCase { assertFirstHit(response.getResponses()[0].getResponse(), hasId("1")); assertFirstHit(response.getResponses()[1].getResponse(), hasId("2")); } + + public void testSimpleMultiSearchMoreRequests() { + createIndex("test"); + int numDocs = randomIntBetween(0, 16); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("{}").get(); + } + refresh(); + + int numSearchRequests = randomIntBetween(0, 64); + MultiSearchRequest request = new MultiSearchRequest(); + if (randomBoolean()) { + request.maxConcurrentSearchRequests(randomIntBetween(1, numSearchRequests)); + } + for (int i = 0; i < numSearchRequests; i++) { + request.add(client().prepareSearch("test")); + } + + MultiSearchResponse response = client().multiSearch(request).actionGet(); + assertThat(response.getResponses().length, equalTo(numSearchRequests)); + for (MultiSearchResponse.Item item : response) { + assertNoFailures(item.getResponse()); + assertHitCount(item.getResponse(), numDocs); + } + } + } diff --git a/docs/reference/search/multi-search.asciidoc b/docs/reference/search/multi-search.asciidoc index 699a9b37123..7ecc0f6554c 100644 --- a/docs/reference/search/multi-search.asciidoc +++ 
b/docs/reference/search/multi-search.asciidoc @@ -71,6 +71,10 @@ against the `test2` index. The `search_type` can be set in a similar manner to globally apply to all search requests. +The msearch `max_concurrent_searches` request parameter can be used to control +the maximum number of concurrent searches the multi search api will execute. +The default is based on the number of data nodes and the default search thread pool size. + [float] [[msearch-security]] === Security diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json index 87a86fb1298..0344702ecfe 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json @@ -20,6 +20,10 @@ "type" : "enum", "options" : ["query_then_fetch", "query_and_fetch", "dfs_query_then_fetch", "dfs_query_and_fetch"], "description" : "Search operation type" + }, + "max_concurrent_searches" : { + "type" : "number", + "description" : "Controls the maximum number of concurrent searches the multi search api will execute" } } },