diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java index a3236e9653f..08a1ec5b3de 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java @@ -38,6 +38,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; */ public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest { + private int maxConcurrentSearchRequests = 0; private List requests = new ArrayList<>(); private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -60,6 +61,25 @@ public class MultiSearchRequest extends ActionRequest implem return this; } + /** + * Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently. + */ + public int maxConcurrentSearchRequests() { + return maxConcurrentSearchRequests; + } + + /** + * Sets how many search requests specified in this multi search requests are allowed to be ran concurrently. + */ + public MultiSearchRequest maxConcurrentSearchRequests(int maxConcurrentSearchRequests) { + if (maxConcurrentSearchRequests < 1) { + throw new IllegalArgumentException("maxConcurrentSearchRequests must be positive"); + } + + this.maxConcurrentSearchRequests = maxConcurrentSearchRequests; + return this; + } + public List requests() { return this.requests; } @@ -100,6 +120,7 @@ public class MultiSearchRequest extends ActionRequest implem @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); + maxConcurrentSearchRequests = in.readVInt(); int size = in.readVInt(); for (int i = 0; i < size; i++) { SearchRequest request = new SearchRequest(); @@ -111,6 +132,7 @@ public class MultiSearchRequest extends ActionRequest implem @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeVInt(maxConcurrentSearchRequests); out.writeVInt(requests.size()); for (SearchRequest request : requests) { request.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java index a0d1e4fb5c5..6cebb73fb4f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/MultiSearchRequestBuilder.java @@ -71,4 +71,12 @@ public class MultiSearchRequestBuilder extends ActionRequestBuilder { + private final int availableProcessors; private final ClusterService clusterService; - private final TransportSearchAction searchAction; + private final TransportAction searchAction; @Inject public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, TransportSearchAction searchAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ClusterService clusterService, TransportSearchAction searchAction, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; + this.availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + } + + // For testing only: + TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService, + ClusterService clusterService, TransportAction searchAction, + IndexNameExpressionResolver indexNameExpressionResolver, int availableProcessors) { + super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new); + this.clusterService = clusterService; + this.searchAction = searchAction; + this.availableProcessors = availableProcessors; } @Override - protected void doExecute(final MultiSearchRequest request, final ActionListener listener) { + protected void doExecute(MultiSearchRequest request, ActionListener listener) { ClusterState clusterState = clusterService.state(); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - final AtomicArray responses = new AtomicArray<>(request.requests().size()); - final AtomicInteger counter = new AtomicInteger(responses.length()); - for (int i = 0; i < responses.length(); i++) { - final int index = i; - searchAction.execute(request.requests().get(i), new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - responses.set(index, new MultiSearchResponse.Item(searchResponse, null)); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + int maxConcurrentSearches = request.maxConcurrentSearchRequests(); + if (maxConcurrentSearches == 0) { + maxConcurrentSearches = defaultMaxConcurrentSearches(availableProcessors, clusterState); + } - @Override - public void onFailure(Throwable e) { - responses.set(index, new MultiSearchResponse.Item(null, e)); - if (counter.decrementAndGet() == 0) { - finishHim(); - } - } + Queue searchRequestSlots = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < request.requests().size(); i++) { + SearchRequest searchRequest = request.requests().get(i); + searchRequestSlots.add(new SearchRequestSlot(searchRequest, i)); + } - private void finishHim() { + int numRequests = request.requests().size(); + final AtomicArray responses = new AtomicArray<>(numRequests); + final AtomicInteger responseCounter = new AtomicInteger(numRequests); + int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches); + for (int i = 0; i < numConcurrentSearches; i++) { + executeSearch(searchRequestSlots, responses, responseCounter, listener); + } + } + + /* + * This is not perfect and makes a big assumption, that all nodes have the same thread pool size / have the number + * of processors and that shard of the indices the search requests go to are more or less evenly distributed across + * all nodes in the cluster. But I think it is a good enough default for most cases, if not then the default should be + * overwritten in the request itself. + */ + static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState state) { + int numDateNodes = state.getNodes().getDataNodes().size(); + // availableProcessors will never be larger than 32, so max defaultMaxConcurrentSearches will never be larger than 49, + // but we don't know about about other search requests that are being executed so lets cap at 10 per node + int defaultSearchThreadPoolSize = Math.min(ThreadPool.searchThreadPoolSize(availableProcessors), 10); + return Math.max(1, numDateNodes * defaultSearchThreadPoolSize); + } + + void executeSearch(Queue requests, AtomicArray responses, + AtomicInteger responseCounter, ActionListener listener) { + SearchRequestSlot request = requests.poll(); + if (request == null) { + // Ok... so there're no more requests then this is ok, we're then waiting for running requests to complete + return; + } + searchAction.execute(request.request, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + responses.set(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); + handleResponse(); + } + + @Override + public void onFailure(Throwable e) { + responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e)); + handleResponse(); + } + + private void handleResponse() { + if (responseCounter.decrementAndGet() == 0) { listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + } else { + executeSearch(requests, responses, responseCounter, listener); } - }); + } + }); + } + + final static class SearchRequestSlot { + + final SearchRequest request; + final int responseSlot; + + SearchRequestSlot(SearchRequest request, int responseSlot) { + this.request = request; + this.responseSlot = responseSlot; } } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 8c76525c857..2050360a9d5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -91,6 +91,9 @@ public class RestMultiSearchAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + if (request.hasParam("max_concurrent_searches")) { + multiSearchRequest.maxConcurrentSearchRequests(request.paramAsInt("max_concurrent_searches", 0)); + } String[] indices = Strings.splitStringByCommaToArray(request.param("index")); String[] types = Strings.splitStringByCommaToArray(request.param("type")); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index f8cb882b083..3f6a65c82c5 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -167,7 +167,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); - builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, ((availableProcessors * 3) / 2) + 1, 1000)); + builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side @@ -389,6 +389,10 @@ public class ThreadPool extends AbstractComponent implements Closeable { return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE); } + public static int searchThreadPoolSize(int availableProcessors) { + return ((availableProcessors * 3) / 2) + 1; + } + class LoggingRunnable implements Runnable { private final Runnable runnable; diff --git a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java index 2df6f798b8e..2ae23774bdf 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/MultiSearchRequestTests.java @@ -22,8 +22,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -37,7 +35,6 @@ import org.elasticsearch.test.StreamsUtils; import java.io.IOException; -import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -167,6 +164,13 @@ public class MultiSearchRequestTests extends ESTestCase { builder.string()); } + public void testMaxConcurrentSearchRequests() { + MultiSearchRequest request = new MultiSearchRequest(); + request.maxConcurrentSearchRequests(randomIntBetween(1, Integer.MAX_VALUE)); + expectThrows(IllegalArgumentException.class, () -> + request.maxConcurrentSearchRequests(randomIntBetween(Integer.MIN_VALUE, 0))); + } + private IndicesQueriesRegistry registry() { IndicesQueriesRegistry registry = new IndicesQueriesRegistry(); QueryParser parser = MatchAllQueryBuilder::fromXContent; diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java new file mode 100644 index 00000000000..d751424ef72 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportMultiSearchActionTests extends ESTestCase { + + public void testBatchExecute() throws Exception { + // Initialize depedencies of TransportMultiSearchAction + Settings settings = Settings.builder() + .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + TaskManager taskManager = mock(TaskManager.class); + TransportService transportService = mock(TransportService.class); + when(transportService.getTaskManager()).thenReturn(taskManager); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(Settings.EMPTY); + + // Keep track of the number of concurrent searches started by multi search api, + // and if there are more searches than is allowed create an error and remember that. + int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 20); + AtomicInteger counter = new AtomicInteger(); + AtomicReference errorHolder = new AtomicReference<>(); + TransportAction searchAction = new TransportAction + (Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { + @Override + protected void doExecute(SearchRequest request, ActionListener listener) { + int currentConcurrentSearches = counter.incrementAndGet(); + if (currentConcurrentSearches > maxAllowedConcurrentSearches) { + errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); + } + threadPool.executor(ThreadPool.Names.GENERIC).execute( + () -> { + try { + Thread.sleep(scaledRandomIntBetween(10, 1000)); + } catch (InterruptedException e) { + } + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + } + ); + } + }; + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); + + // Execute the multi search api and fail if we find an error after executing: + try { + int numSearchRequests = randomIntBetween(16, 128); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches); + for (int i = 0; i < numSearchRequests; i++) { + multiSearchRequest.add(new SearchRequest()); + } + + MultiSearchResponse response = action.execute(multiSearchRequest).actionGet(); + assertThat(response.getResponses().length, equalTo(numSearchRequests)); + assertThat(errorHolder.get(), nullValue()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } + + public void testDefaultMaxConcurrentSearches() { + int numDataNodes = randomIntBetween(1, 10); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (int i = 0; i < numDataNodes; i++) { + builder.put(new DiscoveryNode("_id" + i, new LocalTransportAddress("_id" + i), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT)); + } + builder.put(new DiscoveryNode("master", new LocalTransportAddress("mater"), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.MASTER), Version.CURRENT)); + builder.put(new DiscoveryNode("ingest", new LocalTransportAddress("ingest"), Collections.emptyMap(), + Collections.singleton(DiscoveryNode.Role.INGEST), Version.CURRENT)); + + ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(builder).build(); + int result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); + assertThat(result, equalTo(10 * numDataNodes)); + + state = ClusterState.builder(new ClusterName("_name")).build(); + result = TransportMultiSearchAction.defaultMaxConcurrentSearches(10, state); + assertThat(result, equalTo(1)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java b/core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java similarity index 70% rename from core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java rename to core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java index d3ee811be23..72306fd99d4 100644 --- a/core/src/test/java/org/elasticsearch/search/msearch/SimpleMultiSearchIT.java +++ b/core/src/test/java/org/elasticsearch/search/msearch/MultiSearchIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.msearch; +import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; @@ -29,9 +30,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; import static org.hamcrest.Matchers.equalTo; -/** - */ -public class SimpleMultiSearchIT extends ESIntegTestCase { +public class MultiSearchIT extends ESIntegTestCase { + public void testSimpleMultiSearch() { createIndex("test"); ensureGreen(); @@ -54,4 +54,30 @@ public class SimpleMultiSearchIT extends ESIntegTestCase { assertFirstHit(response.getResponses()[0].getResponse(), hasId("1")); assertFirstHit(response.getResponses()[1].getResponse(), hasId("2")); } + + public void testSimpleMultiSearchMoreRequests() { + createIndex("test"); + int numDocs = randomIntBetween(0, 16); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "type", Integer.toString(i)).setSource("{}").get(); + } + refresh(); + + int numSearchRequests = randomIntBetween(0, 64); + MultiSearchRequest request = new MultiSearchRequest(); + if (randomBoolean()) { + request.maxConcurrentSearchRequests(randomIntBetween(1, numSearchRequests)); + } + for (int i = 0; i < numSearchRequests; i++) { + request.add(client().prepareSearch("test")); + } + + MultiSearchResponse response = client().multiSearch(request).actionGet(); + assertThat(response.getResponses().length, equalTo(numSearchRequests)); + for (MultiSearchResponse.Item item : response) { + assertNoFailures(item.getResponse()); + assertHitCount(item.getResponse(), numDocs); + } + } + } diff --git a/docs/reference/search/multi-search.asciidoc b/docs/reference/search/multi-search.asciidoc index 699a9b37123..7ecc0f6554c 100644 --- a/docs/reference/search/multi-search.asciidoc +++ b/docs/reference/search/multi-search.asciidoc @@ -71,6 +71,10 @@ against the `test2` index. The `search_type` can be set in a similar manner to globally apply to all search requests. +The msearch's `max_concurrent_searches` request parameter can be used to control +the maximum number of concurrent searches the multi search api will execute. +This default is based on the number of data nodes and the default search thread pool size. + [float] [[msearch-security]] === Security diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json index 87a86fb1298..0344702ecfe 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/msearch.json @@ -20,6 +20,10 @@ "type" : "enum", "options" : ["query_then_fetch", "query_and_fetch", "dfs_query_then_fetch", "dfs_query_and_fetch"], "description" : "Search operation type" + }, + "max_concurrent_searches" : { + "type" : "number", + "description" : "Controls the maximum number of concurrent searches the multi search api will execute" } } },