Honor max concurrent searches in multi-search

A previous change to the multi-search request execution to avoid stack
overflows regressed on limiting the number of concurrent search requests
from a batched multi-search request. In particular, the replacement of
the tail-recursive call with a loop could asynchronously fire off all of
the remaining search requests in the batch while max concurrent search
requests are already executing. This commit attempts to address this
issue by taking a more careful approach to the initial problem of
recurisve calls. The cause of the initial problem was due to possibility
of individual requests completing on the same thread as invoked the
search action execution. This can happen, for example, in cases when an
individual request does not resolve to any shards. To address this
problem, when an individual request completes we check if it completed
on the same thread as fired off the request. In this case, we loop and
otherwise safely recurse. Sadly, there was a unit test to check that the
maximum number of concurrent search requests was not exceeded, but that
test was broken while modifying the test to reproduce a case that led to
the possibility of stack overflow. As such, we randomize whether or not
search actions execute on the same thread as the thread that invoked the
action.

Relates #23538
This commit is contained in:
Jason Tedor 2017-03-12 00:45:40 -08:00 committed by GitHub
parent a2e838d286
commit c51ef0b2ca
3 changed files with 56 additions and 37 deletions

View File

@ -159,7 +159,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]MultiSearchRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportMultiSearchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]suggest[/\\]SuggestResponse.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ActionFilter.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]DelegatingActionListener.java" checks="LineLength" />

View File

@ -47,18 +47,17 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new);
ActionFilters actionFilters, IndexNameExpressionResolver resolver) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
}
// For testing only:
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver indexNameExpressionResolver, int availableProcessors) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new);
IndexNameExpressionResolver resolver, int availableProcessors) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
@ -90,10 +89,9 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
}
/*
* This is not perfect and makes a big assumption, that all nodes have the same thread pool size / have the number
* of processors and that shard of the indices the search requests go to are more or less evenly distributed across
* all nodes in the cluster. But I think it is a good enough default for most cases, if not then the default should be
* overwritten in the request itself.
* This is not perfect and makes a big assumption, that all nodes have the same thread pool size / have the number of processors and
* that shard of the indices the search requests go to are more or less evenly distributed across all nodes in the cluster. But I think
* it is a good enough default for most cases, if not then the default should be overwritten in the request itself.
*/
static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState state) {
int numDateNodes = state.getNodes().getDataNodes().size();
@ -103,8 +101,20 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
return Math.max(1, numDateNodes * defaultSearchThreadPoolSize);
}
void executeSearch(Queue<SearchRequestSlot> requests, AtomicArray<MultiSearchResponse.Item> responses,
AtomicInteger responseCounter, ActionListener<MultiSearchResponse> listener) {
/**
* Executes a single request from the queue of requests. When a request finishes, another request is taken from the queue. When a
* request is executed, a permit is taken on the specified semaphore, and released as each request completes.
*
* @param requests the queue of multi-search requests to execute
* @param responses atomic array to hold the responses corresponding to each search request slot
* @param responseCounter incremented on each response
* @param listener the listener attached to the multi-search request
*/
private void executeSearch(
final Queue<SearchRequestSlot> requests,
final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter,
final ActionListener<MultiSearchResponse> listener) {
SearchRequestSlot request = requests.poll();
if (request == null) {
/*
@ -118,21 +128,22 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
}
/*
* With a request in hand, we are going to asynchronously execute the search request. When the search request returns, either with
* a success or with a failure, we set the response corresponding to the request. Then, we enter a loop that repeatedly pulls
* requests off the request queue, this time only setting the response corresponding to the request.
* With a request in hand, we are now prepared to execute the search request. There are two possibilities, either we go asynchronous
* or we do not (this can happen if the request does not resolve to any shards). If we do not go asynchronous, we are going to come
* back on the same thread that attempted to execute the search request. At this point, or any other point where we come back on the
* same thread as when the request was submitted, we should not recurse lest we might descend into a stack overflow. To avoid this,
* when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse.
*/
final Thread thread = Thread.currentThread();
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
executeSearchLoop();
}
@Override
public void onFailure(final Exception e) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(null, e));
executeSearchLoop();
}
private void handleResponse(final int responseSlot, final MultiSearchResponse.Item item) {
@ -140,30 +151,20 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
if (responseCounter.decrementAndGet() == 0) {
assert requests.isEmpty();
finish();
} else {
if (thread == Thread.currentThread()) {
// we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
executeSearch(requests, responses, responseCounter, listener);
}
}
}
private void finish() {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
}
private void executeSearchLoop() {
SearchRequestSlot next;
while ((next = requests.poll()) != null) {
final int nextResponseSlot = next.responseSlot;
searchAction.execute(next.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
handleResponse(nextResponseSlot, new MultiSearchResponse.Item(searchResponse, null));
}
@Override
public void onFailure(Exception e) {
handleResponse(nextResponseSlot, new MultiSearchResponse.Item(null, e));
}
});
}
}
});
}
@ -176,5 +177,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
this.request = request;
this.responseSlot = responseSlot;
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskManager;
@ -37,7 +38,12 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -73,17 +79,27 @@ public class TransportMultiSearchActionTests extends ESTestCase {
int maxAllowedConcurrentSearches = scaledRandomIntBetween(1, 16);
AtomicInteger counter = new AtomicInteger();
AtomicReference<AssertionError> errorHolder = new AtomicReference<>();
// randomize whether or not requests are executed asynchronously
final List<String> threadPoolNames = Arrays.asList(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME);
Randomness.shuffle(threadPoolNames);
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>
(Settings.EMPTY, "action", threadPool, actionFilters, resolver, taskManager) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
requests.add(request);
int currentConcurrentSearches = counter.incrementAndGet();
if (currentConcurrentSearches > maxAllowedConcurrentSearches) {
errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches +
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
}
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor;
executorService.execute(() -> {
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
});
}
};
TransportMultiSearchAction action =
@ -104,6 +120,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
MultiSearchResponse response = action.execute(multiSearchRequest).actionGet();
assertThat(response.getResponses().length, equalTo(numSearchRequests));
assertThat(requests.size(), equalTo(numSearchRequests));
assertThat(errorHolder.get(), nullValue());
} finally {
assertTrue(ESTestCase.terminate(threadPool));