Let search phases override max concurrent requests
If the query coordinating node is also a data node that holds all the shards for a search request, we can end up recursing through the can match phase (because we send a local request and on response in the listener move to the next shard and do this again, without ever having returned from previous shards). This recursion can lead to stack overflow for even a reasonable number of indices (daily indices over a sixty days with five shards per day is enough to trigger the stack overflow). Moreover, all this execution would be happening on a network thread (the thread that initially received the query). With this commit, we allow search phases to override max concurrent requests. This allows the can match phase to avoid recursing through the shards towards a stack overflow. Relates #26484
This commit is contained in:
parent
e00db235bc
commit
b3e7e85cf1
|
@ -76,8 +76,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
Executor executor, SearchRequest request,
|
||||
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
|
||||
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
|
||||
SearchTask task, SearchPhaseResults<Result> resultConsumer) {
|
||||
super(name, request, shardsIts, logger);
|
||||
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
|
||||
super(name, request, shardsIts, logger, maxConcurrentShardRequests);
|
||||
this.timeProvider = timeProvider;
|
||||
this.logger = logger;
|
||||
this.searchTransportService = searchTransportService;
|
||||
|
|
|
@ -26,10 +26,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.BiFunction;
|
||||
|
@ -55,9 +51,12 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
|
|||
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
|
||||
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
|
||||
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
|
||||
/*
|
||||
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
|
||||
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
|
||||
*/
|
||||
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
|
||||
listener,
|
||||
shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()));
|
||||
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size());
|
||||
this.phaseFactory = phaseFactory;
|
||||
this.shardsIts = shardsIts;
|
||||
}
|
||||
|
|
|
@ -52,7 +52,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
|||
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
|
||||
private final int maxConcurrentShardRequests;
|
||||
|
||||
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger) {
|
||||
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger,
|
||||
int maxConcurrentShardRequests) {
|
||||
super(name);
|
||||
this.request = request;
|
||||
this.shardsIts = shardsIts;
|
||||
|
@ -62,7 +63,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
|||
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
|
||||
// we process hence we add one for the non active partition here.
|
||||
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
|
||||
maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size());
|
||||
this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size());
|
||||
}
|
||||
|
||||
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
|
||||
|
|
|
@ -42,7 +42,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
|
|||
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
|
||||
final long clusterStateVersion, final SearchTask task) {
|
||||
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
|
||||
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()));
|
||||
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
|
||||
request.getMaxConcurrentShardRequests());
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
|
|||
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
|
||||
long clusterStateVersion, SearchTask task) {
|
||||
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
|
||||
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
|
||||
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
|
||||
request.getMaxConcurrentShardRequests());
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
}
|
||||
|
||||
|
|
|
@ -47,9 +47,9 @@ import org.elasticsearch.search.query.ScrollQuerySearchResult;
|
|||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
@ -59,7 +59,6 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
@ -447,7 +446,7 @@ public class SearchTransportService extends AbstractComponent {
|
|||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
|
||||
|
||||
// this is super cheap and should not hit thread-pool rejections
|
||||
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
|
||||
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
|
||||
@Override
|
||||
|
|
|
@ -60,11 +60,12 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
|
|||
System::nanoTime);
|
||||
}
|
||||
|
||||
final SearchRequest request = new SearchRequest();
|
||||
return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, null,
|
||||
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
|
||||
new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList(
|
||||
request, null, new GroupShardsIterator<>(Collections.singletonList(
|
||||
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(10)) {
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) {
|
||||
@Override
|
||||
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
|
||||
return null;
|
||||
|
|
|
@ -170,4 +170,61 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
|||
assertEquals(shard1, !result.get().get(0).skip());
|
||||
assertFalse(result.get().get(1).skip()); // never skip the failure
|
||||
}
|
||||
|
||||
/*
|
||||
* In cases that a query coordinating node held all the shards for a query, the can match phase would recurse and end in stack overflow
|
||||
* when subjected to max concurrent search requests. This test is a test for that situation.
|
||||
*/
|
||||
public void testLotsOfShards() throws InterruptedException {
|
||||
final TransportSearchAction.SearchTimeProvider timeProvider =
|
||||
new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime);
|
||||
|
||||
final Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
|
||||
final DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
final DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
|
||||
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
|
||||
|
||||
final SearchTransportService searchTransportService =
|
||||
new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) {
|
||||
@Override
|
||||
public void sendCanMatch(
|
||||
Transport.Connection connection,
|
||||
ShardSearchTransportRequest request,
|
||||
SearchTask task,
|
||||
ActionListener<CanMatchResponse> listener) {
|
||||
listener.onResponse(new CanMatchResponse(randomBoolean()));
|
||||
}
|
||||
};
|
||||
|
||||
final AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed());
|
||||
final GroupShardsIterator<SearchShardIterator> shardsIter =
|
||||
SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode);
|
||||
final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
|
||||
logger,
|
||||
searchTransportService,
|
||||
(clusterAlias, node) -> lookup.get(node),
|
||||
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
|
||||
Collections.emptyMap(),
|
||||
EsExecutors.newDirectExecutorService(),
|
||||
new SearchRequest(),
|
||||
null,
|
||||
shardsIter,
|
||||
timeProvider,
|
||||
0,
|
||||
null,
|
||||
(iter) -> new SearchPhase("test") {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
result.set(iter);
|
||||
latch.countDown();
|
||||
}});
|
||||
|
||||
canMatchPhase.start();
|
||||
latch.await();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -110,7 +110,8 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
|
||||
0,
|
||||
null,
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
|
||||
request.getMaxConcurrentShardRequests()) {
|
||||
|
||||
@Override
|
||||
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
|
||||
|
@ -199,7 +200,8 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
|
||||
0,
|
||||
null,
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
|
||||
request.getMaxConcurrentShardRequests()) {
|
||||
|
||||
@Override
|
||||
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
|
||||
|
@ -300,7 +302,8 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
|
||||
0,
|
||||
null,
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
|
||||
request.getMaxConcurrentShardRequests()) {
|
||||
TestSearchResponse response = new TestSearchResponse();
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue