Avoid stack overflow in multi-search

Today when handling a multi-search request, we asynchornously execute as
many search requests as the minimum of the number of search requests in
the multi-search request and the maximum number of concurrent
requests. When these search requests return, we poll more search
requests from a queue of search requests from the original multi-search
request. The implementation of this was recursive, and if the number of
requests in the multi-search request was large, a stack overflow could
arise due to the recursive invocation. This commit replaces this
recursive implementation with a simple iterative implementation.

Relates #23527
This commit is contained in:
Jason Tedor 2017-03-09 15:19:05 -08:00 committed by GitHub
parent 2666ecd76f
commit 3d82549d8e
2 changed files with 55 additions and 25 deletions

View File

@ -107,27 +107,61 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
AtomicInteger responseCounter, ActionListener<MultiSearchResponse> listener) { AtomicInteger responseCounter, ActionListener<MultiSearchResponse> listener) {
SearchRequestSlot request = requests.poll(); SearchRequestSlot request = requests.poll();
if (request == null) { if (request == null) {
// Ok... so there're no more requests then this is ok, we're then waiting for running requests to complete /*
* The number of times that we poll an item from the queue here is the minimum of the number of requests and the maximum number
* of concurrent requests. At first glance, it appears that we should never poll from the queue and not obtain a request given
* that we only poll here no more times than the number of requests. However, this is not the only consumer of this queue as
* earlier requests that have already completed will poll from the queue too and they could complete before later polls are
* invoked here. Thus, it can be the case that we poll here and and the queue was empty.
*/
return; return;
} }
/*
* With a request in hand, we are going to asynchronously execute the search request. When the search request returns, either with
* a success or with a failure, we set the response corresponding to the request. Then, we enter a loop that repeatedly pulls
* requests off the request queue, this time only setting the response corresponding to the request.
*/
searchAction.execute(request.request, new ActionListener<SearchResponse>() { searchAction.execute(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
executeSearchLoop();
}
@Override
public void onFailure(final Exception e) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(null, e));
executeSearchLoop();
}
private void handleResponse(final int responseSlot, final MultiSearchResponse.Item item) {
responses.set(responseSlot, item);
if (responseCounter.decrementAndGet() == 0) {
assert requests.isEmpty();
finish();
}
}
private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
}
private void executeSearchLoop() {
SearchRequestSlot next;
while ((next = requests.poll()) != null) {
final int nextResponseSlot = next.responseSlot;
searchAction.execute(next.request, new ActionListener<SearchResponse>() {
@Override @Override
public void onResponse(SearchResponse searchResponse) { public void onResponse(SearchResponse searchResponse) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null)); handleResponse(nextResponseSlot, new MultiSearchResponse.Item(searchResponse, null));
handleResponse();
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e)); handleResponse(nextResponseSlot, new MultiSearchResponse.Item(null, e));
handleResponse();
} }
});
private void handleResponse() {
if (responseCounter.decrementAndGet() == 0) {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
} else {
executeSearch(requests, responses, responseCounter, listener);
} }
} }
}); });

View File

@ -70,7 +70,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
// Keep track of the number of concurrent searches started by multi search api, // Keep track of the number of concurrent searches started by multi search api,
// and if there are more searches than is allowed create an error and remember that. // and if there are more searches than is allowed create an error and remember that.
int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 20); int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 16);
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
AtomicReference<AssertionError> errorHolder = new AtomicReference<>(); AtomicReference<AssertionError> errorHolder = new AtomicReference<>();
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse> TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>
@ -82,24 +82,20 @@ public class TransportMultiSearchActionTests extends ESTestCase {
errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches + errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches +
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]")); "] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
} }
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> {
try {
Thread.sleep(scaledRandomIntBetween(10, 1000));
} catch (InterruptedException e) {
}
counter.decrementAndGet(); counter.decrementAndGet();
listener.onResponse(new SearchResponse()); listener.onResponse(new SearchResponse());
} }
);
}
}; };
TransportMultiSearchAction action = TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10); new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, resolver, 10);
// Execute the multi search api and fail if we find an error after executing: // Execute the multi search api and fail if we find an error after executing:
try { try {
int numSearchRequests = randomIntBetween(16, 128); /*
* Allow for a large number of search requests in a single batch as previous implementations could stack overflow if the number
* of requests in a single batch was large
*/
int numSearchRequests = scaledRandomIntBetween(1, 8192);
MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches); multiSearchRequest.maxConcurrentSearchRequests(maxAllowedConcurrentSearches);
for (int i = 0; i < numSearchRequests; i++) { for (int i = 0; i < numSearchRequests; i++) {