This commit ensures that we rewrite the shard request with a short-lived can_match searcher. This is required for frozen indices since the high level rewrite is now performed on a network thread where we don't want to perform I/O. Closes #53985
This commit is contained in:
parent
381d7586e4
commit
3b4751bdb7
|
@ -1228,12 +1228,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
/**
|
||||
* Acquire the searcher without applying the additional reader wrapper.
|
||||
* Acquire a lightweight searcher which can be used to rewrite shard search requests.
|
||||
*/
|
||||
public Engine.Searcher acquireSearcherNoWrap(String source) {
|
||||
public Engine.Searcher acquireCanMatchSearcher() {
|
||||
readAllowed();
|
||||
markSearcherAccessed();
|
||||
return getEngine().acquireSearcher(source, Engine.SearcherScope.EXTERNAL);
|
||||
return getEngine().acquireSearcher("can_match", Engine.SearcherScope.EXTERNAL);
|
||||
}
|
||||
|
||||
public Engine.Searcher acquireSearcher(String source) {
|
||||
|
@ -1252,10 +1252,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return wrapSearcher(searcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}.
|
||||
*/
|
||||
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
|
||||
private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
|
||||
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
|
||||
!= null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
|
||||
boolean success = false;
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.action.search.SearchShardTask;
|
|||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedSupplier;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
|
@ -315,31 +316,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
|
||||
public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
|
||||
rewriteShardRequest(request, ActionListener.wrap(
|
||||
// fork the execution in the search thread pool and wraps the searcher
|
||||
// to execute the query
|
||||
context -> {
|
||||
try {
|
||||
context.wrapSearcher().execute(() -> {
|
||||
final SearchPhaseResult result;
|
||||
try {
|
||||
result = executeDfsPhase(context, task);
|
||||
} catch (Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
return;
|
||||
}
|
||||
listener.onResponse(result);
|
||||
});
|
||||
} catch (Exception exc) {
|
||||
// if the execution is rejected we need to close the searcher
|
||||
IOUtils.closeWhileHandlingException(context.searcher);
|
||||
listener.onFailure(exc);
|
||||
}
|
||||
}, listener::onFailure));
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
IndexShard shard = indexService.getShard(request.shardId().id());
|
||||
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
|
||||
@Override
|
||||
public void onResponse(ShardSearchRequest rewritten) {
|
||||
// fork the execution in the search thread pool
|
||||
runAsync(shard, () -> executeDfsPhase(request, task), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException {
|
||||
final SearchContext context = createAndPutContext(rewriteContext, task);
|
||||
private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardTask task) throws IOException {
|
||||
final SearchContext context = createAndPutContext(request, task);
|
||||
context.incRef();
|
||||
try {
|
||||
contextProcessing(context);
|
||||
|
@ -371,58 +365,66 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
|
||||
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
|
||||
: "empty responses require more than one shard";
|
||||
rewriteShardRequest(request, ActionListener.wrap(
|
||||
context -> {
|
||||
try {
|
||||
ShardSearchRequest rewritten = context.request;
|
||||
if (rewritten.canReturnNullResponseIfMatchNoDocs()
|
||||
&& canRewriteToMatchNone(rewritten.source())
|
||||
&& rewritten.source().query() instanceof MatchNoneQueryBuilder) {
|
||||
assert request.scroll() == null : "must always create search context for scroll requests";
|
||||
onMatchNoDocs(context, listener);
|
||||
} else {
|
||||
// fork the execution in the search thread pool and wraps the searcher
|
||||
// to execute the query
|
||||
context.wrapSearcher().execute(() -> {
|
||||
final SearchPhaseResult result;
|
||||
try {
|
||||
result = executeQueryPhase(context, task);
|
||||
} catch (Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
return;
|
||||
}
|
||||
listener.onResponse(result);
|
||||
});
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
IndexShard shard = indexService.getShard(request.shardId().id());
|
||||
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
|
||||
@Override
|
||||
public void onResponse(ShardSearchRequest orig) {
|
||||
if (orig.canReturnNullResponseIfMatchNoDocs()) {
|
||||
// we clone the shard request and perform a quick rewrite using a lightweight
|
||||
// searcher since we are outside of the search thread pool.
|
||||
// If the request rewrites to "match none" we can shortcut the query phase
|
||||
// entirely. Otherwise we fork the execution in the search thread pool.
|
||||
ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
|
||||
try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) {
|
||||
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
|
||||
request::nowInMillis, request.getClusterAlias());
|
||||
Rewriteable.rewrite(request.getRewriteable(), context, true);
|
||||
} catch (Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
}
|
||||
if (canRewriteToMatchNone(canMatchRequest.source())
|
||||
&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {
|
||||
assert canMatchRequest.scroll() == null : "must always create search context for scroll requests";
|
||||
listener.onResponse(QuerySearchResult.nullInstance());
|
||||
return;
|
||||
}
|
||||
} catch (Exception exc) {
|
||||
// if the execution is rejected we need to close the searcher
|
||||
IOUtils.closeWhileHandlingException(context.searcher);
|
||||
listener.onFailure(exc);
|
||||
}
|
||||
}, listener::onFailure));
|
||||
// fork the execution in the search thread pool
|
||||
runAsync(shard, () -> executeQueryPhase(orig, task), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void onMatchNoDocs(SearchRewriteContext rewriteContext, ActionListener<SearchPhaseResult> listener) throws IOException {
|
||||
// creates a lightweight search context that we use to inform context listeners
|
||||
// before closing
|
||||
SearchContext searchContext = createSearchContext(rewriteContext, defaultSearchTimeout);
|
||||
try (SearchContext dummy = searchContext) {
|
||||
onNewContext(searchContext);
|
||||
onFreeContext(searchContext);
|
||||
private <T> void runAsync(IndexShard shard, CheckedSupplier<T, Exception> command, ActionListener<T> listener) {
|
||||
Executor executor = getExecutor(shard);
|
||||
try {
|
||||
executor.execute(() -> {
|
||||
T result;
|
||||
try {
|
||||
result = command.get();
|
||||
} catch (Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
return;
|
||||
}
|
||||
listener.onResponse(result);
|
||||
});
|
||||
} catch (Exception exc) {
|
||||
listener.onFailure(exc);
|
||||
return;
|
||||
}
|
||||
listener.onResponse(QuerySearchResult.nullInstance());
|
||||
}
|
||||
|
||||
private <T> void runAsync(SearchContextId contextId, Supplier<T> executable, ActionListener<T> listener) {
|
||||
getExecutor(contextId).execute(ActionRunnable.supply(listener, executable::get));
|
||||
}
|
||||
|
||||
private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception {
|
||||
final SearchContext context = createAndPutContext(rewriteContext, task);
|
||||
final ShardSearchRequest request = rewriteContext.request;
|
||||
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
|
||||
final SearchContext context = createAndPutContext(request, task);
|
||||
context.incRef();
|
||||
try {
|
||||
final long afterQueryTime;
|
||||
|
@ -624,8 +626,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
}
|
||||
|
||||
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException {
|
||||
SearchContext context = createContext(rewriteContext, task);
|
||||
final SearchContext createAndPutContext(ShardSearchRequest request, SearchShardTask task) throws IOException {
|
||||
SearchContext context = createContext(request, task);
|
||||
onNewContext(context);
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -658,10 +660,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
}
|
||||
|
||||
final SearchContext createContext(SearchRewriteContext rewriteContext, SearchShardTask searchTask) throws IOException {
|
||||
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
|
||||
final SearchContext createContext(ShardSearchRequest request, SearchShardTask searchTask) throws IOException {
|
||||
final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout);
|
||||
try {
|
||||
final ShardSearchRequest request = rewriteContext.request;
|
||||
if (request.scroll() != null) {
|
||||
context.addReleasable(openScrollContexts::decrementAndGet, Lifetime.CONTEXT);
|
||||
if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
|
||||
|
@ -704,21 +705,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
}
|
||||
|
||||
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
|
||||
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
|
||||
SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard);
|
||||
// make sure that we wrap the searcher when executing the query
|
||||
return createSearchContext(rewriteContext.wrapSearcher(), timeout);
|
||||
return createSearchContext(request, timeout, "search");
|
||||
}
|
||||
|
||||
private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteContext, TimeValue timeout) throws IOException {
|
||||
private DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout, String source) throws IOException {
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
||||
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
|
||||
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
|
||||
Engine.Searcher searcher = indexShard.acquireSearcher(source);
|
||||
|
||||
boolean success = false;
|
||||
DefaultSearchContext searchContext = null;
|
||||
try {
|
||||
final ShardSearchRequest request = rewriteContext.request;
|
||||
final Engine.Searcher searcher = rewriteContext.searcher;
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
||||
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
|
||||
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
|
||||
// TODO: If no changes are made since the last commit, and the searcher is opened from that commit, then we can use the
|
||||
// commit_id as the context_id. And if the local checkpoint and max_seq_no of that commit equal the global checkpoint,
|
||||
// then we can use a combination of history_uuid and one of these values as a **weaker** context_id.
|
||||
|
@ -727,19 +726,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
// the Lucene doc ids can be different.
|
||||
final String readerId = UUIDs.base64UUID();
|
||||
final SearchContextId searchContextId = new SearchContextId(readerId, idGenerator.incrementAndGet());
|
||||
DefaultSearchContext searchContext = new DefaultSearchContext(searchContextId, request, shardTarget,
|
||||
searchContext = new DefaultSearchContext(searchContextId, request, shardTarget,
|
||||
searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
|
||||
fetchPhase, lowLevelCancellation, clusterService.state().nodes().getMinNodeVersion());
|
||||
// we clone the query shard context here just for rewriting otherwise we
|
||||
// might end up with incorrect state since we are using now() or script services
|
||||
// during rewrite and normalized / evaluate templates etc.
|
||||
QueryShardContext context = new QueryShardContext(searchContext.getQueryShardContext());
|
||||
Rewriteable.rewrite(request.getRewriteable(), context, true);
|
||||
assert searchContext.getQueryShardContext().isCacheable();
|
||||
success = true;
|
||||
return searchContext;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
// we handle the case where `IndicesService#indexServiceSafe`or `IndexService#getShard`, or the DefaultSearchContext
|
||||
// constructor throws an exception since we would otherwise leak a searcher and this can have severe implications
|
||||
// (unable to obtain shard lock exceptions).
|
||||
IOUtils.closeWhileHandlingException(rewriteContext.searcher);
|
||||
// we handle the case where the DefaultSearchContext constructor throws an exception since we would otherwise
|
||||
// leak a searcher and this can have severe implications (unable to obtain shard lock exceptions).
|
||||
IOUtils.closeWhileHandlingException(searcher);
|
||||
}
|
||||
}
|
||||
return searchContext;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1110,7 +1114,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
IndexShard indexShard = indexService.getShard(request.shardId().getId());
|
||||
// we don't want to use the reader wrapper since it could run costly operations
|
||||
// and we can afford false positives.
|
||||
try (Engine.Searcher searcher = indexShard.acquireSearcherNoWrap("can_match")) {
|
||||
try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) {
|
||||
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
|
||||
request::nowInMillis, request.getClusterAlias());
|
||||
Rewriteable.rewrite(request.getRewriteable(), context, false);
|
||||
|
@ -1146,50 +1150,17 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
return aggregations == null || aggregations.mustVisitAllDocs() == false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
|
||||
* and then rewrites with a searcher when the shard is active.
|
||||
* The provided action listener is executed on the same thread or in a listener threadpool.
|
||||
*/
|
||||
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<SearchRewriteContext> listener) {
|
||||
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
|
||||
private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
|
||||
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
|
||||
// now we need to check if there is a pending refresh and register
|
||||
shard.awaitShardSearchActive(b -> {
|
||||
try {
|
||||
// we can now acquire a searcher and rewrite the request with it
|
||||
SearchRewriteContext rewriteContext = acquireSearcherAndRewrite(request, shard);
|
||||
listener.onResponse(rewriteContext);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}), listener::onFailure);
|
||||
// now we need to check if there is a pending refresh and register
|
||||
shard.awaitShardSearchActive(b -> listener.onResponse(request)),
|
||||
listener::onFailure);
|
||||
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
|
||||
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
|
||||
// adding a lot of overhead
|
||||
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis), actionListener);
|
||||
}
|
||||
|
||||
SearchRewriteContext acquireSearcherAndRewrite(ShardSearchRequest request, IndexShard shard) throws IOException {
|
||||
// acquire the searcher for rewrite with no wrapping in order to avoid costly
|
||||
// operations. We'll wrap the searcher at a later stage (when executing the query).
|
||||
Engine.Searcher searcher = shard.acquireSearcherNoWrap("search");
|
||||
boolean success = false;
|
||||
try {
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
||||
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
|
||||
request::nowInMillis, request.getClusterAlias());
|
||||
Rewriteable.rewrite(request.getRewriteable(), context, true);
|
||||
SearchRewriteContext rewrite = new SearchRewriteContext(request, shard, searcher, getExecutor(shard));
|
||||
success = true;
|
||||
return rewrite;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.closeWhileHandlingException(searcher);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@link QueryRewriteContext} with the given {@code now} provider
|
||||
*/
|
||||
|
@ -1228,37 +1199,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
return request.source().aggregations().buildPipelineTree();
|
||||
}
|
||||
|
||||
static class SearchRewriteContext {
|
||||
private final ShardSearchRequest request;
|
||||
private final IndexShard shard;
|
||||
private Engine.Searcher searcher;
|
||||
private final Executor executor;
|
||||
|
||||
private boolean isWrapped;
|
||||
|
||||
private SearchRewriteContext(ShardSearchRequest request,
|
||||
IndexShard shard,
|
||||
Engine.Searcher searcher,
|
||||
Executor executor) {
|
||||
this.request = request;
|
||||
this.shard = shard;
|
||||
this.searcher = searcher;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
SearchRewriteContext wrapSearcher() {
|
||||
assert isWrapped == false : "searcher already wrapped";
|
||||
isWrapped = true;
|
||||
searcher = shard.wrapSearcher(searcher);
|
||||
return this;
|
||||
}
|
||||
|
||||
void execute(Runnable runnable) {
|
||||
assert isWrapped : "searcher is not wrapped";
|
||||
executor.execute(runnable);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class CanMatchResponse extends SearchPhaseResult {
|
||||
private final boolean canMatch;
|
||||
private final MinAndMax<?> minAndMax;
|
||||
|
|
|
@ -195,6 +195,26 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
|
|||
originalIndices = OriginalIndices.readOriginalIndices(in);
|
||||
}
|
||||
|
||||
public ShardSearchRequest(ShardSearchRequest clone) {
|
||||
this.shardId = clone.shardId;
|
||||
this.searchType = clone.searchType;
|
||||
this.numberOfShards = clone.numberOfShards;
|
||||
this.scroll = clone.scroll;
|
||||
this.source = clone.source;
|
||||
this.types = clone.types;
|
||||
this.aliasFilter = clone.aliasFilter;
|
||||
this.indexBoost = clone.indexBoost;
|
||||
this.nowInMillis = clone.nowInMillis;
|
||||
this.requestCache = clone.requestCache;
|
||||
this.clusterAlias = clone.clusterAlias;
|
||||
this.allowPartialSearchResults = clone.allowPartialSearchResults;
|
||||
this.indexRoutings = clone.indexRoutings;
|
||||
this.preference = clone.preference;
|
||||
this.canReturnNullResponseIfMatchNoDocs = clone.canReturnNullResponseIfMatchNoDocs;
|
||||
this.bottomSortValues = clone.bottomSortValues;
|
||||
this.originalIndices = clone.originalIndices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
|
|
|
@ -364,11 +364,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
|
||||
final IndexShard indexShard = indexService.getShard(0);
|
||||
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
|
||||
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
|
||||
indexShard);
|
||||
final SearchContext contextWithDefaultTimeout = service.createContext(rewriteContext, null);
|
||||
final SearchContext contextWithDefaultTimeout = service.createContext(
|
||||
new ShardSearchRequest(
|
||||
OriginalIndices.NONE,
|
||||
searchRequest,
|
||||
indexShard.shardId(),
|
||||
1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, -1, null, null
|
||||
), null);
|
||||
try {
|
||||
// the search context should inherit the default timeout
|
||||
assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
|
||||
|
@ -379,11 +383,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
|
||||
final long seconds = randomIntBetween(6, 10);
|
||||
searchRequest.source(new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)));
|
||||
rewriteContext = service.acquireSearcherAndRewrite(
|
||||
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
|
||||
indexShard);
|
||||
final SearchContext context = service.createContext(rewriteContext, null);
|
||||
final SearchContext context = service.createContext(
|
||||
new ShardSearchRequest(
|
||||
OriginalIndices.NONE,
|
||||
searchRequest,
|
||||
indexShard.shardId(),
|
||||
1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, -1, null, null
|
||||
), null);
|
||||
try {
|
||||
// the search context should inherit the query timeout
|
||||
assertThat(context.timeout(), equalTo(TimeValue.timeValueSeconds(seconds)));
|
||||
|
@ -411,22 +419,16 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
for (int i = 0; i < indexService.getIndexSettings().getMaxDocvalueFields(); i++) {
|
||||
searchSourceBuilder.docValueField("field" + i);
|
||||
}
|
||||
|
||||
ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
|
||||
|
||||
{
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
|
||||
try (SearchContext context = service.createContext(rewriteContext, null)) {
|
||||
assertNotNull(context);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
|
||||
try (SearchContext context = service.createContext(
|
||||
new ShardSearchRequest(OriginalIndices.NONE,
|
||||
searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null
|
||||
), null)) {
|
||||
assertNotNull(context);
|
||||
searchSourceBuilder.docValueField("one_field_too_much");
|
||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||
() -> service.createContext(rewriteContext, null));
|
||||
() -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), null));
|
||||
assertEquals(
|
||||
"Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. "
|
||||
+ "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.", ex.getMessage());
|
||||
|
@ -452,28 +454,20 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
searchSourceBuilder.scriptField("field" + i,
|
||||
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
|
||||
}
|
||||
|
||||
ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
|
||||
|
||||
{
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
|
||||
try (SearchContext context = service.createContext(rewriteContext, null)) {
|
||||
assertNotNull(context);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest,
|
||||
indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, -1, null, null), null)) {
|
||||
assertNotNull(context);
|
||||
searchSourceBuilder.scriptField("anotherScriptField",
|
||||
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(shardRequest, indexShard);
|
||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||
() -> service.createContext(rewriteContext, null));
|
||||
() -> service.createContext(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), null));
|
||||
assertEquals(
|
||||
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
|
||||
+ (maxScriptFields + 1)
|
||||
+ "]. This limit can be set by changing the [index.max_script_fields] index level setting.",
|
||||
ex.getMessage());
|
||||
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
|
||||
+ (maxScriptFields + 1)
|
||||
+ "]. This limit can be set by changing the [index.max_script_fields] index level setting.",
|
||||
ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -490,19 +484,17 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
searchSourceBuilder.scriptField("field" + 0,
|
||||
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
|
||||
searchSourceBuilder.size(0);
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
|
||||
new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
|
||||
indexShard);
|
||||
try (SearchContext context = service.createContext(rewriteContext, null)) {
|
||||
assertEquals(0, context.scriptFields().fields().size());
|
||||
try (SearchContext context = service.createContext(new ShardSearchRequest(OriginalIndices.NONE,
|
||||
searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, -1, null, null), null)) {
|
||||
assertEquals(0, context.scriptFields().fields().size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* test that creating more than the allowed number of scroll contexts throws an exception
|
||||
*/
|
||||
public void testMaxOpenScrollContexts() throws RuntimeException, IOException {
|
||||
public void testMaxOpenScrollContexts() throws RuntimeException {
|
||||
createIndex("index");
|
||||
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
|
||||
|
||||
|
@ -528,10 +520,8 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
client().prepareSearch("index").setSize(1).setScroll("1m").get();
|
||||
}
|
||||
|
||||
SearchService.SearchRewriteContext rewriteContext =
|
||||
service.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard);
|
||||
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
|
||||
() -> service.createAndPutContext(rewriteContext, null));
|
||||
() -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()), null));
|
||||
assertEquals(
|
||||
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
|
||||
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
|
||||
|
@ -554,10 +544,8 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
try {
|
||||
latch.await();
|
||||
for (; ; ) {
|
||||
SearchService.SearchRewriteContext rewriteContext =
|
||||
searchService.acquireSearcherAndRewrite(new ShardScrollRequestTest(indexShard.shardId()), indexShard);
|
||||
try {
|
||||
searchService.createAndPutContext(rewriteContext, null);
|
||||
searchService.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId()), null);
|
||||
} catch (ElasticsearchException e) {
|
||||
assertThat(e.getMessage(), equalTo(
|
||||
"Trying to create too many scroll contexts. Must be less than or equal to: " +
|
||||
|
@ -657,7 +645,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
|
||||
final IndexShard indexShard = indexService.getShard(0);
|
||||
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
|
||||
int numWrapReader = numWrapInvocations.get();
|
||||
assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch());
|
||||
|
||||
|
@ -681,7 +668,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder()));
|
||||
assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch());
|
||||
assertEquals(numWrapReader, numWrapInvocations.get());
|
||||
assertEquals(0, numWrapInvocations.get());
|
||||
|
||||
ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
|
||||
|
@ -693,7 +680,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
public void onResponse(SearchPhaseResult searchPhaseResult) {
|
||||
try {
|
||||
// make sure that the wrapper is called when the query is actually executed
|
||||
assertEquals(numWrapReader+1, numWrapInvocations.get());
|
||||
assertEquals(1, numWrapInvocations.get());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
|
@ -826,16 +813,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
final ShardId shardId = new ShardId(indexService.index(), 0);
|
||||
IndexShard indexShard = indexService.getShard(0);
|
||||
|
||||
SearchService.SearchRewriteContext rewriteContext = service.acquireSearcherAndRewrite(
|
||||
new ShardSearchRequest(shardId, null, 0, AliasFilter.EMPTY) {
|
||||
@Override
|
||||
public SearchType searchType() {
|
||||
// induce an artificial NPE
|
||||
throw new NullPointerException("expected");
|
||||
}
|
||||
}, indexShard);
|
||||
NullPointerException e = expectThrows(NullPointerException.class,
|
||||
() -> service.createContext(rewriteContext, null));
|
||||
() -> service.createContext(
|
||||
new ShardSearchRequest(shardId, null, 0, AliasFilter.EMPTY) {
|
||||
@Override
|
||||
public SearchType searchType() {
|
||||
// induce an artificial NPE
|
||||
throw new NullPointerException("expected");
|
||||
}
|
||||
}, null));
|
||||
assertEquals("expected", e.getMessage());
|
||||
assertEquals("should have 2 store refs (IndexService + InternalEngine)", 2, indexService.getShard(0).store().refCount());
|
||||
}
|
||||
|
@ -987,8 +973,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
List<SearchContextId> contextIds = new ArrayList<>();
|
||||
int numContexts = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numContexts; i++) {
|
||||
SearchService.SearchRewriteContext rewriteContext = searchService.acquireSearcherAndRewrite(shardSearchRequest, indexShard);
|
||||
final SearchContext searchContext = searchService.createContext(rewriteContext, null);
|
||||
final SearchContext searchContext = searchService.createContext(shardSearchRequest, null);
|
||||
assertThat(searchContext.id().getId(), equalTo((long) (i + 1)));
|
||||
searchService.putContext(searchContext);
|
||||
contextIds.add(searchContext.id());
|
||||
|
|
|
@ -60,26 +60,15 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
|
|||
ShardSearchRequest shardSearchTransportRequest = createShardSearchRequest();
|
||||
ShardSearchRequest deserializedRequest =
|
||||
copyWriteable(shardSearchTransportRequest, namedWriteableRegistry, ShardSearchRequest::new);
|
||||
assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll());
|
||||
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
||||
assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices());
|
||||
assertArrayEquals(deserializedRequest.types(), shardSearchTransportRequest.types());
|
||||
assertEquals(deserializedRequest.indicesOptions(), shardSearchTransportRequest.indicesOptions());
|
||||
assertEquals(deserializedRequest.nowInMillis(), shardSearchTransportRequest.nowInMillis());
|
||||
assertEquals(deserializedRequest.source(), shardSearchTransportRequest.source());
|
||||
assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType());
|
||||
assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId());
|
||||
assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards());
|
||||
assertArrayEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings());
|
||||
assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference());
|
||||
assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey());
|
||||
assertNotSame(deserializedRequest, shardSearchTransportRequest);
|
||||
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
||||
assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f);
|
||||
assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias());
|
||||
assertEquals(shardSearchTransportRequest.allowPartialSearchResults(), deserializedRequest.allowPartialSearchResults());
|
||||
assertEquals(deserializedRequest.canReturnNullResponseIfMatchNoDocs(),
|
||||
shardSearchTransportRequest.canReturnNullResponseIfMatchNoDocs());
|
||||
assertEquals(shardSearchTransportRequest, deserializedRequest);
|
||||
}
|
||||
|
||||
public void testClone() throws Exception {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ShardSearchRequest shardSearchTransportRequest = createShardSearchRequest();
|
||||
ShardSearchRequest clone = new ShardSearchRequest(shardSearchTransportRequest);
|
||||
assertEquals(shardSearchTransportRequest, clone);
|
||||
}
|
||||
}
|
||||
|
||||
public void testAllowPartialResultsSerializationPre7_0_0() throws IOException {
|
||||
|
@ -161,6 +150,28 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
|
|||
expectThrows(InvalidAliasNameException.class, () -> aliasFilter(finalIndexMetadata, "unknown"));
|
||||
}
|
||||
|
||||
private static void assertEquals(ShardSearchRequest orig, ShardSearchRequest copy) throws IOException {
|
||||
assertEquals(orig.scroll(), copy.scroll());
|
||||
assertEquals(orig.getAliasFilter(), copy.getAliasFilter());
|
||||
assertArrayEquals(orig.indices(), copy.indices());
|
||||
assertEquals(orig.indicesOptions(), copy.indicesOptions());
|
||||
assertEquals(orig.nowInMillis(), copy.nowInMillis());
|
||||
assertEquals(orig.source(), copy.source());
|
||||
assertEquals(orig.searchType(), copy.searchType());
|
||||
assertEquals(orig.shardId(), copy.shardId());
|
||||
assertEquals(orig.numberOfShards(), copy.numberOfShards());
|
||||
assertArrayEquals(orig.indexRoutings(), copy.indexRoutings());
|
||||
assertEquals(orig.preference(), copy.preference());
|
||||
assertEquals(orig.cacheKey(), copy.cacheKey());
|
||||
assertNotSame(orig, copy);
|
||||
assertEquals(orig.getAliasFilter(), copy.getAliasFilter());
|
||||
assertEquals(orig.indexBoost(), copy.indexBoost(), 0.0f);
|
||||
assertEquals(orig.getClusterAlias(), copy.getClusterAlias());
|
||||
assertEquals(orig.allowPartialSearchResults(), copy.allowPartialSearchResults());
|
||||
assertEquals(orig.canReturnNullResponseIfMatchNoDocs(),
|
||||
orig.canReturnNullResponseIfMatchNoDocs());
|
||||
}
|
||||
|
||||
public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOException {
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
filterBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
|
|
Loading…
Reference in New Issue