Add a shard filter search phase to pre-filter shards based on query rewriting (#25658)

Today if we search across a large number of shards we hit every shard. Yet, it's quite
common to search across an index pattern for time-based indices while filtering excludes
all results outside a certain time range, e.g. `now-3d`. While such a search can potentially hit
hundreds of shards, the majority of them might yield 0 results since no document
falls within that date range. Kibana, for instance, does this regularly and used `_field_stats`
to narrow down the indices it needs to query. Now, with the deprecation of `_field_stats` and its upcoming removal, a single Kibana dashboard can potentially turn into searches hitting hundreds or thousands of shards. That can easily cause search rejections even though most of the requests are very likely very cheap and only need a query rewrite to terminate early with 0 results.

This change adds a pre-filter phase for searches that can, if the number of shards is higher than the `pre_filter_shard_size` threshold (defaults to 128 shards), fan out to the shards
and check whether the query can potentially match any documents at all. While false positives are possible, a negative response means that no matches are possible. These requests are not subject to rejection and can greatly reduce the number of shards a request needs to hit. This approach is preferable to the Kibana field-stats approach since it correctly handles aliases and uses the correct thread pools to execute the requests. Further, it's completely transparent to the user and improves the scalability of Elasticsearch in general on large clusters.
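As a quick illustration (a hedged sketch, not part of this change's docs): with the client API added here, a request could lower the threshold and inspect how many shards were skipped. The index name, field name and `client` below are placeholders:

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;

// assumes an initialized `client` and time-based indices matching logs-*
SearchResponse response = client.prepareSearch("logs-*")
        .setQuery(QueryBuilders.rangeQuery("@timestamp").gte("now-3d"))
        .setPreFilterShardSize(16) // pre-filter once more than 16 shards are targeted; defaults to 128
        .get();
// shards whose bounds are disjoint with the date range rewrite to match_none and are skipped
int skippedShards = response.getSkippedShards();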
Simon Willnauer 2017-07-12 22:19:20 +02:00 committed by GitHub
parent 86e9438d3c
commit e81804cfa4
54 changed files with 1115 additions and 123 deletions


@ -53,6 +53,6 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
}
}


@ -153,7 +153,7 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testSearchScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 100, new ShardSearchFailure[0]);
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
mockResponse(mockSearchResponse);
SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
headers);


@ -31,7 +31,6 @@ import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
@ -67,6 +66,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
@ -107,7 +107,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
if (getNumShards() == 0) {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(),
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
return;
}
@ -169,35 +169,35 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(getNumShards());
this.shardFailures.set(shardFailures);
if (TransportActions.isShardNotAvailableException(e) == false) {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
synchronized (shardFailuresMutex) {
shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
if (shardFailures == null) { // still null so we are the first and create a new instance
shardFailures = new AtomicArray<>(getNumShards());
this.shardFailures.set(shardFailures);
}
}
}
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
if (results.hasResult(shardIndex)) {
assert failure == null : "shard failed before but shouldn't: " + failure;
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
}
}
if (results.hasResult(shardIndex)) {
assert failure == null : "shard failed before but shouldn't: " + failure;
successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
}
results.consumeShardFailure(shardIndex);
}
/**
@ -264,7 +264,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
buildTookInMillis(), buildShardFailures());
skippedOps.get(), buildTookInMillis(), buildShardFailures());
}
@Override
@ -313,4 +313,11 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* @param context the search context for the next phase
*/
protected abstract SearchPhase getNextPhase(SearchPhaseResults<Result> results, SearchPhaseContext context);
@Override
protected void skipShard(SearchShardIterator iterator) {
super.skipShard(iterator);
successfulOps.incrementAndGet();
skippedOps.incrementAndGet();
}
}


@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* The queries are rewritten against the shards and based on the rewrite result shards might be excluded
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections,
* which allows us to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the cluster's indices.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener,
shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()));
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
SearchPhaseContext context) {
return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
}
private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
if (cardinality == 0) {
// this is a special case where we have no hit but we need to get at least one search response in order
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
iter.reset();
} else {
iter.resetAndSkip();
}
}
return shardsIts;
}
private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
SearchPhaseResults<SearchTransportService.CanMatchResponse> {
private final FixedBitSet possibleMatches;
private int numPossibleMatches;
BitSetSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
}
@Override
void consumeResult(SearchTransportService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
}
@Override
boolean hasResult(int shardIndex) {
return false; // unneeded
}
@Override
synchronized void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
synchronized int getNumPossibleMatches() {
return numPossibleMatches;
}
synchronized FixedBitSet getPossibleMatches() {
return possibleMatches;
}
@Override
Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
}


@ -41,16 +41,16 @@ import java.util.function.Function;
* @see CountedCollector#onFailure(int, SearchShardTarget, Exception)
*/
final class DfsQueryPhase extends SearchPhase {
private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> queryResult;
private final InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> queryResult;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<DfsSearchResult> dfsSearchResults;
private final Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final SearchTransportService searchTransportService;
DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
SearchPhaseController searchPhaseController,
Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context) {
super("dfs_query");
this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());


@ -69,7 +69,7 @@ final class FetchSearchPhase extends SearchPhase {
}
this.fetchResults = new AtomicArray<>(resultConsumer.getNumShards());
this.searchPhaseController = searchPhaseController;
this.queryResults = resultConsumer.results;
this.queryResults = resultConsumer.getAtomicArray();
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.logger = context.getLogger();
@ -105,7 +105,8 @@ final class FetchSearchPhase extends SearchPhase {
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
+ "], single result: " + phaseResults.get(0).fetchResult();
// query AND fetch optimization
finishPhase.run();
} else {


@ -24,7 +24,6 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -132,7 +131,11 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
assert success;
for (int i = 0; i < maxConcurrentShardRequests; i++) {
SearchShardIterator shardRoutings = shardsIts.get(i);
performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull());
if (shardRoutings.skip()) {
skipShard(shardRoutings);
} else {
performPhaseOnShard(i, shardRoutings, shardRoutings.nextOrNull());
}
}
}
@ -140,7 +143,11 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
final int index = shardExecutionIndex.getAndIncrement();
if (index < shardsIts.size()) {
SearchShardIterator shardRoutings = shardsIts.get(index);
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
if (shardRoutings.skip()) {
skipShard(shardRoutings);
} else {
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
}
@ -171,8 +178,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
private void onShardResult(FirstResult result, ShardIterator shardIt) {
maybeExecuteNext();
private void onShardResult(FirstResult result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
onShardSuccess(result);
@ -181,12 +187,24 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
// increment all the "future" shards to update the total ops since some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
successfulShardExecution(shardIt);
}
private void successfulShardExecution(SearchShardIterator shardsIt) {
final int remainingOpsOnIterator;
if (shardsIt.skip()) {
remainingOpsOnIterator = shardsIt.remaining();
} else {
remainingOpsOnIterator = shardsIt.remaining() + 1;
}
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
if (xTotalOps == expectedTotalOps) {
onPhaseDone();
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
} else {
maybeExecuteNext();
}
}
@ -227,41 +245,39 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
/**
* This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing
*/
static class SearchPhaseResults<Result extends SearchPhaseResult> {
final AtomicArray<Result> results;
abstract static class SearchPhaseResults<Result extends SearchPhaseResult> {
private final int numShards;
SearchPhaseResults(int size) {
results = new AtomicArray<>(size);
protected SearchPhaseResults(int numShards) {
this.numShards = numShards;
}
/**
* Returns the number of expected results this class should collect
*/
final int getNumShards() {
return results.length();
return numShards;
}
/**
* A stream of all non-null (successful) shard results
*/
final Stream<Result> getSuccessfulResults() {
return results.asList().stream();
}
abstract Stream<Result> getSuccessfulResults();
/**
* Consumes a single shard result
* @param result the shards result
*/
void consumeResult(Result result) {
assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
results.set(result.getShardIndex(), result);
}
abstract void consumeResult(Result result);
/**
* Returns <code>true</code> iff a result is present for the given shard ID.
*/
final boolean hasResult(int shardIndex) {
return results.get(shardIndex) != null;
abstract boolean hasResult(int shardIndex);
void consumeShardFailure(int shardIndex) {}
AtomicArray<Result> getAtomicArray() {
throw new UnsupportedOperationException();
}
/**
@ -271,4 +287,40 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
throw new UnsupportedOperationException("reduce is not supported");
}
}
/**
* This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing
*/
static class ArraySearchPhaseResults<Result extends SearchPhaseResult> extends SearchPhaseResults<Result> {
final AtomicArray<Result> results;
ArraySearchPhaseResults(int size) {
super(size);
this.results = new AtomicArray<>(size);
}
Stream<Result> getSuccessfulResults() {
return results.asList().stream();
}
void consumeResult(Result result) {
assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
results.set(result.getShardIndex(), result);
}
boolean hasResult(int shardIndex) {
return results.get(shardIndex) != null;
}
@Override
AtomicArray<Result> getAtomicArray() {
return results;
}
}
protected void skipShard(SearchShardIterator iterator) {
assert iterator.skip();
successfulShardExecution(iterator);
}
}


@ -42,7 +42,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size()));
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()));
this.searchPhaseController = searchPhaseController;
}
@ -55,7 +55,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
return new DfsQueryPhase(results.results, searchPhaseController, (queryResults) ->
return new DfsQueryPhase(results.getAtomicArray(), searchPhaseController, (queryResults) ->
new FetchSearchPhase(queryResults, searchPhaseController, context), context);
}
}


@ -607,12 +607,12 @@ public final class SearchPhaseController extends AbstractComponent {
}
/**
* A {@link org.elasticsearch.action.search.InitialSearchPhase.SearchPhaseResults} implementation
* A {@link InitialSearchPhase.ArraySearchPhaseResults} implementation
* that incrementally reduces aggregation results as shard results are consumed.
* This implementation can be configured to batch up a certain amount of results and only reduce them
* iff the buffer is exhausted.
*/
static final class QueryPhaseResultConsumer extends InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> {
static final class QueryPhaseResultConsumer extends InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> {
private final InternalAggregations[] aggsBuffer;
private final TopDocs[] topDocsBuffer;
private final boolean hasAggs;
@ -714,9 +714,9 @@ public final class SearchPhaseController extends AbstractComponent {
}
/**
* Returns a new SearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
SearchSourceBuilder source = request.source();
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
@ -730,7 +730,7 @@ public final class SearchPhaseController extends AbstractComponent {
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
return new InitialSearchPhase.SearchPhaseResults(numShards) {
return new InitialSearchPhase.ArraySearchPhaseResults(numShards) {
@Override
public ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);


@ -58,6 +58,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
private SearchType searchType = SearchType.DEFAULT;
private String[] indices = Strings.EMPTY_ARRAY;
@ -77,6 +79,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private int maxConcurrentShardRequests = 0;
private int preFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE;
private String[] types = Strings.EMPTY_ARRAY;
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
@ -325,6 +329,28 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
}
/**
* Sets a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard cannot match any documents based on its rewrite method, i.e. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is <tt>128</tt>
*/
public void setPreFilterShardSize(int preFilterShardSize) {
if (preFilterShardSize < 1) {
throw new IllegalArgumentException("preFilterShardSize must be >= 1");
}
this.preFilterShardSize = preFilterShardSize;
}
/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard cannot match any documents based on its rewrite method, i.e. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is <tt>128</tt>
*/
public int getPreFilterShardSize() {
return preFilterShardSize;
}
/**
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
@ -382,6 +408,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
batchedReduceSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
}
@ -403,6 +430,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
out.writeVInt(batchedReduceSize);
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
}
@ -425,13 +453,14 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
Arrays.equals(types, that.types) &&
Objects.equals(batchedReduceSize, that.batchedReduceSize) &&
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions);
}
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, maxConcurrentShardRequests);
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize);
}
@Override
@ -447,6 +476,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
", scroll=" + scroll +
", maxConcurrentShardRequests=" + maxConcurrentShardRequests +
", batchedReduceSize=" + batchedReduceSize +
", preFilterShardSize=" + preFilterShardSize +
", source=" + source + '}';
}
}


@ -535,4 +535,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
this.request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
return this;
}
/**
* Sets a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for
* instance a shard cannot match any documents based on its rewrite method, i.e. if date filters are mandatory to match but the shard
* bounds and the query are disjoint. The default is <tt>128</tt>
*/
public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) {
this.request.setPreFilterShardSize(preFilterShardSize);
return this;
}
}


@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
@ -66,6 +67,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private int successfulShards;
private int skippedShards;
private ShardSearchFailure[] shardFailures;
private long tookInMillis;
@ -74,13 +77,15 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards,
long tookInMillis, ShardSearchFailure[] shardFailures) {
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
}
@Override
@ -147,6 +152,14 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return successfulShards;
}
/**
* The number of shards skipped due to pre-filtering
*/
public int getSkippedShards() {
return skippedShards;
}
/**
* The failed number of shards the search was executed on.
*/
@ -206,8 +219,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
if (getNumReducePhases() != 1) {
builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getFailedShards(),
getShardFailures());
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
getFailedShards(), getShardFailures());
internalResponse.toXContent(builder, params);
return builder;
}
@ -226,6 +239,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
long tookInMillis = -1;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
String scrollId = null;
List<ShardSearchFailure> failures = new ArrayList<>();
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -265,6 +279,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
successfulShards = parser.intValue();
} else if (RestActions.TOTAL_FIELD.match(currentFieldName)) {
totalShards = parser.intValue();
} else if (RestActions.SKIPPED_FIELD.match(currentFieldName)) {
skippedShards = parser.intValue();
} else {
parser.skipChildren();
}
@ -287,7 +303,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
profile, numReducePhases);
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, tookInMillis,
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
failures.toArray(new ShardSearchFailure[failures.size()]));
}
@ -308,6 +324,9 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
skippedShards = in.readVInt();
}
}
@Override
@ -324,10 +343,14 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if(out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
out.writeVInt(skippedShards);
}
}
@Override
public String toString() {
return Strings.toString(this);
}
}


@ -249,7 +249,7 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
0, buildTookInMillis(), buildShardFailures()));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}


@ -34,6 +34,7 @@ public final class SearchShardIterator extends PlainShardIterator {
private final OriginalIndices originalIndices;
private String clusterAlias;
private boolean skip = false;
/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
@ -58,4 +59,20 @@ public final class SearchShardIterator extends PlainShardIterator {
public String getClusterAlias() {
return clusterAlias;
}
/**
* Reset the iterator and mark it as skippable
* @see #skip()
*/
void resetAndSkip() {
reset();
skip = true;
}
/**
* Returns <code>true</code> if the search execution should skip this shard since it cannot match any documents given the query.
*/
boolean skip() {
return skip;
}
}


@ -74,6 +74,7 @@ public class SearchTransportService extends AbstractComponent {
public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]";
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]";
private final TransportService transportService;
@ -102,6 +103,12 @@ public class SearchTransportService extends AbstractComponent {
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
}
public void sendCanMatch(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, final
ActionListener<CanMatchResponse> listener) {
transportService.sendChildRequest(connection, QUERY_CAN_MATCH_NAME, request, task,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, CanMatchResponse::new));
}
public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
@ -285,8 +292,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
searchService.freeAllScrollContexts();
@ -366,8 +372,48 @@ public class SearchTransportService extends AbstractComponent {
}
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
// this is super cheap and should not hit thread-pool rejections
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
false, true, new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, QuerySearchResult::new);
}
public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;
public CanMatchResponse() {
}
public CanMatchResponse(boolean canMatch) {
this.canMatch = canMatch;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
canMatch = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(canMatch);
}
public boolean canMatch() {
return canMatch;
}
}
/**
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
* against the local cluster.


@ -34,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;


@ -50,6 +50,7 @@ import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -313,8 +314,16 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
searchRequest.setMaxConcurrentShardRequests(Math.min(256, nodeCount
* IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getDefault(Settings.EMPTY)));
}
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
}
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
}
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
@ -341,25 +350,40 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
ActionListener<SearchResponse> listener, boolean preFilter) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
case QUERY_AND_FETCH:
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
action.start();
}
};
});
} else {
AbstractSearchAsyncAction searchAsyncAction;
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
case QUERY_AND_FETCH:
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
return searchAsyncAction;
}
return searchAsyncAction;
}
private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {


@ -52,6 +52,7 @@ public class RestActions {
public static final ParseField _SHARDS_FIELD = new ParseField("_shards");
public static final ParseField TOTAL_FIELD = new ParseField("total");
public static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
public static final ParseField SKIPPED_FIELD = new ParseField("skipped");
public static final ParseField FAILED_FIELD = new ParseField("failed");
public static final ParseField FAILURES_FIELD = new ParseField("failures");
@ -73,16 +74,19 @@ public class RestActions {
public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params, BroadcastResponse response) throws IOException {
buildBroadcastShardsHeader(builder, params,
response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(),
response.getTotalShards(), response.getSuccessfulShards(), -1, response.getFailedShards(),
response.getShardFailures());
}
public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params,
int total, int successful, int failed,
int total, int successful, int skipped, int failed,
ShardOperationFailedException[] shardFailures) throws IOException {
builder.startObject(_SHARDS_FIELD.getPreferredName());
builder.field(TOTAL_FIELD.getPreferredName(), total);
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
if (skipped >= 0) {
builder.field(SKIPPED_FIELD.getPreferredName(), skipped);
}
builder.field(FAILED_FIELD.getPreferredName(), failed);
if (shardFailures != null && shardFailures.length > 0) {
builder.startArray(FAILURES_FIELD.getPreferredName());


@ -98,7 +98,7 @@ public class RestCountAction extends BaseRestHandler {
}
builder.field("count", response.getHits().getTotalHits());
buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(),
response.getFailedShards(), response.getShardFailures());
0, response.getFailedShards(), response.getShardFailures());
builder.endObject();
return new BytesRestResponse(response.status(), builder);


@ -39,6 +39,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
@ -88,6 +89,9 @@ public class RestMultiSearchAction extends BaseRestHandler {
multiRequest.maxConcurrentSearchRequests(restRequest.paramAsInt("max_concurrent_searches", 0));
}
int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE);
parseMultiLineRequest(restRequest, multiRequest.indicesOptions(), allowExplicitIndex, (searchRequest, parser) -> {
try {
searchRequest.source(SearchSourceBuilder.fromXContent(parser));
@ -96,7 +100,12 @@ public class RestMultiSearchAction extends BaseRestHandler {
throw new ElasticsearchParseException("Exception when parsing search request", e);
}
});
List<SearchRequest> requests = multiRequest.requests();
preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1));
for (SearchRequest request : requests) {
// preserve if it's set on the request
request.setPreFilterShardSize(Math.min(preFilterShardSize, request.getPreFilterShardSize()));
}
return multiRequest;
}
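To make the msearch budgeting above concrete, a small hedged sketch of the arithmetic (the numbers are made up): with the default threshold of 128 and 7 requests in the msearch body, each sub-request gets max(1, 128 / (7 + 1)) = 16, and the Math.min above ensures this can only lower, never raise, a threshold already set on an individual request:

int preFilterShardSize = 128; // pre_filter_shard_size from the msearch request
int numRequests = 7;          // number of search requests in the msearch body
int perRequest = Math.max(1, preFilterShardSize / (numRequests + 1));
assert perRequest == 16;      // each sub-request pre-filters beyond 16 shards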


@ -98,6 +98,7 @@ public class RestSearchAction extends BaseRestHandler {
final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);
searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));
if (request.hasParam("max_concurrent_shard_requests")) {
// only set if we have the parameter since we auto adjust the max concurrency on the coordinator


@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -42,6 +43,9 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.InnerHitContextBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@ -832,4 +836,35 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public AliasFilter buildAliasFilter(ClusterState state, String index, String... expressions) {
return indicesService.buildAliasFilter(state, index, expressions);
}
/**
* This method does a very quick rewrite of the query and returns true if the query can potentially match any documents.
* This method can have false positives, but if it returns <code>false</code> the query won't match any documents on the current
* shard.
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, null)) {
SearchSourceBuilder source = context.request().source();
if (canRewriteToMatchNone(source)) {
QueryBuilder queryBuilder = source.query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
}
return true; // null query means match_all
}
}
public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
if (source == null || source.query() == null || source.query() instanceof MatchAllQueryBuilder) {
return false;
} else {
AggregatorFactories.Builder aggregations = source.aggregations();
if (aggregations != null) {
if (aggregations.mustVisiteAllDocs()) {
return false;
}
}
}
return true;
}
}
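A hedged sketch of how canRewriteToMatchNone classifies search sources, derived from the logic above (the field names are placeholders; the builder calls are the standard SearchSourceBuilder/QueryBuilders/AggregationBuilders API):

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

SearchSourceBuilder ranged = new SearchSourceBuilder()
        .query(QueryBuilders.rangeQuery("@timestamp").gte("now-3d"));
// a concrete query without aggregations may rewrite to match_none on a shard
assert SearchService.canRewriteToMatchNone(ranged);

SearchSourceBuilder matchAll = new SearchSourceBuilder()
        .query(QueryBuilders.matchAllQuery());
// match_all can never be rewritten away, so pre-filtering would be wasted work
assert SearchService.canRewriteToMatchNone(matchAll) == false;

SearchSourceBuilder emptyBuckets = new SearchSourceBuilder()
        .query(QueryBuilders.rangeQuery("@timestamp").gte("now-3d"))
        .aggregation(AggregationBuilders.terms("hosts").field("host.name").minDocCount(0));
// a terms agg with min_doc_count == 0 must visit every shard to emit empty buckets
assert SearchService.canRewriteToMatchNone(emptyBuckets) == false;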


@ -26,6 +26,10 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement;
@ -282,10 +286,22 @@ public class AggregatorFactories {
}
}
public Builder addAggregators(AggregatorFactories factories) {
throw new UnsupportedOperationException("This needs to be removed");
}
public boolean mustVisiteAllDocs() {
for (AggregationBuilder builder : aggregationBuilders) {
if (builder instanceof GlobalAggregationBuilder) {
return true;
} else if (builder instanceof TermsAggregationBuilder) {
if (((TermsAggregationBuilder) builder).minDocCount() == 0) {
return true;
}
}
}
return false;
}
public Builder addAggregator(AggregationBuilder factory) {
if (!names.add(factory.name)) {
throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]");


@ -379,6 +379,9 @@ public class TasksIT extends ESIntegTestCase {
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("id[*], size[1], lastEmittedDoc[null]",
taskInfo.getDescription()));
break;
case SearchTransportService.QUERY_CAN_MATCH_NAME:
assertTrue(taskInfo.getDescription(), Regex.simpleMatch("shardId[[test][*]]", taskInfo.getDescription()));
break;
default:
fail("Unexpected action [" + taskInfo.getAction() + "] with description [" + taskInfo.getDescription() + "]");
}


@ -64,7 +64,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList(
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
new InitialSearchPhase.SearchPhaseResults<>(10)) {
new InitialSearchPhase.ArraySearchPhaseResults<>(10)) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return null;


@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
public void testFilterShards() throws InterruptedException {
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
System::nanoTime);
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
final boolean shard1 = randomBoolean();
final boolean shard2 = randomBoolean();
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
ActionListener<CanMatchResponse> listener) {
new Thread(() -> listener.onResponse(new CanMatchResponse(request.shardId().id() == 0 ? shard1 :
shard2))).start();
}
};
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
2, randomBoolean(), primaryNode, replicaNode);
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
new SearchRequest(), null, shardsIter, timeProvider, 0, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
result.set(iter);
latch.countDown();
}});
canMatchPhase.start();
latch.await();
if (shard1 && shard2) {
for (SearchShardIterator i : result.get()) {
assertFalse(i.skip());
}
} else if (shard1 == false && shard2 == false) {
assertFalse(result.get().get(0).skip());
assertTrue(result.get().get(1).skip());
} else {
assertEquals(0, result.get().get(0).shardId().id());
assertEquals(1, result.get().get(1).shardId().id());
assertEquals(shard1, !result.get().get(0).skip());
assertEquals(shard2, !result.get().get(1).skip());
}
}
public void testFilterWithFailure() throws InterruptedException {
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(),
System::nanoTime);
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
final boolean shard1 = randomBoolean();
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRequest request, SearchTask task,
ActionListener<CanMatchResponse> listener) {
new Thread(() -> {
if (request.shardId().id() == 0) {
listener.onResponse(new CanMatchResponse(shard1));
} else {
listener.onFailure(new NullPointerException());
}
}).start();
}
};
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
2, randomBoolean(), primaryNode, replicaNode);
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
searchTransportService,
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
new SearchRequest(), null, shardsIter, timeProvider, 0, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
result.set(iter);
latch.countDown();
}});
canMatchPhase.start();
latch.await();
assertEquals(0, result.get().get(0).shardId().id());
assertEquals(1, result.get().get(1).shardId().id());
assertEquals(shard1, !result.get().get(0).skip());
assertFalse(result.get().get(1).skip()); // never skip the failure
}
}
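Read together, the two tests above pin down the skip semantics the pre-filter phase must honor: a shard is only skipped when it reports that it cannot match, a failed can-match request is treated as a potential match, and when no shard can match, exactly one shard is still executed so the search returns a well-formed empty response. A minimal sketch of that decision, inferred from the assertions rather than copied from the committed CanMatchPreFilterSearchPhase (which may structure this differently):

    // Sketch only: canMatch[i] is the per-shard answer gathered via sendCanMatch,
    // with a transport failure recorded as "true" so the shard is never skipped on error.
    static void applySkips(GroupShardsIterator<SearchShardIterator> shards, boolean[] canMatch) {
        int possibleMatches = 0;
        for (boolean match : canMatch) {
            if (match) {
                possibleMatches++;
            }
        }
        int i = 0;
        for (SearchShardIterator iter : shards) {
            // if nothing can match, keep the first shard alive to produce the empty result
            boolean keep = possibleMatches == 0 ? i == 0 : canMatch[i];
            if (keep == false) {
                iter.resetAndSkip();
            }
            i++;
        }
    }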

View File

@ -46,7 +46,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testShortcutQueryAndFetchOptimization() throws IOException {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
boolean hasHits = randomBoolean();
@ -86,7 +86,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testFetchTwoDocument() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
@ -140,7 +140,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testFailFetchOneDoc() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
@ -199,7 +199,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
int numHits = randomIntBetween(2, 100); // also numShards --> 1 hit per shard
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
for (int i = 0; i < numHits; i++) {
@ -254,7 +254,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testExceptionFailsPhase() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = randomIntBetween(2, 10);
@ -307,7 +307,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2);
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> results =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2);
AtomicReference<SearchResponse> responseRef = new AtomicReference<>();
int resultSetSize = 1;

View File

@ -83,7 +83,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
@Override
public SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0,
return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0, 0,
failures.toArray(new ShardSearchFailure[0]));
}

View File

@ -55,6 +55,99 @@ import java.util.concurrent.atomic.AtomicReference;
public class SearchAsyncActionTests extends ESTestCase {
public void testSkipSearchShards() throws InterruptedException {
SearchRequest request = new SearchRequest();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
response.set((TestSearchResponse) searchResponse);
}
@Override
public void onFailure(Exception e) {
logger.warn("test failed", e);
fail(e.getMessage());
}
};
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
10, randomBoolean(), primaryNode, replicaNode);
int numSkipped = 0;
for (SearchShardIterator iter : shardsIter) {
if (iter.shardId().id() % 2 == 0) {
iter.resetAndSkip();
numSkipped++;
}
}
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null);
Map<String, Transport.Connection> lookup = new HashMap<>();
Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
AtomicInteger numRequests = new AtomicInteger(0);
AbstractSearchAsyncAction asyncAction =
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
"test",
logger,
transportService,
(cluster, node) -> {
assert cluster == null : "cluster was not null: " + cluster;
return lookup.get(node); },
aliasFilters,
Collections.emptyMap(),
null,
request,
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
null,
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<TestSearchPhaseResult> listener) {
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
numRequests.incrementAndGet(); // only count this once per replica
return Boolean.TRUE;
});
new Thread(() -> {
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
listener.onResponse(testSearchPhaseResult);
}).start();
}
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase("test") {
@Override
public void run() throws IOException {
latch.countDown();
}
};
}
};
asyncAction.start();
latch.await();
SearchResponse searchResponse = asyncAction.buildSearchResponse(null, null);
assertEquals(shardsIter.size() - numSkipped, numRequests.get());
assertEquals(0, searchResponse.getFailedShards());
assertEquals(numSkipped, searchResponse.getSkippedShards());
assertEquals(shardsIter.size(), searchResponse.getSuccessfulShards());
}
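The assertions above fix the new shard accounting: a skipped shard never reaches executePhaseOnShard, yet it still counts as successful, so total equals successful whenever nothing fails. A hedged sketch of that bookkeeping (field and method names are illustrative, not the committed ones):

    private final AtomicInteger successfulOps = new AtomicInteger();
    private final AtomicInteger skippedOps = new AtomicInteger();

    // Illustrative only: invoked instead of executePhaseOnShard for a skipped
    // iterator, so successful == executed + skipped while failed stays 0.
    void onShardSkipped() {
        skippedOps.incrementAndGet();
        successfulOps.incrementAndGet();
    }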
public void testLimitConcurrentShardRequests() throws InterruptedException {
SearchRequest request = new SearchRequest();
int numConcurrent = randomIntBetween(1, 5);
@ -106,7 +199,7 @@ public class SearchAsyncActionTests extends ESTestCase {
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
null,
new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
@ -207,7 +300,7 @@ public class SearchAsyncActionTests extends ESTestCase {
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
null,
new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
TestSearchResponse response = new TestSearchResponse();
@Override
@ -232,7 +325,7 @@ public class SearchAsyncActionTests extends ESTestCase {
@Override
public void run() throws IOException {
for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.results.get(i);
TestSearchPhaseResult result = results.getAtomicArray().get(i);
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE);
}
@ -255,7 +348,7 @@ public class SearchAsyncActionTests extends ESTestCase {
}
}
private static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
ArrayList<SearchShardIterator> list = new ArrayList<>();
for (int i = 0; i < numShards; i++) {

View File

@ -288,7 +288,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0, null));
result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
@ -335,7 +335,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
Thread[] threads = new Thread[expectedNumResults];
@ -374,7 +374,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
for (int i = 0; i < expectedNumResults; i++) {
@ -407,7 +407,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
}
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
AtomicInteger max = new AtomicInteger();
for (int i = 0; i < expectedNumResults; i++) {
@ -450,7 +450,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
}
request.setBatchedReduceSize(bufferSize);
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer
= searchPhaseController.newSearchPhaseResults(request, expectedNumResults);
if ((hasAggs || hasTopDocs) && expectedNumResults > bufferSize) {
assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize,
@ -466,7 +466,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
SearchRequest request = new SearchRequest();
request.source(new SearchSourceBuilder().size(5).from(5));
request.setBatchedReduceSize(randomIntBetween(2, 4));
InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> consumer =
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(request, 4);
int score = 100;
for (int i = 0; i < 4; i++) {

View File

@ -98,8 +98,9 @@ public class SearchResponseTests extends ESTestCase {
Boolean terminatedEarly = randomBoolean() ? null : randomBoolean();
int numReducePhases = randomIntBetween(1, 10);
long tookInMillis = randomNonNegativeLong();
int successfulShards = randomInt();
int totalShards = randomInt();
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = randomIntBetween(0, totalShards);
InternalSearchResponse internalSearchResponse;
if (minimal == false) {
SearchHits hits = SearchHitsTests.createTestItem();
@ -111,7 +112,8 @@ public class SearchResponseTests extends ESTestCase {
} else {
internalSearchResponse = InternalSearchResponse.empty();
}
return new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, tookInMillis, shardSearchFailures);
return new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis,
shardSearchFailures);
}
/**
@ -192,7 +194,7 @@ public class SearchResponseTests extends ESTestCase {
hit.score(2.0f);
SearchHit[] hits = new SearchHit[] { hit };
SearchResponse response = new SearchResponse(
new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0,
new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0,
new ShardSearchFailure[0]);
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
@ -203,6 +205,7 @@ public class SearchResponseTests extends ESTestCase {
{
expectedString.append("{\"total\":0,");
expectedString.append("\"successful\":0,");
expectedString.append("\"skipped\":0,");
expectedString.append("\"failed\":0},");
}
expectedString.append("\"hits\":");
@ -215,4 +218,4 @@ public class SearchResponseTests extends ESTestCase {
expectedString.append("}");
assertEquals(expectedString.toString(), Strings.toString(response));
}
}
}

View File

@ -39,7 +39,7 @@ public class BroadcastActionsIT extends ESIntegTestCase {
}
public void testBroadcastOperations() throws IOException {
assertAcked(prepareCreate("test", 1).execute().actionGet(5000));
assertAcked(prepareCreate("test", 1));
NumShards numShards = getNumShards("test");

View File

@ -111,7 +111,8 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
equalTo(0L));
final SearchResponse r1 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).get();
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE)
.get();
assertSearchResponse(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
@ -120,7 +121,8 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
equalTo(5L));
final SearchResponse r2 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")).get();
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.setPreFilterShardSize(Integer.MAX_VALUE).get();
assertSearchResponse(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
@ -129,7 +131,8 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
equalTo(7L));
final SearchResponse r3 = client().prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).get();
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE)
.get();
assertSearchResponse(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(7L));
assertThat(client().admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),

View File

@ -25,6 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
@ -37,13 +38,19 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.AliasFilter;
@ -305,4 +312,47 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
return null;
}
}
public void testCanMatch() throws IOException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, null,
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchAllQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(0)), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new GlobalAggregationBuilder("test")), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f)));
}
public void testCanRewriteToMatchNone() {
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new GlobalAggregationBuilder("test"))));
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder()));
assertFalse(SearchService.canRewriteToMatchNone(null));
assertFalse(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(0))));
assertTrue(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar"))));
assertTrue(SearchService.canRewriteToMatchNone(new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
.aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(1))));
}
}
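These assertions describe the rewrite contract behind the pre-filter phase. A plausible shape for the two checks, inferred from the test cases above rather than copied from SearchService (mustVisitAllDocs() is assumed here as the name of the check that keeps global and min_doc_count == 0 aggregations from being skipped):

    public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
        if (source == null || source.query() == null || source.query() instanceof MatchAllQueryBuilder) {
            return false; // no query means match_all, which always matches
        }
        AggregatorFactories.Builder aggregations = source.aggregations();
        // aggregations that must see every shard (global, terms with
        // min_doc_count == 0) forbid rewriting the request away
        return aggregations == null || aggregations.mustVisitAllDocs() == false;
    }

    public boolean canMatch(ShardSearchRequest request) {
        SearchSourceBuilder source = request.source();
        if (canRewriteToMatchNone(source)) {
            // assumes the query was already rewritten against the shard's context
            return source.query() instanceof MatchNoneQueryBuilder == false;
        }
        return true; // conservative: false positives are allowed, false negatives are not
    }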

View File

@ -139,6 +139,7 @@ Possible response:
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -237,6 +237,7 @@ The output from the above is:
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -296,6 +296,7 @@ GET my_index/_search
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -707,6 +707,7 @@ And the response (partially shown):
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
@ -774,6 +775,7 @@ to clutter the docs with it:
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
@ -1104,6 +1106,7 @@ And the response (partially shown):
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits" : {

View File

@ -84,6 +84,7 @@ GET index/_search
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {
@ -141,6 +142,7 @@ GET index/_search
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {
@ -197,6 +199,7 @@ GET index/_search
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -75,6 +75,7 @@ both index and query time.
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {
@ -135,6 +136,7 @@ returns
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -77,6 +77,7 @@ The result produced by the above query.
"_shards" : {
"total": 2,
"successful": 2,
"skipped" : 0,
"failed": 0
},
"hits" : {

View File

@ -85,6 +85,7 @@ The above request will yield the following response:
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {
@ -283,6 +284,7 @@ This will yield the following response.
"_shards": {
"total": 5,
"successful": 5,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -39,6 +39,7 @@ tweets from the twitter index for a certain user. The result is:
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
}
}

View File

@ -41,6 +41,7 @@ This will yield the following result:
"_shards": {
"total": 1,
"successful": 1,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -27,6 +27,7 @@ And here is a sample response:
"_shards":{
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits":{
@ -142,6 +143,7 @@ be set to `true` in the response.
"_shards": {
"total": 1,
"successful": 1,
"skipped" : 0,
"failed": 0
},
"hits": {

View File

@ -175,6 +175,7 @@ returns this response:
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits": ...
@ -243,6 +244,7 @@ Which should look like:
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits": {

View File

@ -23,6 +23,7 @@ And here is a sample response:
"_shards":{
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits":{

View File

@ -452,7 +452,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, 0, randomLong(), null);
if (randomBoolean()) {
client.lastScroll.get().listener.onResponse(searchResponse);

View File

@ -0,0 +1,55 @@
---
"Test that remote indices are subject to shard skipping":
- do:
indices.create:
index: skip_shards_index
body:
settings:
index:
number_of_shards: 1
number_of_replicas: 0
mappings:
test_type:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "skip_shards_index", "_type": "test_type"}}'
- '{"f1": "local_cluster", "sort_field": 0, "created_at" : "2017-01-01"}'
# check that we skip the remote shard
- do:
search:
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "skip_shards_index"}
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
- match: { _shards.failed: 0 }
- match: { hits.total: 1 }
# check that we skip the local shard
- do:
search:
index: "skip_shards_index,my_remote_cluster:single_doc_index"
pre_filter_shard_size: 1
body: { "size" : 10, "query" : { "range" : { "created_at" : { "gte" : "2015-02-01", "lt": "2016-02-01"} } } }
- match: { hits.total: 1 }
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
- match: { _shards.total: 2 }
- match: { _shards.successful: 2 }
- match: { _shards.skipped : 1}
- match: { _shards.failed: 0 }
- match: { hits.total: 1 }

View File

@ -9,13 +9,19 @@
index:
number_of_shards: 1
number_of_replicas: 0
mappings:
test_type:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "single_doc_index", "_type": "test_type"}}'
- '{"f1": "remote_cluster", "sort_field": 1}'
- '{"f1": "remote_cluster", "sort_field": 1, "created_at" : "2016-01-01"}'
- do:
indices.create:

View File

@ -28,6 +28,11 @@
"typed_keys": {
"type" : "boolean",
"description" : "Specify whether aggregation and suggester names should be prefixed by their respective types in the response"
},
"pre_filter_shard_size" : {
"type" : "number",
"description" : "A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint.",
"default" : 128
}
}
},
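For reference, the same threshold is reachable from the Java API; a hedged usage sketch (index and field names are illustrative, and Integer.MAX_VALUE effectively disables the phase, as the IndicesRequestCacheIT changes above use to keep cache statistics deterministic):

    // Force the can-match round-trip even for a small search by lowering the
    // threshold to 1; shards whose rewritten query cannot match are skipped.
    SearchResponse response = client.prepareSearch("logs-*")
        .setPreFilterShardSize(1)
        .setQuery(QueryBuilders.rangeQuery("created_at").gte("2016-02-01").lt("2018-02-01"))
        .get();
    int skippedShards = response.getSkippedShards(); // shards eliminated up front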

View File

@ -168,6 +168,11 @@
"type" : "number",
"description" : "The number of concurrent shard requests this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests",
"default" : "The default grows with the number of nodes in the cluster but is at most 256."
},
"pre_filter_shard_size" : {
"type" : "number",
"description" : "A threshold that enforces a pre-filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on it's rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint.",
"default" : 128
}
}
},

View File

@ -0,0 +1,163 @@
setup:
- do:
indices.create:
index: index_1
body:
settings:
number_of_shards: 1
mappings:
doc:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
indices.create:
index: index_2
body:
settings:
number_of_shards: 1
mappings:
doc:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
- do:
indices.create:
index: index_3
body:
settings:
number_of_shards: 1
mappings:
doc:
properties:
created_at:
type: date
format: "yyyy-MM-dd"
---
"pre_filter_shard_size with invalid parameter":
- skip:
version: " - 5.99.99"
reason: this was added in 6.0.0
- do:
catch: /preFilterShardSize must be >= 1/
search:
index: test_1
pre_filter_shard_size: 0
---
"pre_filter_shard_size with shards that have no hit":
- skip:
version: " - 5.99.99"
reason: this was added in 6.0.0
- do:
index:
index: index_1
type: doc
id: 1
body: { "created_at": "2016-01-01"}
- do:
index:
index: index_2
type: doc
id: 2
body: { "created_at": "2017-01-01" }
- do:
index:
index: index_3
type: doc
id: 3
body: { "created_at": "2018-01-01" }
- do:
indices.refresh: {}
- do:
search:
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
# this is the case where we have an empty body and don't skip anything since it's match_all
- do:
search:
pre_filter_shard_size: 1
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped: 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 3 }
# this is a case where we can actually skip due to rewrite
- do:
search:
pre_filter_shard_size: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 1}
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
# in this case we skip all but one shard since we need a real result
- do:
search:
pre_filter_shard_size: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01", "lt": "2020-02-01"} } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 2} # skip 2 and execute one to fetch the actual empty result
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- do:
search:
pre_filter_shard_size: 1
body: {"size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"} } }, "aggs" : { "some_agg" : { "global" : {} }}}
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- match: { aggregations.some_agg.doc_count: 3 }
- do:
search:
pre_filter_shard_size: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index", "min_doc_count" : 0 } } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 3 }
- do:
search:
pre_filter_shard_size: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 1 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

View File

@ -38,6 +38,7 @@ public class RandomizingClient extends FilterClient {
private final String defaultPreference;
private final int batchedReduceSize;
private final int maxConcurrentShardRequests;
private final int preFilterShardSize;
public RandomizingClient(Client client, Random random) {
@ -61,6 +62,11 @@ public class RandomizingClient extends FilterClient {
} else {
this.maxConcurrentShardRequests = -1; // randomly use the default
}
if (random.nextBoolean()) {
preFilterShardSize = 1 + random.nextInt(1 << random.nextInt(7));
} else {
preFilterShardSize = -1;
}
}
@Override
@ -70,6 +76,9 @@ public class RandomizingClient extends FilterClient {
if (maxConcurrentShardRequests != -1) {
searchRequestBuilder.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
if (preFilterShardSize != -1) {
searchRequestBuilder.setPreFilterShardSize(preFilterShardSize);
}
return searchRequestBuilder;
}