diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 2bceccce385..4bc834ec5fd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -46,8 +46,8 @@ public class TransportMultiSearchAction extends HandledTransportAction listener) { SearchRequestSlot request = requests.poll(); if (request == null) { - // Ok... so there're no more requests then this is ok, we're then waiting for running requests to complete + /* + * The number of times that we poll an item from the queue here is the minimum of the number of requests and the maximum number + * of concurrent requests. At first glance, it appears that we should never poll from the queue and not obtain a request given + * that we only poll here no more times than the number of requests. However, this is not the only consumer of this queue as + * earlier requests that have already completed will poll from the queue too and they could complete before later polls are + * invoked here. Thus, it can be the case that we poll here and and the queue was empty. + */ return; } + + /* + * With a request in hand, we are going to asynchronously execute the search request. When the search request returns, either with + * a success or with a failure, we set the response corresponding to the request. Then, we enter a loop that repeatedly pulls + * requests off the request queue, this time only setting the response corresponding to the request. + */ searchAction.execute(request.request, new ActionListener() { @Override - public void onResponse(SearchResponse searchResponse) { - responses.set(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); - handleResponse(); + public void onResponse(final SearchResponse searchResponse) { + handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); + executeSearchLoop(); } @Override - public void onFailure(Exception e) { - responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e)); - handleResponse(); + public void onFailure(final Exception e) { + handleResponse(request.responseSlot, new MultiSearchResponse.Item(null, e)); + executeSearchLoop(); } - private void handleResponse() { + private void handleResponse(final int responseSlot, final MultiSearchResponse.Item item) { + responses.set(responseSlot, item); if (responseCounter.decrementAndGet() == 0) { - listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); - } else { - executeSearch(requests, responses, responseCounter, listener); + assert requests.isEmpty(); + finish(); + } + } + + private void finish() { + listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()]))); + } + + private void executeSearchLoop() { + SearchRequestSlot next; + while ((next = requests.poll()) != null) { + final int nextResponseSlot = next.responseSlot; + searchAction.execute(next.request, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + handleResponse(nextResponseSlot, new MultiSearchResponse.Item(searchResponse, null)); + } + + @Override + public void onFailure(Exception e) { + handleResponse(nextResponseSlot, new MultiSearchResponse.Item(null, e)); + } + }); } } }); diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index 7773264f77f..00d397b2241 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -70,7 +70,7 @@ public class TransportMultiSearchActionTests extends ESTestCase { // Keep track of the number of concurrent searches started by multi search api, // and if there are more searches than is allowed create an error and remember that. - int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 20); + int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 16); AtomicInteger counter = new AtomicInteger(); AtomicReference errorHolder = new AtomicReference<>(); TransportAction searchAction = new TransportAction @@ -82,16 +82,8 @@ public class TransportMultiSearchActionTests extends ESTestCase { errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); } - threadPool.executor(ThreadPool.Names.GENERIC).execute( - () -> { - try { - Thread.sleep(scaledRandomIntBetween(10, 1000)); - } catch (InterruptedException e) { - } - counter.decrementAndGet(); - listener.onResponse(new SearchResponse()); - } - ); + counter.decrementAndGet(); + listener.onResponse(new SearchResponse()); } }; TransportMultiSearchAction action = @@ -99,7 +91,11 @@ public class TransportMultiSearchActionTests extends ESTestCase { // Execute the multi search api and fail if we find an error after executing: try { - int numSearchRequests = randomIntBetween(16, 128); + /* + * Allow for a large number of search requests in a single batch as previous implementations could stack overflow if the number + * of requests in a single batch was large + */ + int numSearchRequests = scaledRandomIntBetween(1, 8192); MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches); for (int i = 0; i < numSearchRequests; i++) {