From e44390ac204ece3e1e770e9aedbac601d6787f51 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 26 Nov 2018 13:42:41 +0100 Subject: [PATCH] InitialSearchPhase minor cleanups (#35864) This commit simplifies the throttling logic in InitialSearchPhase and removes some asserts from it. Also, a few formatting changes are applied to its code and surrounding classes. --- .../search/CanMatchPreFilterSearchPhase.java | 5 +---- .../action/search/InitialSearchPhase.java | 20 +++++++---------- .../index/shard/SearchOperationListener.java | 22 +++++++++---------- .../elasticsearch/indices/IndicesService.java | 6 ++--- .../elasticsearch/search/SearchService.java | 17 ++++---------- 5 files changed, 27 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 82c8f6b815a..bea5a975b28 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -55,10 +55,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, SearchResponse.Clusters clusters) { - /* - * We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node - * is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards. - */ + //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters); diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 742ad995a8a..b3b7dba9f38 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -80,7 +80,7 @@ abstract class InitialSearchPhase extends // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result // we process hence we add one for the non active partition here. this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size()); + this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode; // in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size(); this.executor = executor; @@ -161,7 +161,6 @@ abstract class InitialSearchPhase extends } } - private void maybeFork(final Thread thread, final Runnable runnable) { if (thread == Thread.currentThread()) { fork(runnable); @@ -178,7 +177,7 @@ abstract class InitialSearchPhase extends } @Override - protected void doRun() throws Exception { + protected void doRun() { runnable.run(); } @@ -234,15 +233,13 @@ abstract class InitialSearchPhase extends } private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) { - if (pendingExecutions != null) { - assert throttleConcurrentRequests; + if (throttleConcurrentRequests) { maybeFork(originalThread, pendingExecutions::finishAndRunNext); } else { - assert throttleConcurrentRequests == false; + assert pendingExecutions == null; } } - private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the @@ -286,7 +283,7 @@ abstract class InitialSearchPhase extends try { /* * It is possible to run into connection exceptions here because we are getting the connection early and might - * run in tonodes that are not connected. In this case, on shard failure will move us to the next shard copy. + * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. */ fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e)); } finally { @@ -294,11 +291,10 @@ abstract class InitialSearchPhase extends } } }; - if (pendingExecutions == null) { - r.run(); - } else { - assert throttleConcurrentRequests; + if (throttleConcurrentRequests) { pendingExecutions.tryRun(r); + } else { + r.run(); } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index b148d1efba3..ede86e6ec22 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -35,13 +35,13 @@ public interface SearchOperationListener { * Executed before the query phase is executed * @param searchContext the current search context */ - default void onPreQueryPhase(SearchContext searchContext) {}; + default void onPreQueryPhase(SearchContext searchContext) {} /** * Executed if a query phased failed. * @param searchContext the current search context */ - default void onFailedQueryPhase(SearchContext searchContext) {}; + default void onFailedQueryPhase(SearchContext searchContext) {} /** * Executed after the query phase successfully finished. @@ -51,19 +51,19 @@ public interface SearchOperationListener { * * @see #onFailedQueryPhase(SearchContext) */ - default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}; + default void onQueryPhase(SearchContext searchContext, long tookInNanos) {} /** * Executed before the fetch phase is executed * @param searchContext the current search context */ - default void onPreFetchPhase(SearchContext searchContext) {}; + default void onPreFetchPhase(SearchContext searchContext) {} /** * Executed if a fetch phased failed. * @param searchContext the current search context */ - default void onFailedFetchPhase(SearchContext searchContext) {}; + default void onFailedFetchPhase(SearchContext searchContext) {} /** * Executed after the fetch phase successfully finished. @@ -73,13 +73,13 @@ public interface SearchOperationListener { * * @see #onFailedFetchPhase(SearchContext) */ - default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}; + default void onFetchPhase(SearchContext searchContext, long tookInNanos) {} /** * Executed when a new search context was created * @param context the created context */ - default void onNewContext(SearchContext context) {}; + default void onNewContext(SearchContext context) {} /** * Executed when a previously created search context is freed. @@ -88,13 +88,13 @@ public interface SearchOperationListener { * cleaned up. * @param context the freed search context */ - default void onFreeContext(SearchContext context) {}; + default void onFreeContext(SearchContext context) {} /** * Executed when a new scroll search {@link SearchContext} was created * @param context the created search context */ - default void onNewScrollContext(SearchContext context) {}; + default void onNewScrollContext(SearchContext context) {} /** * Executed when a scroll search {@link SearchContext} is freed. @@ -103,7 +103,7 @@ public interface SearchOperationListener { * cleaned up. * @param context the freed search context */ - default void onFreeScrollContext(SearchContext context) {}; + default void onFreeScrollContext(SearchContext context) {} /** * Executed prior to using a {@link SearchContext} that has been retrieved @@ -121,7 +121,7 @@ public interface SearchOperationListener { private final List listeners; private final Logger logger; - public CompositeListener(List listeners, Logger logger) { + CompositeListener(List listeners, Logger logger) { this.listeners = listeners; this.logger = logger; } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 54ec47aa84b..0eb9634e2a0 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -1232,9 +1232,9 @@ public class IndicesService extends AbstractLifecycleComponent final DirectoryReader directoryReader = context.searcher().getDirectoryReader(); boolean[] loadedFromCache = new boolean[] { true }; - BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), () -> { - return "Shard: " + request.shardId() + "\nSource:\n" + request.source(); - }, out -> { + BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), + () -> "Shard: " + request.shardId() + "\nSource:\n" + request.source(), + out -> { queryPhase.execute(context); try { context.queryResult().writeToNoId(out); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ee8c996a4b0..29afce4e086 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -36,8 +36,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -1039,21 +1039,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv /** * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words - * if the execution of a the search request can be early terminated without executing it. This is for instance not possible if + * if the execution of the search request can be early terminated without executing it. This is for instance not possible if * a global aggregation is part of this request or if there is a suggest builder present. */ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) { if (source == null || source.query() == null || source.query() instanceof MatchAllQueryBuilder || source.suggest() != null) { return false; - } else { - AggregatorFactories.Builder aggregations = source.aggregations(); - if (aggregations != null) { - if (aggregations.mustVisitAllDocs()) { - return false; - } - } } - return true; + AggregatorFactories.Builder aggregations = source.aggregations(); + return aggregations == null || aggregations.mustVisitAllDocs() == false; } /* @@ -1102,9 +1096,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final class CanMatchResponse extends SearchPhaseResult { private boolean canMatch; - public CanMatchResponse() { - } - public CanMatchResponse(StreamInput in) throws IOException { this.canMatch = in.readBoolean(); }