InitialSearchPhase minor cleanups (#35864)

This commit simplifies  the throttling logic in InitialSearchPhase and removes some asserts from it. Also, a few formatting changes are applied to its code and surrounding classes.
This commit is contained in:
Luca Cavanna 2018-11-26 13:42:41 +01:00 committed by GitHub
parent cd822b7ca8
commit e44390ac20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 43 deletions

View File

@ -55,10 +55,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters) {
/*
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
*/
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);

View File

@ -80,7 +80,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
// we process hence we add one for the non active partition here.
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
this.maxConcurrentRequestsPerNode = Math.min(maxConcurrentRequestsPerNode, shardsIts.size());
this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
// in the case were we have less shards than maxConcurrentRequestsPerNode we don't need to throttle
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
this.executor = executor;
@ -161,7 +161,6 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
private void maybeFork(final Thread thread, final Runnable runnable) {
if (thread == Thread.currentThread()) {
fork(runnable);
@ -178,7 +177,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
runnable.run();
}
@ -234,15 +233,13 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
private void executeNext(PendingExecutions pendingExecutions, Thread originalThread) {
if (pendingExecutions != null) {
assert throttleConcurrentRequests;
if (throttleConcurrentRequests) {
maybeFork(originalThread, pendingExecutions::finishAndRunNext);
} else {
assert throttleConcurrentRequests == false;
assert pendingExecutions == null;
}
}
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
@ -286,7 +283,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
try {
/*
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run in tonodes that are not connected. In this case, on shard failure will move us to the next shard copy.
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
} finally {
@ -294,11 +291,10 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
};
if (pendingExecutions == null) {
r.run();
} else {
assert throttleConcurrentRequests;
if (throttleConcurrentRequests) {
pendingExecutions.tryRun(r);
} else {
r.run();
}
}
}

View File

@ -35,13 +35,13 @@ public interface SearchOperationListener {
* Executed before the query phase is executed
* @param searchContext the current search context
*/
default void onPreQueryPhase(SearchContext searchContext) {};
default void onPreQueryPhase(SearchContext searchContext) {}
/**
* Executed if a query phased failed.
* @param searchContext the current search context
*/
default void onFailedQueryPhase(SearchContext searchContext) {};
default void onFailedQueryPhase(SearchContext searchContext) {}
/**
* Executed after the query phase successfully finished.
@ -51,19 +51,19 @@ public interface SearchOperationListener {
*
* @see #onFailedQueryPhase(SearchContext)
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {};
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}
/**
* Executed before the fetch phase is executed
* @param searchContext the current search context
*/
default void onPreFetchPhase(SearchContext searchContext) {};
default void onPreFetchPhase(SearchContext searchContext) {}
/**
* Executed if a fetch phased failed.
* @param searchContext the current search context
*/
default void onFailedFetchPhase(SearchContext searchContext) {};
default void onFailedFetchPhase(SearchContext searchContext) {}
/**
* Executed after the fetch phase successfully finished.
@ -73,13 +73,13 @@ public interface SearchOperationListener {
*
* @see #onFailedFetchPhase(SearchContext)
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {};
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}
/**
* Executed when a new search context was created
* @param context the created context
*/
default void onNewContext(SearchContext context) {};
default void onNewContext(SearchContext context) {}
/**
* Executed when a previously created search context is freed.
@ -88,13 +88,13 @@ public interface SearchOperationListener {
* cleaned up.
* @param context the freed search context
*/
default void onFreeContext(SearchContext context) {};
default void onFreeContext(SearchContext context) {}
/**
* Executed when a new scroll search {@link SearchContext} was created
* @param context the created search context
*/
default void onNewScrollContext(SearchContext context) {};
default void onNewScrollContext(SearchContext context) {}
/**
* Executed when a scroll search {@link SearchContext} is freed.
@ -103,7 +103,7 @@ public interface SearchOperationListener {
* cleaned up.
* @param context the freed search context
*/
default void onFreeScrollContext(SearchContext context) {};
default void onFreeScrollContext(SearchContext context) {}
/**
* Executed prior to using a {@link SearchContext} that has been retrieved
@ -121,7 +121,7 @@ public interface SearchOperationListener {
private final List<SearchOperationListener> listeners;
private final Logger logger;
public CompositeListener(List<SearchOperationListener> listeners, Logger logger) {
CompositeListener(List<SearchOperationListener> listeners, Logger logger) {
this.listeners = listeners;
this.logger = logger;
}

View File

@ -1232,9 +1232,9 @@ public class IndicesService extends AbstractLifecycleComponent
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), () -> {
return "Shard: " + request.shardId() + "\nSource:\n" + request.source();
}, out -> {
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(),
() -> "Shard: " + request.shardId() + "\nSource:\n" + request.source(),
out -> {
queryPhase.execute(context);
try {
context.queryResult().writeToNoId(out);

View File

@ -36,8 +36,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -1039,21 +1039,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
/**
* Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
* if the execution of a the search request can be early terminated without executing it. This is for instance not possible if
* if the execution of the search request can be early terminated without executing it. This is for instance not possible if
* a global aggregation is part of this request or if there is a suggest builder present.
*/
public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
if (source == null || source.query() == null || source.query() instanceof MatchAllQueryBuilder || source.suggest() != null) {
return false;
} else {
}
AggregatorFactories.Builder aggregations = source.aggregations();
if (aggregations != null) {
if (aggregations.mustVisitAllDocs()) {
return false;
}
}
}
return true;
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}
/*
@ -1102,9 +1096,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;
public CanMatchResponse() {
}
public CanMatchResponse(StreamInput in) throws IOException {
this.canMatch = in.readBoolean();
}