From c51ef0b2ca45878ea651341caa7e320a7c3ceb42 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 12 Mar 2017 00:45:40 -0800 Subject: [PATCH] Honor max concurrent searches in multi-search A previous change to the multi-search request execution to avoid stack overflows regressed on limiting the number of concurrent search requests from a batched multi-search request. In particular, the replacement of the tail-recursive call with a loop could asynchronously fire off all of the remaining search requests in the batch while max concurrent search requests are already executing. This commit attempts to address this issue by taking a more careful approach to the initial problem of recurisve calls. The cause of the initial problem was due to possibility of individual requests completing on the same thread as invoked the search action execution. This can happen, for example, in cases when an individual request does not resolve to any shards. To address this problem, when an individual request completes we check if it completed on the same thread as fired off the request. In this case, we loop and otherwise safely recurse. Sadly, there was a unit test to check that the maximum number of concurrent search requests was not exceeded, but that test was broken while modifying the test to reproduce a case that led to the possibility of stack overflow. As such, we randomize whether or not search actions execute on the same thread as the thread that invoked the action. Relates #23538 --- .../resources/checkstyle_suppressions.xml | 1 - .../search/TransportMultiSearchAction.java | 71 ++++++++++--------- .../TransportMultiSearchActionTests.java | 21 +++++- 3 files changed, 56 insertions(+), 37 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index fd2d0f1ba55..85a658df5ac 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -159,7 +159,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 4bc834ec5fd..db5a21edb2b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -47,18 +47,17 @@ public class TransportMultiSearchAction extends HandledTransportAction searchAction, - IndexNameExpressionResolver indexNameExpressionResolver, int availableProcessors) { - super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new); + IndexNameExpressionResolver resolver, int availableProcessors) { + super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new); this.clusterService = clusterService; this.searchAction = searchAction; this.availableProcessors = availableProcessors; @@ -90,10 +89,9 @@ public class TransportMultiSearchAction extends HandledTransportAction requests, AtomicArray responses, - AtomicInteger responseCounter, ActionListener listener) { + /** + * Executes a single request from the queue of requests. When a request finishes, another request is taken from the queue. When a + * request is executed, a permit is taken on the specified semaphore, and released as each request completes. + * + * @param requests the queue of multi-search requests to execute + * @param responses atomic array to hold the responses corresponding to each search request slot + * @param responseCounter incremented on each response + * @param listener the listener attached to the multi-search request + */ + private void executeSearch( + final Queue requests, + final AtomicArray responses, + final AtomicInteger responseCounter, + final ActionListener listener) { SearchRequestSlot request = requests.poll(); if (request == null) { /* @@ -118,21 +128,22 @@ public class TransportMultiSearchAction extends HandledTransportAction() { @Override public void onResponse(final SearchResponse searchResponse) { handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); - executeSearchLoop(); } @Override public void onFailure(final Exception e) { handleResponse(request.responseSlot, new MultiSearchResponse.Item(null, e)); - executeSearchLoop(); } private void handleResponse(final int responseSlot, final MultiSearchResponse.Item item) { @@ -140,30 +151,20 @@ public class TransportMultiSearchAction extends HandledTransportAction executeSearch(requests, responses, responseCounter, listener)); + } else { + // we are on a different thread (we went asynchronous), it's safe to recurse + executeSearch(requests, responses, responseCounter, listener); + } } } private void finish() { listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); } - - private void executeSearchLoop() { - SearchRequestSlot next; - while ((next = requests.poll()) != null) { - final int nextResponseSlot = next.responseSlot; - searchAction.execute(next.request, new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - handleResponse(nextResponseSlot, new MultiSearchResponse.Item(searchResponse, null)); - } - - @Override - public void onFailure(Exception e) { - handleResponse(nextResponseSlot, new MultiSearchResponse.Item(null, e)); - } - }); - } - } }); } @@ -176,5 +177,7 @@ public class TransportMultiSearchAction extends HandledTransportAction errorHolder = new AtomicReference<>(); + // randomize whether or not requests are executed asynchronously + final List threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME); + Randomness.shuffle(threadPoolNames); + final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0)); + final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1)); + final Set requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>())); TransportAction searchAction = new TransportAction (Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) { @Override protected void doExecute(SearchRequest request, ActionListener listener) { + requests.add(request); int currentConcurrentSearches = counter.incrementAndGet(); if (currentConcurrentSearches > maxAllowedConcurrentSearches) { errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); } - counter.decrementAndGet(); - listener.onResponse(new SearchResponse()); + final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor; + executorService.execute(() -> { + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); + }); } }; TransportMultiSearchAction action = @@ -104,6 +120,7 @@ public class TransportMultiSearchActionTests extends ESTestCase { MultiSearchResponse response = action.execute(multiSearchRequest).actionGet(); assertThat(response.getResponses().length, equalTo(numSearchRequests)); + assertThat(requests.size(), equalTo(numSearchRequests)); assertThat(errorHolder.get(), nullValue()); } finally { assertTrue(ESTestCase.terminate(threadPool));