Add additional shards routing info in ShardSearchRequest (#29533)
This commit propagates the preference and routing of the original SearchRequest in the ShardSearchRequest. This information is then use to fix a bug in sliced scrolls when executed with a preference (or a routing). Instead of computing the slice query from the total number of shards in the index, this commit computes this number from the number of shards per index that participates in the request. Fixes #27550
This commit is contained in:
parent
fadcce8367
commit
8b8c0c0b4d
|
@ -37,8 +37,10 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
|
|||
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -62,6 +64,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
private final long clusterStateVersion;
|
||||
private final Map<String, AliasFilter> aliasFilter;
|
||||
private final Map<String, Float> concreteIndexBoosts;
|
||||
private final Map<String, Set<String>> indexRoutings;
|
||||
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
|
||||
private final Object shardFailuresMutex = new Object();
|
||||
private final AtomicInteger successfulOps = new AtomicInteger();
|
||||
|
@ -72,6 +75,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
|
||||
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
|
||||
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
|
||||
Map<String, Set<String>> indexRoutings,
|
||||
Executor executor, SearchRequest request,
|
||||
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
|
||||
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
|
||||
|
@ -89,6 +93,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
this.clusterStateVersion = clusterStateVersion;
|
||||
this.concreteIndexBoosts = concreteIndexBoosts;
|
||||
this.aliasFilter = aliasFilter;
|
||||
this.indexRoutings = indexRoutings;
|
||||
this.results = resultConsumer;
|
||||
this.clusters = clusters;
|
||||
}
|
||||
|
@ -318,8 +323,11 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
|||
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
|
||||
assert filter != null;
|
||||
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
|
||||
String indexName = shardIt.shardId().getIndex().getName();
|
||||
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
|
||||
.toArray(new String[0]);
|
||||
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
|
||||
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias);
|
||||
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.search.internal.AliasFilter;
|
|||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
@ -47,6 +48,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
|
|||
CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
|
||||
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
|
||||
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
|
||||
Map<String, Set<String>> indexRoutings,
|
||||
Executor executor, SearchRequest request,
|
||||
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
|
||||
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
|
||||
|
@ -56,9 +58,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
|
|||
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
|
||||
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
|
||||
*/
|
||||
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
|
||||
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
|
||||
clusters);
|
||||
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
|
||||
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
|
||||
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
|
||||
this.phaseFactory = phaseFactory;
|
||||
this.shardsIts = shardsIts;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.search.internal.AliasFilter;
|
|||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
|
@ -37,11 +38,13 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
|
|||
|
||||
SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
|
||||
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
|
||||
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
|
||||
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
|
||||
final SearchPhaseController searchPhaseController, final Executor executor,
|
||||
final SearchRequest request, final ActionListener<SearchResponse> listener,
|
||||
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
|
||||
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
|
||||
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
|
||||
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
|
||||
executor, request, listener,
|
||||
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
|
||||
request.getMaxConcurrentShardRequests(), clusters);
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.search.internal.AliasFilter;
|
|||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
|
@ -37,13 +38,14 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
|
|||
|
||||
SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
|
||||
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
|
||||
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
|
||||
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
|
||||
final SearchPhaseController searchPhaseController, final Executor executor,
|
||||
final SearchRequest request, final ActionListener<SearchResponse> listener,
|
||||
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
|
||||
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
|
||||
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
|
||||
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
|
||||
request.getMaxConcurrentShardRequests(), clusters);
|
||||
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
|
||||
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
|
||||
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters);
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
}
|
||||
|
||||
|
|
|
@ -297,6 +297,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
|
||||
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
|
||||
searchRequest.indices());
|
||||
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
|
||||
String[] concreteIndices = new String[indices.length];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
concreteIndices[i] = indices[i].getName();
|
||||
|
@ -350,7 +351,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
}
|
||||
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
|
||||
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
|
||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start();
|
||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
|
||||
}
|
||||
|
||||
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
|
||||
|
@ -380,17 +381,20 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
GroupShardsIterator<SearchShardIterator> shardIterators,
|
||||
SearchTimeProvider timeProvider,
|
||||
BiFunction<String, String, Transport.Connection> connectionLookup,
|
||||
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
|
||||
long clusterStateVersion,
|
||||
Map<String, AliasFilter> aliasFilter,
|
||||
Map<String, Float> concreteIndexBoosts,
|
||||
ActionListener<SearchResponse> listener, boolean preFilter,
|
||||
Map<String, Set<String>> indexRoutings,
|
||||
ActionListener<SearchResponse> listener,
|
||||
boolean preFilter,
|
||||
SearchResponse.Clusters clusters) {
|
||||
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
|
||||
if (preFilter) {
|
||||
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
|
||||
aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators,
|
||||
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
|
||||
timeProvider, clusterStateVersion, task, (iter) -> {
|
||||
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
|
||||
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters);
|
||||
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
|
||||
return new SearchPhase(action.getName()) {
|
||||
@Override
|
||||
public void run() throws IOException {
|
||||
|
@ -403,14 +407,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
switch (searchRequest.searchType()) {
|
||||
case DFS_QUERY_THEN_FETCH:
|
||||
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
|
||||
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
|
||||
timeProvider, clusterStateVersion, task, clusters);
|
||||
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
|
||||
shardIterators, timeProvider, clusterStateVersion, task, clusters);
|
||||
break;
|
||||
case QUERY_AND_FETCH:
|
||||
case QUERY_THEN_FETCH:
|
||||
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
|
||||
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
|
||||
timeProvider, clusterStateVersion, task, clusters);
|
||||
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
|
||||
shardIterators, timeProvider, clusterStateVersion, task, clusters);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
* A simple {@link ShardsIterator} that iterates a list or sub-list of
|
||||
* {@link ShardRouting shard routings}.
|
||||
* {@link ShardRouting shard indexRoutings}.
|
||||
*/
|
||||
public class PlainShardsIterator implements ShardsIterator {
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
* {@link ShardRouting} immutably encapsulates information about shard
|
||||
* routings like id, state, version, etc.
|
||||
* indexRoutings like id, state, version, etc.
|
||||
*/
|
||||
public final class ShardRouting implements Writeable, ToXContentObject {
|
||||
|
||||
|
@ -477,7 +477,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
"ShardRouting is a relocation target but current node id isn't equal to source relocating node. This [" + this + "], other [" + other + "]";
|
||||
|
||||
assert b == false || this.shardId.equals(other.shardId) :
|
||||
"ShardRouting is a relocation target but both routings are not of the same shard id. This [" + this + "], other [" + other + "]";
|
||||
"ShardRouting is a relocation target but both indexRoutings are not of the same shard id. This [" + this + "], other [" + other + "]";
|
||||
|
||||
assert b == false || this.primary == other.primary :
|
||||
"ShardRouting is a relocation target but primary flag is different. This [" + this + "], target [" + other + "]";
|
||||
|
@ -504,7 +504,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
"ShardRouting is a relocation source but relocating node isn't equal to other's current node. This [" + this + "], other [" + other + "]";
|
||||
|
||||
assert b == false || this.shardId.equals(other.shardId) :
|
||||
"ShardRouting is a relocation source but both routings are not of the same shard. This [" + this + "], target [" + other + "]";
|
||||
"ShardRouting is a relocation source but both indexRoutings are not of the same shard. This [" + this + "], target [" + other + "]";
|
||||
|
||||
assert b == false || this.primary == other.primary :
|
||||
"ShardRouting is a relocation source but primary flag is different. This [" + this + "], target [" + other + "]";
|
||||
|
|
|
@ -25,8 +25,10 @@ import org.apache.lucene.search.Collector;
|
|||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.Counter;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.search.SearchTask;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
|
@ -91,6 +93,7 @@ final class DefaultSearchContext extends SearchContext {
|
|||
private final Engine.Searcher engineSearcher;
|
||||
private final BigArrays bigArrays;
|
||||
private final IndexShard indexShard;
|
||||
private final ClusterService clusterService;
|
||||
private final IndexService indexService;
|
||||
private final ContextIndexSearcher searcher;
|
||||
private final DfsSearchResult dfsResult;
|
||||
|
@ -120,6 +123,7 @@ final class DefaultSearchContext extends SearchContext {
|
|||
// filter for sliced scroll
|
||||
private SliceBuilder sliceBuilder;
|
||||
private SearchTask task;
|
||||
private final Version minNodeVersion;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -152,9 +156,10 @@ final class DefaultSearchContext extends SearchContext {
|
|||
private final QueryShardContext queryShardContext;
|
||||
private FetchPhase fetchPhase;
|
||||
|
||||
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher,
|
||||
IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter,
|
||||
TimeValue timeout, FetchPhase fetchPhase, String clusterAlias) {
|
||||
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
|
||||
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
|
||||
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
|
||||
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
|
||||
this.id = id;
|
||||
this.request = request;
|
||||
this.fetchPhase = fetchPhase;
|
||||
|
@ -168,9 +173,11 @@ final class DefaultSearchContext extends SearchContext {
|
|||
this.fetchResult = new FetchSearchResult(id, shardTarget);
|
||||
this.indexShard = indexShard;
|
||||
this.indexService = indexService;
|
||||
this.clusterService = clusterService;
|
||||
this.searcher = new ContextIndexSearcher(engineSearcher, indexService.cache().query(), indexShard.getQueryCachingPolicy());
|
||||
this.timeEstimateCounter = timeEstimateCounter;
|
||||
this.timeout = timeout;
|
||||
this.minNodeVersion = minNodeVersion;
|
||||
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
|
||||
clusterAlias);
|
||||
queryShardContext.setTypes(request.types());
|
||||
|
@ -278,8 +285,7 @@ final class DefaultSearchContext extends SearchContext {
|
|||
}
|
||||
|
||||
if (sliceBuilder != null) {
|
||||
filters.add(sliceBuilder.toFilter(queryShardContext, shardTarget().getShardId().getId(),
|
||||
queryShardContext.getIndexSettings().getNumberOfShards()));
|
||||
filters.add(sliceBuilder.toFilter(clusterService, request, queryShardContext, minNodeVersion));
|
||||
}
|
||||
|
||||
if (filters.isEmpty()) {
|
||||
|
|
|
@ -616,8 +616,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
Engine.Searcher engineSearcher = indexShard.acquireSearcher("search");
|
||||
|
||||
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
|
||||
engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase,
|
||||
request.getClusterAlias());
|
||||
engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout,
|
||||
fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
|
||||
boolean success = false;
|
||||
try {
|
||||
// we clone the query shard context here just for rewriting otherwise we
|
||||
|
|
|
@ -28,13 +28,10 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryRewriteContext;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.query.Rewriteable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -61,7 +58,6 @@ import java.util.Optional;
|
|||
*/
|
||||
|
||||
public class ShardSearchLocalRequest implements ShardSearchRequest {
|
||||
|
||||
private String clusterAlias;
|
||||
private ShardId shardId;
|
||||
private int numberOfShards;
|
||||
|
@ -74,17 +70,18 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
private Boolean requestCache;
|
||||
private long nowInMillis;
|
||||
private boolean allowPartialSearchResults;
|
||||
|
||||
private String[] indexRoutings = Strings.EMPTY_ARRAY;
|
||||
private String preference;
|
||||
private boolean profile;
|
||||
|
||||
ShardSearchLocalRequest() {
|
||||
}
|
||||
|
||||
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias, String[] indexRoutings) {
|
||||
this(shardId, numberOfShards, searchRequest.searchType(),
|
||||
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
|
||||
searchRequest.allowPartialSearchResults());
|
||||
searchRequest.allowPartialSearchResults(), indexRoutings, searchRequest.preference());
|
||||
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
|
||||
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
|
||||
assert searchRequest.allowPartialSearchResults() != null;
|
||||
|
@ -102,7 +99,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
}
|
||||
|
||||
public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
|
||||
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults) {
|
||||
Boolean requestCache, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults,
|
||||
String[] indexRoutings, String preference) {
|
||||
this.shardId = shardId;
|
||||
this.numberOfShards = numberOfShards;
|
||||
this.searchType = searchType;
|
||||
|
@ -112,6 +110,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
this.aliasFilter = aliasFilter;
|
||||
this.indexBoost = indexBoost;
|
||||
this.allowPartialSearchResults = allowPartialSearchResults;
|
||||
this.indexRoutings = indexRoutings;
|
||||
this.preference = preference;
|
||||
}
|
||||
|
||||
|
||||
|
@ -181,6 +181,16 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
return scroll;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indexRoutings() {
|
||||
return indexRoutings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String preference() {
|
||||
return preference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfile(boolean profile) {
|
||||
this.profile = profile;
|
||||
|
@ -225,6 +235,13 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
allowPartialSearchResults = in.readOptionalBoolean();
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
||||
indexRoutings = in.readStringArray();
|
||||
preference = in.readOptionalString();
|
||||
} else {
|
||||
indexRoutings = Strings.EMPTY_ARRAY;
|
||||
preference = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException {
|
||||
|
@ -240,7 +257,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
if (out.getVersion().onOrAfter(Version.V_5_2_0)) {
|
||||
out.writeFloat(indexBoost);
|
||||
}
|
||||
if (!asKey) {
|
||||
if (asKey == false) {
|
||||
out.writeVLong(nowInMillis);
|
||||
}
|
||||
out.writeOptionalBoolean(requestCache);
|
||||
|
@ -250,7 +267,12 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
|||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||
out.writeOptionalBoolean(allowPartialSearchResults);
|
||||
}
|
||||
|
||||
if (asKey == false) {
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
||||
out.writeStringArray(indexRoutings);
|
||||
out.writeOptionalString(preference);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.elasticsearch.search.internal;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
|
@ -28,8 +30,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryRewriteContext;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.query.Rewriteable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.AliasFilterParsingException;
|
||||
|
@ -73,6 +73,16 @@ public interface ShardSearchRequest {
|
|||
|
||||
Scroll scroll();
|
||||
|
||||
/**
|
||||
* Returns the routing values resolved by the coordinating node for the index pointed by {@link #shardId()}.
|
||||
*/
|
||||
String[] indexRoutings();
|
||||
|
||||
/**
|
||||
* Returns the preference of the original {@link SearchRequest#preference()}.
|
||||
*/
|
||||
String preference();
|
||||
|
||||
/**
|
||||
* Sets if this shard search needs to be profiled or not
|
||||
* @param profile True if the shard should be profiled
|
||||
|
|
|
@ -28,9 +28,6 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryRewriteContext;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.query.Rewriteable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
|
@ -57,9 +54,10 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
}
|
||||
|
||||
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias) {
|
||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis,
|
||||
String clusterAlias, String[] indexRoutings) {
|
||||
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
|
||||
nowInMillis, clusterAlias);
|
||||
nowInMillis, clusterAlias, indexRoutings);
|
||||
this.originalIndices = originalIndices;
|
||||
}
|
||||
|
||||
|
@ -162,6 +160,16 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
|||
return shardSearchLocalRequest.scroll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indexRoutings() {
|
||||
return shardSearchLocalRequest.indexRoutings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String preference() {
|
||||
return shardSearchLocalRequest.preference();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
|
|
|
@ -23,6 +23,10 @@ import org.apache.lucene.search.MatchAllDocsQuery;
|
|||
import org.apache.lucene.search.MatchNoDocsQuery;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -30,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -39,9 +44,13 @@ import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
|||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A slice builder allowing to split a scroll in multiple partitions.
|
||||
|
@ -203,12 +212,49 @@ public class SliceBuilder implements Writeable, ToXContentObject {
|
|||
return Objects.hash(this.field, this.id, this.max);
|
||||
}
|
||||
|
||||
public Query toFilter(QueryShardContext context, int shardId, int numShards) {
|
||||
/**
|
||||
* Converts this QueryBuilder to a lucene {@link Query}.
|
||||
*
|
||||
* @param context Additional information needed to build the query
|
||||
*/
|
||||
public Query toFilter(ClusterService clusterService, ShardSearchRequest request, QueryShardContext context, Version minNodeVersion) {
|
||||
final MappedFieldType type = context.fieldMapper(field);
|
||||
if (type == null) {
|
||||
throw new IllegalArgumentException("field " + field + " not found");
|
||||
}
|
||||
|
||||
int shardId = request.shardId().id();
|
||||
int numShards = context.getIndexSettings().getNumberOfShards();
|
||||
if (minNodeVersion.onOrAfter(Version.V_7_0_0_alpha1) &&
|
||||
(request.preference() != null || request.indexRoutings().length > 0)) {
|
||||
GroupShardsIterator<ShardIterator> group = buildShardIterator(clusterService, request);
|
||||
assert group.size() <= numShards : "index routing shards: " + group.size() +
|
||||
" cannot be greater than total number of shards: " + numShards;
|
||||
if (group.size() < numShards) {
|
||||
/**
|
||||
* The routing of this request targets a subset of the shards of this index so we need to we retrieve
|
||||
* the original {@link GroupShardsIterator} and compute the request shard id and number of
|
||||
* shards from it.
|
||||
* This behavior has been added in {@link Version#V_7_0_0_alpha1} so if there is another node in the cluster
|
||||
* with an older version we use the original shard id and number of shards in order to ensure that all
|
||||
* slices use the same numbers.
|
||||
*/
|
||||
numShards = group.size();
|
||||
int ord = 0;
|
||||
shardId = -1;
|
||||
// remap the original shard id with its index (position) in the sorted shard iterator.
|
||||
for (ShardIterator it : group) {
|
||||
assert it.shardId().getIndex().equals(request.shardId().getIndex());
|
||||
if (request.shardId().equals(it.shardId())) {
|
||||
shardId = ord;
|
||||
break;
|
||||
}
|
||||
++ord;
|
||||
}
|
||||
assert shardId != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing";
|
||||
}
|
||||
}
|
||||
|
||||
String field = this.field;
|
||||
boolean useTermQuery = false;
|
||||
if ("_uid".equals(field)) {
|
||||
|
@ -273,6 +319,17 @@ public class SliceBuilder implements Writeable, ToXContentObject {
|
|||
return new MatchAllDocsQuery();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link GroupShardsIterator} for the provided <code>request</code>.
|
||||
*/
|
||||
private GroupShardsIterator<ShardIterator> buildShardIterator(ClusterService clusterService, ShardSearchRequest request) {
|
||||
final ClusterState state = clusterService.state();
|
||||
String[] indices = new String[] { request.shardId().getIndex().getName() };
|
||||
Map<String, Set<String>> routingMap = request.indexRoutings().length > 0 ?
|
||||
Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings())) : null;
|
||||
return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this, true, true);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.OriginalIndices;
|
|||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -62,10 +63,15 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
|
|||
|
||||
final SearchRequest request = new SearchRequest();
|
||||
request.allowPartialSearchResults(true);
|
||||
request.preference("_shards:1,3");
|
||||
return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, null,
|
||||
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
|
||||
request, null, new GroupShardsIterator<>(Collections.singletonList(
|
||||
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
|
||||
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f),
|
||||
Collections.singletonMap("name", Sets.newHashSet("bar", "baz")),null, request, null,
|
||||
new GroupShardsIterator<>(
|
||||
Collections.singletonList(
|
||||
new SearchShardIterator(null, null, Collections.emptyList(), null)
|
||||
)
|
||||
), timeProvider, 0, null,
|
||||
new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests(),
|
||||
SearchResponse.Clusters.EMPTY) {
|
||||
@Override
|
||||
|
@ -117,5 +123,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
|
|||
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
|
||||
assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder());
|
||||
assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f);
|
||||
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
|
||||
assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings());
|
||||
assertEquals("_shards:1,3", shardSearchTransportRequest.preference());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
|||
searchTransportService,
|
||||
(clusterAlias, node) -> lookup.get(node),
|
||||
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
|
||||
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
|
||||
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
|
||||
searchRequest, null, shardsIter, timeProvider, 0, null,
|
||||
(iter) -> new SearchPhase("test") {
|
||||
@Override
|
||||
|
@ -164,7 +164,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
|||
searchTransportService,
|
||||
(clusterAlias, node) -> lookup.get(node),
|
||||
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
|
||||
Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
|
||||
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
|
||||
searchRequest, null, shardsIter, timeProvider, 0, null,
|
||||
(iter) -> new SearchPhase("test") {
|
||||
@Override
|
||||
|
@ -222,6 +222,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
|
|||
(clusterAlias, node) -> lookup.get(node),
|
||||
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
EsExecutors.newDirectExecutorService(),
|
||||
searchRequest,
|
||||
null,
|
||||
|
|
|
@ -106,6 +106,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
return lookup.get(node); },
|
||||
aliasFilters,
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
request,
|
||||
responseListener,
|
||||
|
@ -198,6 +199,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
return lookup.get(node); },
|
||||
aliasFilters,
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
request,
|
||||
responseListener,
|
||||
|
@ -303,6 +305,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
return lookup.get(node); },
|
||||
aliasFilters,
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
executor,
|
||||
request,
|
||||
responseListener,
|
||||
|
|
|
@ -83,7 +83,7 @@ public class PrimaryTermsTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* puts primary shard routings into initializing state
|
||||
* puts primary shard indexRoutings into initializing state
|
||||
*/
|
||||
private void initPrimaries() {
|
||||
logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1);
|
||||
|
|
|
@ -83,7 +83,7 @@ public class RoutingTableTests extends ESAllocationTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* puts primary shard routings into initializing state
|
||||
* puts primary shard indexRoutings into initializing state
|
||||
*/
|
||||
private void initPrimaries() {
|
||||
logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1);
|
||||
|
|
|
@ -122,6 +122,16 @@ public class SearchSlowLogTests extends ESSingleNodeTestCase {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indexRoutings() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String preference() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfile(boolean profile) {
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ public class AliasRoutingIT extends ESIntegTestCase {
|
|||
assertThat(client().prepareSearch("alias1").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(1L));
|
||||
}
|
||||
|
||||
logger.info("--> search with 0,1 routings , should find two");
|
||||
logger.info("--> search with 0,1 indexRoutings , should find two");
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertThat(client().prepareSearch().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
assertThat(client().prepareSearch().setSize(0).setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
|
|
|
@ -173,13 +173,13 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
|||
assertThat(client().prepareSearch().setSize(0).setRouting(secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(1L));
|
||||
}
|
||||
|
||||
logger.info("--> search with {},{} routings , should find two", routingValue, "1");
|
||||
logger.info("--> search with {},{} indexRoutings , should find two", routingValue, "1");
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertThat(client().prepareSearch().setRouting(routingValue, secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
assertThat(client().prepareSearch().setSize(0).setRouting(routingValue, secondRoutingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
}
|
||||
|
||||
logger.info("--> search with {},{},{} routings , should find two", routingValue, secondRoutingValue, routingValue);
|
||||
logger.info("--> search with {},{},{} indexRoutings , should find two", routingValue, secondRoutingValue, routingValue);
|
||||
for (int i = 0; i < 5; i++) {
|
||||
assertThat(client().prepareSearch().setRouting(routingValue, secondRoutingValue, routingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
assertThat(client().prepareSearch().setSize(0).setRouting(routingValue, secondRoutingValue,routingValue).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getHits().getTotalHits(), equalTo(2L));
|
||||
|
|
|
@ -112,8 +112,8 @@ public class DefaultSearchContextTests extends ESTestCase {
|
|||
IndexReader reader = w.getReader();
|
||||
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader))) {
|
||||
|
||||
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, indexService,
|
||||
indexShard, bigArrays, null, timeout, null, null);
|
||||
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
|
||||
indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
||||
context1.from(300);
|
||||
|
||||
// resultWindow greater than maxResultWindow and scrollContext is null
|
||||
|
@ -153,8 +153,8 @@ public class DefaultSearchContextTests extends ESTestCase {
|
|||
+ "] index level setting."));
|
||||
|
||||
// rescore is null but sliceBuilder is not null
|
||||
DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher, indexService,
|
||||
indexShard, bigArrays, null, timeout, null, null);
|
||||
DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher,
|
||||
null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
||||
|
||||
SliceBuilder sliceBuilder = mock(SliceBuilder.class);
|
||||
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
|
||||
|
@ -170,8 +170,8 @@ public class DefaultSearchContextTests extends ESTestCase {
|
|||
when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY);
|
||||
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);
|
||||
|
||||
DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, indexService,
|
||||
indexShard, bigArrays, null, timeout, null, null);
|
||||
DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, null,
|
||||
indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
||||
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
|
||||
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
|
||||
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
|
||||
|
|
|
@ -213,7 +213,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
|
||||
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
|
||||
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
|
||||
true),
|
||||
true, null, null),
|
||||
new SearchTask(123L, "", "", "", null, Collections.emptyMap()));
|
||||
IntArrayList intCursors = new IntArrayList(1);
|
||||
intCursors.add(0);
|
||||
|
@ -249,7 +249,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
new String[0],
|
||||
false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, true)
|
||||
1.0f, true, null, null)
|
||||
);
|
||||
try {
|
||||
// the search context should inherit the default timeout
|
||||
|
@ -269,7 +269,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
new String[0],
|
||||
false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, true)
|
||||
1.0f, true, null, null)
|
||||
);
|
||||
try {
|
||||
// the search context should inherit the query timeout
|
||||
|
@ -297,12 +297,13 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
searchSourceBuilder.docValueField("field" + i);
|
||||
}
|
||||
try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) {
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true, null, null))) {
|
||||
assertNotNull(context);
|
||||
searchSourceBuilder.docValueField("one_field_too_much");
|
||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||
() -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true)));
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
|
||||
true, null, null)));
|
||||
assertEquals(
|
||||
"Trying to retrieve too many docvalue_fields. Must be less than or equal to: [100] but was [101]. "
|
||||
+ "This limit can be set by changing the [index.max_docvalue_fields_search] index level setting.",
|
||||
|
@ -328,13 +329,14 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
|
||||
}
|
||||
try (SearchContext context = service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true))) {
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true, null, null))) {
|
||||
assertNotNull(context);
|
||||
searchSourceBuilder.scriptField("anotherScriptField",
|
||||
new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap()));
|
||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||
() -> service.createContext(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, true)));
|
||||
searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY),
|
||||
1.0f, true, null, null)));
|
||||
assertEquals(
|
||||
"Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxScriptFields + "] but was ["
|
||||
+ (maxScriptFields + 1)
|
||||
|
@ -406,28 +408,28 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
|||
final IndexShard indexShard = indexService.getShard(0);
|
||||
final boolean allowPartialSearchResults = true;
|
||||
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH, null,
|
||||
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
|
||||
Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
|
||||
|
||||
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
|
||||
new SearchSourceBuilder(), Strings.EMPTY_ARRAY, false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f,
|
||||
allowPartialSearchResults)));
|
||||
allowPartialSearchResults, null, null)));
|
||||
|
||||
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
|
||||
new SearchSourceBuilder().query(new MatchAllQueryBuilder()), Strings.EMPTY_ARRAY, false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
|
||||
|
||||
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
|
||||
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
|
||||
.aggregation(new TermsAggregationBuilder("test", ValueType.STRING).minDocCount(0)), Strings.EMPTY_ARRAY, false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
|
||||
assertTrue(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
|
||||
new SearchSourceBuilder().query(new MatchNoneQueryBuilder())
|
||||
.aggregation(new GlobalAggregationBuilder("test")), Strings.EMPTY_ARRAY, false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
|
||||
|
||||
assertFalse(service.canMatch(new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.QUERY_THEN_FETCH,
|
||||
new SearchSourceBuilder().query(new MatchNoneQueryBuilder()), Strings.EMPTY_ARRAY, false,
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults)));
|
||||
new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, allowPartialSearchResults, null, null)));
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -74,6 +74,8 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
|||
assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType());
|
||||
assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId());
|
||||
assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards());
|
||||
assertEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings());
|
||||
assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference());
|
||||
assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey());
|
||||
assertNotSame(deserializedRequest, shardSearchTransportRequest);
|
||||
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
||||
|
@ -92,8 +94,10 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
|||
} else {
|
||||
filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY);
|
||||
}
|
||||
final String[] routings = generateRandomStringArray(5, 10, false, true);
|
||||
return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
|
||||
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()), null);
|
||||
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(),
|
||||
Math.abs(randomLong()), null, routings);
|
||||
}
|
||||
|
||||
public void testFilteringAliases() throws Exception {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.search.slice;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
|
@ -48,9 +49,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class SearchSliceIT extends ESIntegTestCase {
|
||||
private static final int NUM_DOCS = 1000;
|
||||
|
||||
private int setupIndex(boolean withDocs) throws IOException, ExecutionException, InterruptedException {
|
||||
private void setupIndex(int numDocs, int numberOfShards) throws IOException, ExecutionException, InterruptedException {
|
||||
String mapping = Strings.toString(XContentFactory.jsonBuilder().
|
||||
startObject()
|
||||
.startObject("type")
|
||||
|
@ -70,74 +69,112 @@ public class SearchSliceIT extends ESIntegTestCase {
|
|||
.endObject()
|
||||
.endObject()
|
||||
.endObject());
|
||||
int numberOfShards = randomIntBetween(1, 7);
|
||||
assertAcked(client().admin().indices().prepareCreate("test")
|
||||
.setSettings(Settings.builder().put("number_of_shards", numberOfShards).put("index.max_slices_per_scroll", 10000))
|
||||
.addMapping("type", mapping, XContentType.JSON));
|
||||
ensureGreen();
|
||||
|
||||
if (withDocs == false) {
|
||||
return numberOfShards;
|
||||
}
|
||||
|
||||
List<IndexRequestBuilder> requests = new ArrayList<>();
|
||||
for (int i = 0; i < NUM_DOCS; i++) {
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
builder.startObject();
|
||||
builder.field("invalid_random_kw", randomAlphaOfLengthBetween(5, 20));
|
||||
builder.field("random_int", randomInt());
|
||||
builder.field("static_int", 0);
|
||||
builder.field("invalid_random_int", randomInt());
|
||||
builder.endObject();
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
XContentBuilder builder = jsonBuilder()
|
||||
.startObject()
|
||||
.field("invalid_random_kw", randomAlphaOfLengthBetween(5, 20))
|
||||
.field("random_int", randomInt())
|
||||
.field("static_int", 0)
|
||||
.field("invalid_random_int", randomInt())
|
||||
.endObject();
|
||||
requests.add(client().prepareIndex("test", "type").setSource(builder));
|
||||
}
|
||||
indexRandom(true, requests);
|
||||
return numberOfShards;
|
||||
}
|
||||
|
||||
public void testDocIdSort() throws Exception {
|
||||
int numShards = setupIndex(true);
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
assertThat(numDocs, equalTo(NUM_DOCS));
|
||||
public void testSearchSort() throws Exception {
|
||||
int numShards = randomIntBetween(1, 7);
|
||||
int numDocs = randomIntBetween(100, 1000);
|
||||
setupIndex(numDocs, numShards);
|
||||
int max = randomIntBetween(2, numShards * 3);
|
||||
for (String field : new String[]{"_id", "random_int", "static_int"}) {
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
// test _doc sort
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.setSize(fetchSize)
|
||||
.addSort(SortBuilders.fieldSort("_doc"));
|
||||
assertSearchSlicesWithScroll(request, field, max);
|
||||
}
|
||||
}
|
||||
assertSearchSlicesWithScroll(request, field, max, numDocs);
|
||||
|
||||
public void testNumericSort() throws Exception {
|
||||
int numShards = setupIndex(true);
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
assertThat(numDocs, equalTo(NUM_DOCS));
|
||||
|
||||
int max = randomIntBetween(2, numShards*3);
|
||||
for (String field : new String[]{"_id", "random_int", "static_int"}) {
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
// test numeric sort
|
||||
request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.addSort(SortBuilders.fieldSort("random_int"))
|
||||
.setSize(fetchSize);
|
||||
assertSearchSlicesWithScroll(request, field, max);
|
||||
assertSearchSlicesWithScroll(request, field, max, numDocs);
|
||||
}
|
||||
}
|
||||
|
||||
public void testWithPreferenceAndRoutings() throws Exception {
|
||||
int numShards = 10;
|
||||
int totalDocs = randomIntBetween(100, 1000);
|
||||
setupIndex(totalDocs, numShards);
|
||||
{
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setPreference("_shards:1,4")
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
int max = randomIntBetween(2, numShards * 3);
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.setSize(fetchSize)
|
||||
.setPreference("_shards:1,4")
|
||||
.addSort(SortBuilders.fieldSort("_doc"));
|
||||
assertSearchSlicesWithScroll(request, "_id", max, numDocs);
|
||||
}
|
||||
{
|
||||
SearchResponse sr = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setRouting("foo", "bar")
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
int max = randomIntBetween(2, numShards * 3);
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.setSize(fetchSize)
|
||||
.setRouting("foo", "bar")
|
||||
.addSort(SortBuilders.fieldSort("_doc"));
|
||||
assertSearchSlicesWithScroll(request, "_id", max, numDocs);
|
||||
}
|
||||
{
|
||||
assertAcked(client().admin().indices().prepareAliases()
|
||||
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias1").routing("foo"))
|
||||
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias2").routing("bar"))
|
||||
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("test").alias("alias3").routing("baz"))
|
||||
.get());
|
||||
SearchResponse sr = client().prepareSearch("alias1", "alias3")
|
||||
.setQuery(matchAllQuery())
|
||||
.setSize(0)
|
||||
.get();
|
||||
int numDocs = (int) sr.getHits().getTotalHits();
|
||||
int max = randomIntBetween(2, numShards * 3);
|
||||
int fetchSize = randomIntBetween(10, 100);
|
||||
SearchRequestBuilder request = client().prepareSearch("alias1", "alias3")
|
||||
.setQuery(matchAllQuery())
|
||||
.setScroll(new Scroll(TimeValue.timeValueSeconds(10)))
|
||||
.setSize(fetchSize)
|
||||
.addSort(SortBuilders.fieldSort("_doc"));
|
||||
assertSearchSlicesWithScroll(request, "_id", max, numDocs);
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidFields() throws Exception {
|
||||
setupIndex(false);
|
||||
setupIndex(0, 1);
|
||||
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
|
||||
() -> client().prepareSearch("test")
|
||||
.setQuery(matchAllQuery())
|
||||
|
@ -161,7 +198,7 @@ public class SearchSliceIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testInvalidQuery() throws Exception {
|
||||
setupIndex(false);
|
||||
setupIndex(0, 1);
|
||||
SearchPhaseExecutionException exc = expectThrows(SearchPhaseExecutionException.class,
|
||||
() -> client().prepareSearch()
|
||||
.setQuery(matchAllQuery())
|
||||
|
@ -173,7 +210,7 @@ public class SearchSliceIT extends ESIntegTestCase {
|
|||
equalTo("`slice` cannot be used outside of a scroll context"));
|
||||
}
|
||||
|
||||
private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice) {
|
||||
private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
|
||||
int totalResults = 0;
|
||||
List<String> keys = new ArrayList<>();
|
||||
for (int id = 0; id < numSlice; id++) {
|
||||
|
@ -184,7 +221,7 @@ public class SearchSliceIT extends ESIntegTestCase {
|
|||
int numSliceResults = searchResponse.getHits().getHits().length;
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
keys.add(hit.getId());
|
||||
assertTrue(keys.add(hit.getId()));
|
||||
}
|
||||
while (searchResponse.getHits().getHits().length > 0) {
|
||||
searchResponse = client().prepareSearchScroll("test")
|
||||
|
@ -195,15 +232,15 @@ public class SearchSliceIT extends ESIntegTestCase {
|
|||
totalResults += searchResponse.getHits().getHits().length;
|
||||
numSliceResults += searchResponse.getHits().getHits().length;
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
keys.add(hit.getId());
|
||||
assertTrue(keys.add(hit.getId()));
|
||||
}
|
||||
}
|
||||
assertThat(numSliceResults, equalTo(expectedSliceResults));
|
||||
clearScroll(scrollId);
|
||||
}
|
||||
assertThat(totalResults, equalTo(NUM_DOCS));
|
||||
assertThat(keys.size(), equalTo(NUM_DOCS));
|
||||
assertThat(new HashSet(keys).size(), equalTo(NUM_DOCS));
|
||||
assertThat(totalResults, equalTo(numDocs));
|
||||
assertThat(keys.size(), equalTo(numDocs));
|
||||
assertThat(new HashSet(keys).size(), equalTo(numDocs));
|
||||
}
|
||||
|
||||
private Throwable findRootCause(Exception e) {
|
||||
|
|
|
@ -30,19 +30,38 @@ import org.apache.lucene.search.Query;
|
|||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.search.SearchShardIterator;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.query.Rewriteable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -58,13 +77,138 @@ import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashC
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class SliceBuilderTests extends ESTestCase {
|
||||
private static final int MAX_SLICE = 20;
|
||||
|
||||
private static SliceBuilder randomSliceBuilder() throws IOException {
|
||||
static class ShardSearchRequestTest implements IndicesRequest, ShardSearchRequest {
|
||||
private final String[] indices;
|
||||
private final int shardId;
|
||||
private final String[] indexRoutings;
|
||||
private final String preference;
|
||||
|
||||
ShardSearchRequestTest(String index, int shardId, String[] indexRoutings, String preference) {
|
||||
this.indices = new String[] { index };
|
||||
this.shardId = shardId;
|
||||
this.indexRoutings = indexRoutings;
|
||||
this.preference = preference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardId shardId() {
|
||||
return new ShardId(new Index(indices[0], indices[0]), shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] types() {
|
||||
return new String[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchSourceBuilder source() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AliasFilter getAliasFilter() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAliasFilter(AliasFilter filter) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void source(SearchSourceBuilder source) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfShards() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchType searchType() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float indexBoost() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nowInMillis() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean requestCache() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean allowPartialSearchResults() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scroll scroll() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indexRoutings() {
|
||||
return indexRoutings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String preference() {
|
||||
return preference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProfile(boolean profile) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isProfile() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference cacheKey() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getClusterAlias() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Rewriteable<Rewriteable> getRewriteable() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static SliceBuilder randomSliceBuilder() {
|
||||
int max = randomIntBetween(2, MAX_SLICE);
|
||||
int id = randomIntBetween(1, max - 1);
|
||||
String field = randomAlphaOfLengthBetween(5, 20);
|
||||
|
@ -75,7 +219,7 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
return copyWriteable(original, new NamedWriteableRegistry(Collections.emptyList()), SliceBuilder::new);
|
||||
}
|
||||
|
||||
private static SliceBuilder mutate(SliceBuilder original) throws IOException {
|
||||
private static SliceBuilder mutate(SliceBuilder original) {
|
||||
switch (randomIntBetween(0, 2)) {
|
||||
case 0: return new SliceBuilder(original.getField() + "_xyz", original.getId(), original.getMax());
|
||||
case 1: return new SliceBuilder(original.getField(), original.getId() - 1, original.getMax());
|
||||
|
@ -84,6 +228,63 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private IndexSettings createIndexSettings(Version indexVersionCreated, int numShards) {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, indexVersionCreated)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build();
|
||||
return new IndexSettings(indexState, Settings.EMPTY);
|
||||
}
|
||||
|
||||
private ShardSearchRequest createRequest(int shardId) {
|
||||
return createRequest(shardId, Strings.EMPTY_ARRAY, null);
|
||||
}
|
||||
|
||||
private ShardSearchRequest createRequest(int shardId, String[] routings, String preference) {
|
||||
return new ShardSearchRequestTest("index", shardId, routings, preference);
|
||||
}
|
||||
|
||||
private QueryShardContext createShardContext(Version indexVersionCreated, IndexReader reader,
|
||||
String fieldName, DocValuesType dvType, int numShards, int shardId) {
|
||||
MappedFieldType fieldType = new MappedFieldType() {
|
||||
@Override
|
||||
public MappedFieldType clone() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, @Nullable QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
fieldType.setName(fieldName);
|
||||
QueryShardContext context = mock(QueryShardContext.class);
|
||||
when(context.fieldMapper(fieldName)).thenReturn(fieldType);
|
||||
when(context.getIndexReader()).thenReturn(reader);
|
||||
when(context.getShardId()).thenReturn(shardId);
|
||||
IndexSettings indexSettings = createIndexSettings(indexVersionCreated, numShards);
|
||||
when(context.getIndexSettings()).thenReturn(indexSettings);
|
||||
if (dvType != null) {
|
||||
fieldType.setHasDocValues(true);
|
||||
fieldType.setDocValuesType(dvType);
|
||||
IndexNumericFieldData fd = mock(IndexNumericFieldData.class);
|
||||
when(context.getForField(fieldType)).thenReturn(fd);
|
||||
}
|
||||
return context;
|
||||
|
||||
}
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
SliceBuilder original = randomSliceBuilder();
|
||||
SliceBuilder deserialized = serializedCopy(original);
|
||||
|
@ -131,92 +332,41 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
assertEquals("max must be greater than id", e.getMessage());
|
||||
}
|
||||
|
||||
public void testToFilter() throws IOException {
|
||||
public void testToFilterSimple() throws IOException {
|
||||
Directory dir = new RAMDirectory();
|
||||
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
|
||||
writer.commit();
|
||||
}
|
||||
QueryShardContext context = mock(QueryShardContext.class);
|
||||
try (IndexReader reader = DirectoryReader.open(dir)) {
|
||||
MappedFieldType fieldType = new MappedFieldType() {
|
||||
@Override
|
||||
public MappedFieldType clone() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, @Nullable QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
fieldType.setName(IdFieldMapper.NAME);
|
||||
fieldType.setHasDocValues(false);
|
||||
when(context.fieldMapper(IdFieldMapper.NAME)).thenReturn(fieldType);
|
||||
when(context.getIndexReader()).thenReturn(reader);
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build();
|
||||
IndexSettings indexSettings = new IndexSettings(indexState, Settings.EMPTY);
|
||||
when(context.getIndexSettings()).thenReturn(indexSettings);
|
||||
QueryShardContext context =
|
||||
createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1,0);
|
||||
SliceBuilder builder = new SliceBuilder(5, 10);
|
||||
Query query = builder.toFilter(context, 0, 1);
|
||||
Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT);
|
||||
assertThat(query, instanceOf(TermsSliceQuery.class));
|
||||
|
||||
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
|
||||
assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query));
|
||||
try (IndexReader newReader = DirectoryReader.open(dir)) {
|
||||
when(context.getIndexReader()).thenReturn(newReader);
|
||||
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
|
||||
assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testToFilterRandom() throws IOException {
|
||||
Directory dir = new RAMDirectory();
|
||||
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
|
||||
writer.commit();
|
||||
}
|
||||
try (IndexReader reader = DirectoryReader.open(dir)) {
|
||||
MappedFieldType fieldType = new MappedFieldType() {
|
||||
@Override
|
||||
public MappedFieldType clone() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, @Nullable QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
fieldType.setName("field_doc_values");
|
||||
fieldType.setHasDocValues(true);
|
||||
fieldType.setDocValuesType(DocValuesType.SORTED_NUMERIC);
|
||||
when(context.fieldMapper("field_doc_values")).thenReturn(fieldType);
|
||||
when(context.getIndexReader()).thenReturn(reader);
|
||||
IndexNumericFieldData fd = mock(IndexNumericFieldData.class);
|
||||
when(context.getForField(fieldType)).thenReturn(fd);
|
||||
SliceBuilder builder = new SliceBuilder("field_doc_values", 5, 10);
|
||||
Query query = builder.toFilter(context, 0, 1);
|
||||
QueryShardContext context =
|
||||
createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1,0);
|
||||
SliceBuilder builder = new SliceBuilder("field", 5, 10);
|
||||
Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT);
|
||||
assertThat(query, instanceOf(DocValuesSliceQuery.class));
|
||||
|
||||
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
|
||||
assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query));
|
||||
try (IndexReader newReader = DirectoryReader.open(dir)) {
|
||||
when(context.getIndexReader()).thenReturn(newReader);
|
||||
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
|
||||
assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query));
|
||||
}
|
||||
|
||||
// numSlices > numShards
|
||||
|
@ -226,7 +376,8 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
for (int i = 0; i < numSlices; i++) {
|
||||
for (int j = 0; j < numShards; j++) {
|
||||
SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
|
||||
Query q = slice.toFilter(context, j, numShards);
|
||||
context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j);
|
||||
Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT);
|
||||
if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) {
|
||||
AtomicInteger count = numSliceMap.get(j);
|
||||
if (count == null) {
|
||||
|
@ -255,7 +406,8 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
for (int i = 0; i < numSlices; i++) {
|
||||
for (int j = 0; j < numShards; j++) {
|
||||
SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
|
||||
Query q = slice.toFilter(context, j, numShards);
|
||||
context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j);
|
||||
Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT);
|
||||
if (q instanceof MatchNoDocsQuery == false) {
|
||||
assertThat(q, instanceOf(MatchAllDocsQuery.class));
|
||||
targetShards.add(j);
|
||||
|
@ -271,7 +423,7 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
for (int i = 0; i < numSlices; i++) {
|
||||
for (int j = 0; j < numShards; j++) {
|
||||
SliceBuilder slice = new SliceBuilder("_id", i, numSlices);
|
||||
Query q = slice.toFilter(context, j, numShards);
|
||||
Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT);
|
||||
if (i == j) {
|
||||
assertThat(q, instanceOf(MatchAllDocsQuery.class));
|
||||
} else {
|
||||
|
@ -280,85 +432,35 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidField() throws IOException {
|
||||
Directory dir = new RAMDirectory();
|
||||
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
|
||||
writer.commit();
|
||||
}
|
||||
try (IndexReader reader = DirectoryReader.open(dir)) {
|
||||
MappedFieldType fieldType = new MappedFieldType() {
|
||||
@Override
|
||||
public MappedFieldType clone() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, @Nullable QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
fieldType.setName("field_without_doc_values");
|
||||
when(context.fieldMapper("field_without_doc_values")).thenReturn(fieldType);
|
||||
when(context.getIndexReader()).thenReturn(reader);
|
||||
SliceBuilder builder = new SliceBuilder("field_without_doc_values", 5, 10);
|
||||
IllegalArgumentException exc =
|
||||
expectThrows(IllegalArgumentException.class, () -> builder.toFilter(context, 0, 1));
|
||||
QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1,0);
|
||||
SliceBuilder builder = new SliceBuilder("field", 5, 10);
|
||||
IllegalArgumentException exc = expectThrows(IllegalArgumentException.class,
|
||||
() -> builder.toFilter(null, createRequest(0), context, Version.CURRENT));
|
||||
assertThat(exc.getMessage(), containsString("cannot load numeric doc values"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testToFilterDeprecationMessage() throws IOException {
|
||||
Directory dir = new RAMDirectory();
|
||||
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
|
||||
writer.commit();
|
||||
}
|
||||
QueryShardContext context = mock(QueryShardContext.class);
|
||||
try (IndexReader reader = DirectoryReader.open(dir)) {
|
||||
MappedFieldType fieldType = new MappedFieldType() {
|
||||
@Override
|
||||
public MappedFieldType clone() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String typeName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Query termQuery(Object value, @Nullable QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Query existsQuery(QueryShardContext context) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
fieldType.setName("_uid");
|
||||
fieldType.setHasDocValues(false);
|
||||
when(context.fieldMapper("_uid")).thenReturn(fieldType);
|
||||
when(context.getIndexReader()).thenReturn(reader);
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_3_0)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
IndexMetaData indexState = IndexMetaData.builder("index").settings(settings).build();
|
||||
IndexSettings indexSettings = new IndexSettings(indexState, Settings.EMPTY);
|
||||
when(context.getIndexSettings()).thenReturn(indexSettings);
|
||||
QueryShardContext context = createShardContext(Version.V_6_3_0, reader, "_uid", null, 1,0);
|
||||
SliceBuilder builder = new SliceBuilder("_uid", 5, 10);
|
||||
Query query = builder.toFilter(context, 0, 1);
|
||||
Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT);
|
||||
assertThat(query, instanceOf(TermsSliceQuery.class));
|
||||
assertThat(builder.toFilter(context, 0, 1), equalTo(query));
|
||||
assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query));
|
||||
assertWarnings("Computing slices on the [_uid] field is deprecated for 6.x indices, use [_id] instead");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testSerializationBackcompat() throws IOException {
|
||||
|
@ -375,4 +477,35 @@ public class SliceBuilderTests extends ESTestCase {
|
|||
SliceBuilder::new, Version.V_6_3_0);
|
||||
assertEquals(sliceBuilder, copy63);
|
||||
}
|
||||
|
||||
public void testToFilterWithRouting() throws IOException {
|
||||
Directory dir = new RAMDirectory();
|
||||
try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) {
|
||||
writer.commit();
|
||||
}
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
ClusterState state = mock(ClusterState.class);
|
||||
when(state.metaData()).thenReturn(MetaData.EMPTY_META_DATA);
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
OperationRouting routing = mock(OperationRouting.class);
|
||||
GroupShardsIterator<ShardIterator> it = new GroupShardsIterator<>(
|
||||
Collections.singletonList(
|
||||
new SearchShardIterator(null, new ShardId("index", "index", 1), null, null)
|
||||
)
|
||||
);
|
||||
when(routing.searchShards(any(), any(), any(), any())).thenReturn(it);
|
||||
when(clusterService.operationRouting()).thenReturn(routing);
|
||||
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
|
||||
try (IndexReader reader = DirectoryReader.open(dir)) {
|
||||
QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED, 5, 0);
|
||||
SliceBuilder builder = new SliceBuilder("field", 6, 10);
|
||||
String[] routings = new String[] { "foo" };
|
||||
Query query = builder.toFilter(clusterService, createRequest(1, routings, null), context, Version.CURRENT);
|
||||
assertEquals(new DocValuesSliceQuery("field", 6, 10), query);
|
||||
query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.CURRENT);
|
||||
assertEquals(new DocValuesSliceQuery("field", 6, 10), query);
|
||||
query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.V_6_2_0);
|
||||
assertEquals(new DocValuesSliceQuery("field", 1, 2), query);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue