diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 89be2ecabeb..c7f1fa5dc5c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -76,8 +76,8 @@ abstract class AbstractSearchAsyncAction exten Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, SearchPhaseResults resultConsumer) { - super(name, request, shardsIts, logger); + SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests) { + super(name, request, shardsIts, logger, maxConcurrentShardRequests); this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; diff --git a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index ea5cf831859..49575125f68 100644 --- a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -26,10 +26,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -55,9 +51,12 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, Function, SearchPhase> phaseFactory) { + /* + * We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node + * is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards. + */ super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, - listener, - shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size())); + listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size()); this.phaseFactory = phaseFactory; this.shardsIts = shardsIts; } diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index fcee980379b..a68d1d599c5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -52,7 +52,8 @@ abstract class InitialSearchPhase extends private final AtomicInteger shardExecutionIndex = new AtomicInteger(0); private final int maxConcurrentShardRequests; - InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { + InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger, + int maxConcurrentShardRequests) { super(name); this.request = request; this.shardsIts = shardsIts; @@ -62,7 +63,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size()); + this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size()); } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index a901d711571..ec055dfec8d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -42,7 +42,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final long clusterStateVersion, final SearchTask task) { super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size())); + shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()), + request.getMaxConcurrentShardRequests()); this.searchPhaseController = searchPhaseController; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index de8109aadd8..5ddd1df231d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -42,7 +42,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task) { super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, - shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); + shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), + request.getMaxConcurrentShardRequests()); this.searchPhaseController = searchPhaseController; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index dba382aed6c..d4fd7b609ee 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -47,9 +47,9 @@ import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; -import org.elasticsearch.transport.TaskAwareTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -59,7 +59,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; @@ -447,7 +446,7 @@ public class SearchTransportService extends AbstractComponent { }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); - // this is super cheap and should not hit thread-pool rejections + // this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, new TaskAwareTransportRequestHandler() { @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index ec78f1892f9..8f413eb4364 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -60,11 +60,12 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { System::nanoTime); } + final SearchRequest request = new SearchRequest(); return new AbstractSearchAsyncAction("test", null, null, null, Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, - new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList( + request, null, new GroupShardsIterator<>(Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(10)) { + new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 87cebc957c6..373173a1fc6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -170,4 +170,61 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase { assertEquals(shard1, !result.get().get(0).skip()); assertFalse(result.get().get(1).skip()); // never skip the failure } + + /* + * In cases that a query coordinating node held all the shards for a query, the can match phase would recurse and end in stack overflow + * when subjected to max concurrent search requests. This test is a test for that situation. + */ + public void testLotsOfShards() throws InterruptedException { + final TransportSearchAction.SearchTimeProvider timeProvider = + new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime); + + final Map lookup = new ConcurrentHashMap<>(); + final DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode)); + lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode)); + + final SearchTransportService searchTransportService = + new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null, null) { + @Override + public void sendCanMatch( + Transport.Connection connection, + ShardSearchTransportRequest request, + SearchTask task, + ActionListener listener) { + listener.onResponse(new CanMatchResponse(randomBoolean())); + } + }; + + final AtomicReference> result = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()); + final GroupShardsIterator shardsIter = + SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode); + final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( + logger, + searchTransportService, + (clusterAlias, node) -> lookup.get(node), + Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), + Collections.emptyMap(), + EsExecutors.newDirectExecutorService(), + new SearchRequest(), + null, + shardsIter, + timeProvider, + 0, + null, + (iter) -> new SearchPhase("test") { + @Override + public void run() throws IOException { + result.set(iter); + latch.countDown(); + }}); + + canMatchPhase.start(); + latch.await(); + + } + } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 3ee681383cd..b9602f26346 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -110,7 +110,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -199,7 +200,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -300,7 +302,8 @@ public class SearchAsyncActionTests extends ESTestCase { new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) { + new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), + request.getMaxConcurrentShardRequests()) { TestSearchResponse response = new TestSearchResponse(); @Override