Always rewrite search shard request outside of the search thread pool (#51708) (#51979)

This change ensures that the rewrite of the shard request is executed in the network thread or in the refresh listener when waiting for an active shard. This allows queries that rewrite to match_no_docs to bypass the search thread pool entirely even if the can_match phase was skipped (pre_filter_shard_size > number of shards). Coordinating nodes don't have the ability to create empty responses so this change also ensures that at least one shard creates a full empty response while the other can return null ones. This is needed since creating true empty responses on shards require to create concrete aggregators which would be too costly to build on a network thread. We should move this functionality to aggregation builders in a follow up but that would be a much bigger change.
This change is also important for #49601 since we want to add the ability to use the result of other shards to rewrite the request of subsequent ones. For instance if the first M shards have their top N computed, the top worst document in the global queue can be pass to subsequent shards that can then rewrite to match_no_docs if they can guarantee that they don't have any document better than the provided one.
This commit is contained in:
Jim Ferenczi 2020-02-06 10:53:11 +01:00 committed by GitHub
parent fb710cc62b
commit 0f333c89b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 624 additions and 170 deletions

View File

@ -153,3 +153,32 @@ setup:
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }
# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- length: { aggregations.idx_terms.buckets: 0 }

View File

@ -51,6 +51,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@ -82,6 +83,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, Set<String>> indexRoutings;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final SearchTimeProvider timeProvider;
@ -467,6 +469,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
results.consumeResult(result);
hasShardResponse.set(true);
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
}
@ -602,8 +605,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get());
return shardRequest;
}
/**

View File

@ -39,6 +39,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
@ -65,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
public final class SearchPhaseController {
@ -427,6 +429,15 @@ public final class SearchPhaseController {
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
int total = queryResults.size();
queryResults = queryResults.stream()
.filter(res -> res.queryResult().isNull() == false)
.collect(Collectors.toList());
String errorMsg = "must have at least one non-empty search result, got 0 out of " + total;
assert queryResults.isEmpty() == false : errorMsg;
if (queryResults.isEmpty()) {
throw new IllegalStateException(errorMsg);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
@ -497,6 +508,18 @@ public final class SearchPhaseController {
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}
/*
* Returns the size of the requested top documents (from + size)
*/
static int getTopDocsSize(SearchRequest request) {
if (request.source() == null) {
return SearchService.DEFAULT_SIZE;
}
SearchSourceBuilder source = request.source();
return (source.size() == -1 ? SearchService.DEFAULT_SIZE : source.size()) +
(source.from() == -1 ? SearchService.DEFAULT_FROM : source.from());
}
public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
@ -576,6 +599,7 @@ public final class SearchPhaseController {
private final SearchProgressListener progressListener;
private int numReducePhases = 0;
private final TopDocsStats topDocsStats;
private final int topNSize;
private final boolean performFinalReduce;
/**
@ -589,7 +613,7 @@ public final class SearchPhaseController {
*/
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
int trackTotalHitsUpTo, boolean performFinalReduce) {
int trackTotalHitsUpTo, int topNSize, boolean performFinalReduce) {
super(expectedResultSize);
if (expectedResultSize != 1 && bufferSize < 2) {
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@ -610,6 +634,7 @@ public final class SearchPhaseController {
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;
this.topDocsStats = new TopDocsStats(trackTotalHitsUpTo);
this.topNSize = topNSize;
this.performFinalReduce = performFinalReduce;
}
@ -622,36 +647,38 @@ public final class SearchPhaseController {
}
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (querySearchResult.isNull() == false) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
topNSize, 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
@ -706,9 +733,10 @@ public final class SearchPhaseController {
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
if (request.getBatchedReduceSize() < numShards) {
int topNSize = getTopDocsSize(request);
// only use this if there are aggs and if there are more shards than we should reduce at once
return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
trackTotalHitsUpTo, request.isFinalReduce());
trackTotalHitsUpTo, topNSize, request.isFinalReduce());
}
}
return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@ -731,7 +759,7 @@ public final class SearchPhaseController {
static final class TopDocsStats {
final int trackTotalHitsUpTo;
private long totalHits;
long totalHits;
private TotalHits.Relation totalHitsRelation;
long fetchHits;
private float maxScore = Float.NEGATIVE_INFINITY;

View File

@ -1254,6 +1254,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
markSearcherAccessed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
return wrapSearcher(searcher);
}
/**
* Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}.
*/
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
!= null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
boolean success = false;

View File

@ -308,11 +308,31 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task)));
rewriteShardRequest(request, ActionListener.wrap(
// fork the execution in the search thread pool and wraps the searcher
// to execute the query
context -> {
try {
context.wrapSearcher().execute(() -> {
final SearchPhaseResult result;
try {
result = executeDfsPhase(context, task);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
listener.onResponse(result);
});
} catch (Exception exc) {
// if the execution is rejected we need to close the searcher
IOUtils.closeWhileHandlingException(context.searcher);
listener.onFailure(exc);
}
}, listener::onFailure));
}
private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException {
final SearchContext context = createAndPutContext(rewriteContext);
context.incRef();
try {
context.setTask(task);
@ -343,15 +363,59 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task)));
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
: "empty responses require more than one shard";
rewriteShardRequest(request, ActionListener.wrap(
context -> {
try {
ShardSearchRequest rewritten = context.request;
if (rewritten.canReturnNullResponseIfMatchNoDocs()
&& canRewriteToMatchNone(rewritten.source())
&& rewritten.source().query() instanceof MatchNoneQueryBuilder) {
onMatchNoDocs(context, listener);
} else {
// fork the execution in the search thread pool and wraps the searcher
// to execute the query
context.wrapSearcher().execute(() -> {
final SearchPhaseResult result;
try {
result = executeQueryPhase(context, task);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
listener.onResponse(result);
});
}
} catch (Exception exc) {
// if the execution is rejected we need to close the searcher
IOUtils.closeWhileHandlingException(context.searcher);
listener.onFailure(exc);
}
}, listener::onFailure));
}
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) {
// creates a lightweight search context that we use to inform context listeners
// before closing
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
try (SearchContext dummy = searchContext) {
onNewContext(searchContext);
onFreeContext(searchContext);
} catch (Exception exc) {
listener.onFailure(exc);
return;
}
listener.onResponse(QuerySearchResult.nullInstance());
}
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
getExecutor(id).execute(ActionRunnable.supply(listener, executable::get));
}
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
final SearchContext context = createAndPutContext(request);
private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception {
final SearchContext context = createAndPutContext(rewriteContext);
final ShardSearchRequest request = rewriteContext.request;
context.incRef();
try {
context.setTask(task);
@ -542,15 +606,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) {
throw new ElasticsearchException(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}
SearchContext context = createContext(request);
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) {
SearchContext context = createContext(rewriteContext);
onNewContext(context);
boolean success = false;
try {
@ -584,9 +641,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
final SearchContext createContext(ShardSearchRequest request) throws IOException {
final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout);
final SearchContext createContext(SearchRewriteContext rewriteContext) {
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
try {
if (rewriteContext.request != null && openScrollContexts.get() >= maxOpenScrollContext) {
throw new ElasticsearchException(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}
final ShardSearchRequest request = rewriteContext.request;
if (request.scroll() != null) {
context.scrollContext(new ScrollContext());
context.scrollContext().scroll = request.scroll();
@ -622,42 +686,33 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
return createSearchContext(request, timeout, true, "search");
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard);
// make sure that we wrap the searcher when executing the query
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
}
private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout,
boolean assertAsyncActions, String source)
throws IOException {
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) {
final ShardSearchRequest request = rewriteContext.request;
final Engine.Searcher searcher = rewriteContext.searcher;
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
Engine.Searcher searcher = indexShard.acquireSearcher(source);
boolean success = false;
DefaultSearchContext searchContext = null;
try {
searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
fetchPhase, clusterService.state().nodes().getMinNodeVersion());
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
// during rewrite and normalized / evaluate templates etc.
QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext());
Rewriteable.rewrite(request.getRewriteable(), context, assertAsyncActions);
assert searchContext.getQueryShardContext().isCacheable();
success = true;
return searchContext;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(searchContext);
if (searchContext == null) {
// we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
// leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
IOUtils.closeWhileHandlingException(searcher);
}
// we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
// leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
IOUtils.closeWhileHandlingException(rewriteContext.searcher);
}
}
return searchContext;
}
private void freeAllContextForIndex(Index index) {
@ -1063,24 +1118,50 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}
/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
* and then rewrites with a searcher when the shard is active.
* The provided action listener is executed on the same thread or in a listener threadpool.
*/
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<SearchRewriteContext> listener) {
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
Executor executor = getExecutor(shard);
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
// now we need to check if there is a pending refresh and register
shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))),
listener::onFailure);
shard.awaitShardSearchActive(b -> {
try {
// we can now acquire a searcher and rewrite the request with it
SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard);
listener.onResponse(rewriteContext);
} catch (Exception e) {
listener.onFailure(e);
}
}), listener::onFailure);
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
// adding a lot of overhead
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
}
SearchRewriteContext acquireSearcherAndRewrite(ShardSearchRequest request, IndexShard shard) throws IOException {
// acquire the searcher for rewrite with no wrapping in order to avoid costly
// operations. We'll wrap the searcher at a later stage (when executing the query).
Engine.Searcher searcher = shard.acquireSearcherNoWrap("search");
boolean success = false;
try {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, true);
SearchRewriteContext rewrite = new SearchRewriteContext(request, shard, searcher, getExecutor(shard));
success = true;
return rewrite;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(searcher);
}
}
}
/**
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
*/
@ -1097,6 +1178,37 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
finalReduce ? multiBucketConsumerService.create() : bucketCount -> {}, finalReduce);
}
static class SearchRewriteContext {
private final ShardSearchRequest request;
private final IndexShard shard;
private Engine.Searcher searcher;
private final Executor executor;
private boolean isWrapped;
private SearchRewriteContext(ShardSearchRequest request,
IndexShard shard,
Engine.Searcher searcher,
Executor executor) {
this.request = request;
this.shard = shard;
this.searcher = searcher;
this.executor = executor;
}
SearchRewriteContext wrapSearcher() {
assert isWrapped == false : "searcher already wrapped";
isWrapped = true;
searcher = shard.wrapSearcher(searcher);
return this;
}
void execute(Runnable runnable) {
assert isWrapped : "searcher is not wrapped";
executor.execute(runnable);
}
}
public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final MinAndMax<?> minAndMax;

View File

@ -47,6 +47,7 @@ import org.elasticsearch.indices.AliasFilterParsingException;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
@ -75,7 +76,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
private final String preference;
private final OriginalIndices originalIndices;
//these are the only two mutable fields, as they are subject to rewriting
private boolean canReturnNullResponseIfMatchNoDocs;
//these are the only mutable fields, as they are subject to rewriting
private AliasFilter aliasFilter;
private SearchSourceBuilder source;
@ -175,6 +178,11 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
indexRoutings = Strings.EMPTY_ARRAY;
preference = null;
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
canReturnNullResponseIfMatchNoDocs = in.readBoolean();
} else {
canReturnNullResponseIfMatchNoDocs = false;
}
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@ -212,6 +220,9 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
out.writeOptionalString(preference);
}
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeBoolean(canReturnNullResponseIfMatchNoDocs);
}
}
@Override
@ -290,6 +301,19 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
return preference;
}
/**
* Returns true if the caller can handle null response {@link QuerySearchResult#nullInstance()}.
* Defaults to false since the coordinator node needs at least one shard response to build the global
* response.
*/
public boolean canReturnNullResponseIfMatchNoDocs() {
return canReturnNullResponseIfMatchNoDocs;
}
public void canReturnNullResponseIfMatchNoDocs(boolean value) {
this.canReturnNullResponseIfMatchNoDocs = value;
}
/**
* Returns the cache key for this shard search request, based on its content
*/

View File

@ -63,18 +63,56 @@ public final class QuerySearchResult extends SearchPhaseResult {
private long serviceTimeEWMA = -1;
private int nodeQueueSize = -1;
private final boolean isNull;
public QuerySearchResult() {
this(false);
}
public QuerySearchResult(StreamInput in) throws IOException {
super(in);
long id = in.readLong();
readFromWithId(id, in);
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
isNull = in.readBoolean();
} else {
isNull = false;
}
if (isNull == false) {
long id = in.readLong();
readFromWithId(id, in);
}
}
public QuerySearchResult(long id, SearchShardTarget shardTarget) {
this.requestId = id;
setSearchShardTarget(shardTarget);
isNull = false;
}
private QuerySearchResult(boolean isNull) {
this.isNull = isNull;
}
private static final QuerySearchResult nullInstance = new QuerySearchResult(true);
/**
* Returns an instance that contains no response.
*/
public static QuerySearchResult nullInstance() {
return nullInstance;
}
/**
* Returns true if the result doesn't contain any useful information.
* It is used by the search action to avoid creating an empty response on
* shard request that rewrites to match_no_docs.
*
* TODO: Currently we need the concrete aggregators to build empty responses. This means that we cannot
* build an empty response in the coordinating node so we rely on this hack to ensure that at least one shard
* returns a valid empty response. We should move the ability to create empty responses to aggregation builders
* in order to allow building empty responses directly from the coordinating node.
*/
public boolean isNull() {
return isNull;
}
@Override
@ -173,6 +211,10 @@ public final class QuerySearchResult extends SearchPhaseResult {
hasAggs = aggregations != null;
}
public InternalAggregations aggregations() {
return aggregations;
}
/**
* Returns and nulls out the profiled results for this search, or potentially null if result was empty.
* This allows to free up memory once the profiled result is consumed.
@ -305,8 +347,13 @@ public final class QuerySearchResult extends SearchPhaseResult {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(requestId);
writeToNoId(out);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeBoolean(isNull);
}
if (isNull == false) {
out.writeLong(requestId);
writeToNoId(out);
}
}
public void writeToNoId(StreamOutput out) throws IOException {

View File

@ -338,16 +338,37 @@ public class SearchPhaseControllerTests extends ESTestCase {
}
public void testConsumer() {
consumerTestCase(0);
}
public void testConsumerWithEmptyResponse() {
consumerTestCase(randomIntBetween(1, 5));
}
private void consumerTestCase(int numEmptyResponses) {
int numShards = 3 + numEmptyResponses;
int bufferSize = randomIntBetween(2, 3);
SearchRequest request = randomSearchRequest();
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
request.setBatchedReduceSize(bufferSize);
ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(NOOP, request, 3);
assertEquals(0, reductions.size());
ArraySearchPhaseResults<SearchPhaseResult> consumer =
searchPhaseController.newSearchPhaseResults(NOOP, request, 3+numEmptyResponses);
if (numEmptyResponses == 0) {
assertEquals(0, reductions.size());
}
if (numEmptyResponses > 0) {
QuerySearchResult empty = QuerySearchResult.nullInstance();
int shardId = 2 + numEmptyResponses;
empty.setShardIndex(2+numEmptyResponses);
empty.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE));
consumer.consumeResult(empty);
numEmptyResponses --;
}
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0),
null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
@ -356,7 +377,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
new DocValueFormat[0]);
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
@ -365,20 +386,38 @@ public class SearchPhaseControllerTests extends ESTestCase {
result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
new DocValueFormat[0]);
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
Collections.emptyList(), Collections.emptyMap())));
result.aggregations(aggs);
result.setShardIndex(1);
consumer.consumeResult(result);
while (numEmptyResponses > 0) {
result = QuerySearchResult.nullInstance();
int shardId = 2 + numEmptyResponses;
result.setShardIndex(shardId);
result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null, OriginalIndices.NONE));
consumer.consumeResult(result);
numEmptyResponses--;
}
final int numTotalReducePhases;
if (bufferSize == 2) {
if (numShards > bufferSize) {
assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class));
assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered());
assertEquals(1, reductions.size());
assertEquals(false, reductions.get(0));
numTotalReducePhases = 2;
if (bufferSize == 2) {
assertEquals(1, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases());
assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered());
assertEquals(1, reductions.size());
assertEquals(false, reductions.get(0));
numTotalReducePhases = 2;
} else {
assertEquals(0, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumReducePhases());
assertEquals(3, ((SearchPhaseController.QueryPhaseResultConsumer) consumer).getNumBuffered());
assertEquals(0, reductions.size());
numTotalReducePhases = 1;
}
} else {
assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)));
assertEquals(0, reductions.size());

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
@ -123,20 +124,26 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
assertCacheState(client, "index", 0, 0);
final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE).get();
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25"))
// to ensure that query is executed even if it rewrites to match_no_docs
.addAggregation(new GlobalAggregationBuilder("global"))
.setPreFilterShardSize(Integer.MAX_VALUE).get();
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits().value, equalTo(7L));
assertCacheState(client, "index", 0, 5);
final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.addAggregation(new GlobalAggregationBuilder("global"))
.setPreFilterShardSize(Integer.MAX_VALUE).get();
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits().value, equalTo(7L));
assertCacheState(client, "index", 3, 7);
final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27"))
.addAggregation(new GlobalAggregationBuilder("global"))
.setPreFilterShardSize(Integer.MAX_VALUE)
.get();
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits().value, equalTo(7L));

View File

@ -66,6 +66,7 @@ import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
@ -77,6 +78,7 @@ import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.Before;
@ -168,10 +170,12 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
indexModule.addSearchOperationListener(new SearchOperationListener() {
@Override
public void onNewContext(SearchContext context) {
if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) {
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]"));
} else {
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
if (context.query() != null) {
if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) {
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]"));
} else {
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
}
}
}
@ -357,15 +361,11 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
final SearchContext contextWithDefaultTimeout = service.createContext(
new ShardSearchRequest(
OriginalIndices.NONE,
searchRequest,
indexShard.shardId(),
1,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f, -1, null, null)
);
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
indexShard);
final SearchContext contextWithDefaultTimeout = service.createContext(rewriteContext);
try {
// the search context should inherit the default timeout
assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
@ -376,15 +376,11 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final long seconds = randomIntBetween(6, 10);
searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)));
final SearchContext context = service.createContext(
new ShardSearchRequest(
OriginalIndices.NONE,
searchRequest,
indexShard.shardId(),
1,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f, -1, null, null)
);
rewriteContext = service.acquireSearcherAndRewrite(
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
indexShard);
final SearchContext context = service.createContext(rewriteContext);
try {
// the search context should inherit the query timeout
assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
@ -412,19 +408,25 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) {
searchSourceBuilder.docValueField("field" + i);
}
try (SearchContext context = service.createContext(
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null))
) {
assertNotNull(context);
ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
{
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
try (SearchContext context = service.createContext(rewriteContext)) {
assertNotNull(context);
}
}
{
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
searchSourceBuilder.docValueField("one_field_too_much");
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)));
() -> service.createContext(rewriteContext));
assertEquals(
"Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. "
+ "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.",
ex.getMessage());
"Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. "
+ "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", ex.getMessage());
}
}
@ -447,20 +449,28 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
searchSourceBuilder.scriptField("field" + i,
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
}
try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest,
indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f, -1, null, null))) {
assertNotNull(context);
ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
{
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
try (SearchContext context = service.createContext(rewriteContext)) {
assertNotNull(context);
}
}
{
searchSourceBuilder.scriptField("anotherScriptField",
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null)));
() -> service.createContext(rewriteContext));
assertEquals(
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
+ (maxScriptFields + 1)
+ "]. This limit can be set by changing the [index.max_script_fields] index level setting.",
ex.getMessage());
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
+ (maxScriptFields + 1)
+ "]. This limit can be set by changing the [index.max_script_fields] index level setting.",
ex.getMessage());
}
}
@ -477,17 +487,19 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
searchSourceBuilder.scriptField("field" + 0,
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
searchSourceBuilder.size(0);
try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE,
searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f, -1, null, null))) {
assertEquals(0, context.scriptFields().fields().size());
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
indexShard);
try (SearchContext context = service.createContext(rewriteContext)) {
assertEquals(0, context.scriptFields().fields().size());
}
}
/**
* test that creating more than the allowed number of scroll contexts throws an exception
*/
public void testMaxOpenScrollContexts() throws RuntimeException {
public void testMaxOpenScrollContexts() throws RuntimeException, IOException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
@ -513,8 +525,10 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
client().prepareSearch("index").setSize(1).setScroll("1m").get();
}
SearchService.SearchRewriteContext rewriteContext =
service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard);
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId())));
() -> service.createAndPutContext(rewriteContext));
assertEquals(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
@ -592,7 +606,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
}
}
public void testCanMatch() throws IOException {
public void testCanMatch() throws IOException, InterruptedException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@ -625,11 +639,32 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch());
assertEquals(numWrapReader, numWrapInvocations.get());
// make sure that the wrapper is called when the context is actually created
service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest,
indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY),
1f, -1, null, null)).close();
assertEquals(numWrapReader+1, numWrapInvocations.get());
ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
CountDownLatch latch = new CountDownLatch(1);
SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
service.executeQueryPhase(request, task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
// make sure that the wrapper is called when the query is actually executed
assertEquals(numWrapReader+1, numWrapInvocations.get());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
throw new AssertionError(e);
} finally {
latch.countDown();
}
}
});
latch.await();
}
public void testCanRewriteToMatchNone() {
@ -744,18 +779,123 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final IndexService indexService = createIndex(index);
final SearchService service = getInstanceFromNode(SearchService.class);
final ShardId shardId = new ShardId(indexService.index(), 0);
IndexShard indexShard = indexService.getShard(0);
NullPointerException e = expectThrows(NullPointerException.class,
() -> service.createContext(
new ShardSearchRequest(shardId, null, 0, null) {
@Override
public SearchType searchType() {
// induce an artificial NPE
throw new NullPointerException("expected");
}
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
new ShardSearchRequest(shardId, null, 0, AliasFilter.EMPTY) {
@Override
public SearchType searchType() {
// induce an artificial NPE
throw new NullPointerException("expected");
}
));
}, indexShard);
NullPointerException e = expectThrows(NullPointerException.class,
() -> service.createContext(rewriteContext));
assertEquals("expected", e.getMessage());
assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount());
}
public void testMatchNoDocsEmptyResponse() throws InterruptedException {
createIndex("index");
Thread currentThread = Thread.currentThread();
SearchService service = getInstanceFromNode(SearchService.class);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
IndexShard indexShard = indexService.getShard(0);
SearchRequest searchRequest = new SearchRequest()
.allowPartialSearchResults(false)
.source(new SearchSourceBuilder()
.aggregation(AggregationBuilders.count("count").field("value")));
ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(),
5, AliasFilter.EMPTY, 1.0f, 0, null, null);
SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap());
{
CountDownLatch latch = new CountDownLatch(1);
shardRequest.source().query(new MatchAllQueryBuilder());
service.executeQueryPhase(shardRequest, task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult result) {
try {
assertNotSame(Thread.currentThread(), currentThread);
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
assertThat(result, instanceOf(QuerySearchResult.class));
assertFalse(result.queryResult().isNull());
assertNotNull(result.queryResult().topDocs());
assertNotNull(result.queryResult().aggregations());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception exc) {
try {
throw new AssertionError(exc);
} finally {
latch.countDown();
}
}
});
latch.await();
}
{
CountDownLatch latch = new CountDownLatch(1);
shardRequest.source().query(new MatchNoneQueryBuilder());
service.executeQueryPhase(shardRequest, task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult result) {
try {
assertNotSame(Thread.currentThread(), currentThread);
assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]"));
assertThat(result, instanceOf(QuerySearchResult.class));
assertFalse(result.queryResult().isNull());
assertNotNull(result.queryResult().topDocs());
assertNotNull(result.queryResult().aggregations());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception exc) {
try {
throw new AssertionError(exc);
} finally {
latch.countDown();
}
}
});
latch.await();
}
{
CountDownLatch latch = new CountDownLatch(1);
shardRequest.canReturnNullResponseIfMatchNoDocs(true);
service.executeQueryPhase(shardRequest, task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult result) {
try {
// make sure we don't use the search threadpool
assertSame(Thread.currentThread(), currentThread);
assertThat(result, instanceOf(QuerySearchResult.class));
assertTrue(result.queryResult().isNull());
} finally {
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
throw new AssertionError(e);
} finally {
latch.countDown();
}
}
});
latch.await();
}
}
}

View File

@ -77,6 +77,8 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f);
assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias());
assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults());
assertEquals(deserializedRequest.canReturnNullResponseIfMatchNoDocs(),
shardSearchTransportRequest.canReturnNullResponseIfMatchNoDocs());
}
public void testAllowPartialResultsSerializationPre7_0_0() throws IOException {
@ -102,9 +104,11 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY);
}
final String[] routings = generateRandomStringArray(5, 10, false, true);
return new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(),
Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings);
req.canReturnNullResponseIfMatchNoDocs(randomBoolean());
return req;
}
public void testFilteringAliases() throws Exception {

View File

@ -124,10 +124,17 @@ public class QuerySearchResultTests extends ESTestCase {
QuerySearchResult querySearchResult = new QuerySearchResult(in);
assertEquals(100, querySearchResult.getRequestId());
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs();
InternalAggregations aggs = (InternalAggregations) querySearchResult.consumeAggs();
assertEquals(1, aggs.asList().size());
//top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately
assertEquals(1, aggs.getTopLevelPipelineAggregators().size());
}
}
public void testNullResponse() throws Exception {
QuerySearchResult querySearchResult = QuerySearchResult.nullInstance();
QuerySearchResult deserialized =
copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, Version.CURRENT);
assertEquals(querySearchResult.isNull(), deserialized.isNull());
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@ -44,9 +43,9 @@ import org.elasticsearch.xpack.frozen.FrozenIndices;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -119,33 +118,36 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
XPackClient xPackClient = new XPackClient(client());
assertAcked(xPackClient.freeze(new FreezeRequest("index")));
int numRequests = randomIntBetween(20, 50);
CountDownLatch latch = new CountDownLatch(numRequests);
int numRefreshes = 0;
for (int i = 0; i < numRequests; i++) {
numRefreshes++;
switch (randomIntBetween(0, 3)) {
// make sure that we don't share the frozen reader in concurrent requests since we acquire the
// searcher and rewrite the request outside of the search-throttle thread pool
switch (randomFrom(Arrays.asList(0, 1, 2))) {
case 0:
client().prepareGet("index", "_doc", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown));
client().prepareGet("index", "_doc", "" + randomIntBetween(0, 9))
.get();
break;
case 1:
client().prepareSearch("index").setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.execute(ActionListener.wrap(latch::countDown));
.get();
// in total 4 refreshes 1x query & 1x fetch per shard (we have 2)
numRefreshes += 3;
break;
case 2:
client().prepareTermVectors("index", "_doc", "" + randomIntBetween(0, 9)).execute(ActionListener.wrap(latch::countDown));
client().prepareTermVectors("index", "_doc", "" + randomIntBetween(0, 9))
.get();
break;
case 3:
client().prepareExplain("index", "_doc", "" + randomIntBetween(0, 9)).setQuery(new MatchAllQueryBuilder())
.execute(ActionListener.wrap(latch::countDown));
.get();
break;
default:
assert false;
default:
assert false;
}
}
latch.await();
IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setRefresh(true).get();
assertEquals(numRefreshes, index.getTotal().refresh.getTotal());
}