From b3e7e85cf110b6d48cab4f82fa1fc689975e81de Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 13 Sep 2017 06:16:27 -0400 Subject: [PATCH] Let search phases override max concurrent requests If the query coordinating node is also a data node that holds all the shards for a search request, we can end up recursing through the can match phase (because we send a local request and on response in the listener move to the next shard and do this again, without ever having returned from previous shards). This recursion can lead to stack overflow for even a reasonable number of indices (daily indices over a sixty days with five shards per day is enough to trigger the stack overflow). Moreover, all this execution would be happening on a network thread (the thread that initially received the query). With this commit, we allow search phases to override max concurrent requests. This allows the can match phase to avoid recursing through the shards towards a stack overflow. Relates #26484 --- .../search/AbstractSearchAsyncAction.java | 4 +- .../search/CanMatchPreFilterSearchPhase.java | 11 ++-- .../action/search/InitialSearchPhase.java | 5 +- .../SearchDfsQueryThenFetchAsyncAction.java | 3 +- .../SearchQueryThenFetchAsyncAction.java | 3 +- .../action/search/SearchTransportService.java | 5 +- .../AbstractSearchAsyncActionTests.java | 5 +- .../CanMatchPreFilterSearchPhaseTests.java | 57 +++++++++++++++++++ .../action/search/SearchAsyncActionTests.java | 9 ++- 9 files changed, 82 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 89be2ecabeb..c7f1fa5dc5c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -76,8 +76,8 @@ abstract class AbstractSearchAsyncAction exten Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, SearchPhaseResults resultConsumer) { - super(name, request, shardsIts, logger); + SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests) { + super(name, request, shardsIts, logger, maxConcurrentShardRequests); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index ea5cf831859..49575125f68 100644 --- a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -26,10 +26,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -55,9 +51,12 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, Function, SearchPhase> phaseFactory) { + /* + * We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node + * is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards. + */ super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, - listener, - shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size())); + listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size()); this.phaseFactory = phaseFactory; this.shardsIts = shardsIts; } diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index fcee980379b..a68d1d599c5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -52,7 +52,8 @@ abstract class InitialSearchPhase extends private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); private final int maxConcurrentShardRequests; - InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { + InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, + int maxConcurrentShardRequests) { super(name); this.request = request; this.shardsIts = shardsIts; @@ -62,7 +63,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size()); + this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index a901d711571..ec055dfec8d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -42,7 +42,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final long clusterStateVersion, final SearchTask task) { super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size())); + shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()), + request.getMaxConcurrentShardRequests()); this.searchPhaseController = searchPhaseController; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index de8109aadd8..5ddd1df231d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -42,7 +42,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task) { super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); + shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), + request.getMaxConcurrentShardRequests()); this.searchPhaseController = searchPhaseController; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index dba382aed6c..d4fd7b609ee 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -47,9 +47,9 @@ import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; -import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -59,7 +59,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; @@ -447,7 +446,7 @@ public class SearchTransportService extends AbstractComponent { }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); - // this is super cheap and should not hit thread-pool rejections + // this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, new TaskAwareTransportRequestHandler() { @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index ec78f1892f9..8f413eb4364 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -60,11 +60,12 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { System::nanoTime); } + final SearchRequest request = new SearchRequest(); return new AbstractSearchAsyncAction("test", null, null, null, Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, - new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList( + request, null, new GroupShardsIterator<>(Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { + new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 87cebc957c6..373173a1fc6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -170,4 +170,61 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { assertEquals(shard1, !result.get().get(0).skip()); assertFalse(result.get().get(1).skip()); // never skip the failure } + + /* + * In cases that a query coordinating node held all the shards for a query, the can match phase would recurse and end in stack overflow + * when subjected to max concurrent search requests. This test is a test for that situation. + */ + public void testLotsOfShards() throws InterruptedException { + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + + final Map lookup = new ConcurrentHashMap<>(); + final DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + final SearchTransportService searchTransportService = + new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) { + @Override + public void sendCanMatch( + Transport.Connection connection, + ShardSearchTransportRequest request, + SearchTask task, + ActionListener listener) { + listener.onResponse(new CanMatchResponse(randomBoolean())); + } + }; + + final AtomicReference> result = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()); + final GroupShardsIterator shardsIter = + SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode); + final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( + logger, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), + EsExecutors.newDirectExecutorService(), + new SearchRequest(), + null, + shardsIter, + timeProvider, + 0, + null, + (iter) -> new SearchPhase("test") { + @Override + public void run() throws IOException { + result.set(iter); + latch.countDown(); + }}); + + canMatchPhase.start(); + latch.await(); + + } + } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 3ee681383cd..b9602f26346 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -110,7 +110,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -199,7 +200,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -300,7 +302,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { TestSearchResponse response = new TestSearchResponse(); @Override