Add support for local cluster alias to SearchRequest (#36997)
With the upcoming cross-cluster search alternate execution mode, the CCS node will be able to split a CCS request into multiple search requests, one per remote cluster involved. In order to do that, the CCS node has to be able to signal to each remote cluster that such sub-requests are part of a CCS request. Each cluster does not know about the other clusters involved, and does not know either what alias it is given in the CCS node, hence the CCS coordinating node needs to be able to provide the alias as part of the search request so that it is used as index prefix in the returned search hits. The cluster alias is a notion that's already supported in the search shards iterator and search shard target, but it is currently used in CCS as both index prefix and connection lookup key when fanning out to all the shards. With CCS alternate execution mode the provided cluster alias needs to be used only as index prefix, as shards are local to each cluster hence no cluster alias should be used for connection lookups. The local cluster alias can be set to the SearchRequest at the transport layer only, and its constructor/getter methods are package private. Relates to #32125
This commit is contained in:
parent
0cae979dfe
commit
51fe20e0c3
|
@ -318,8 +318,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
|
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
|
||||||
String clusterAlias = shardIt.getClusterAlias();
|
|
||||||
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
|
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
|
||||||
assert filter != null;
|
assert filter != null;
|
||||||
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
|
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
|
||||||
|
@ -327,7 +327,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
|
||||||
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
|
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
|
||||||
.toArray(new String[0]);
|
.toArray(new String[0]);
|
||||||
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
|
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
|
||||||
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
|
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -90,8 +90,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
final SearchShardIterator shardIt, Exception e) {
|
final SearchShardIterator shardIt, Exception e) {
|
||||||
// we always add the shard failure for a specific shard instance
|
// we always add the shard failure for a specific shard instance
|
||||||
// we do make sure to clean it on a successful response from a shard
|
// we do make sure to clean it on a successful response from a shard
|
||||||
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
|
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
|
||||||
shardIt.getOriginalIndices());
|
|
||||||
onShardFailure(shardIndex, shardTarget, e);
|
onShardFailure(shardIndex, shardTarget, e);
|
||||||
|
|
||||||
if (totalOps.incrementAndGet() == expectedTotalOps) {
|
if (totalOps.incrementAndGet() == expectedTotalOps) {
|
||||||
|
@ -257,8 +256,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
|
||||||
Runnable r = () -> {
|
Runnable r = () -> {
|
||||||
final Thread thread = Thread.currentThread();
|
final Thread thread = Thread.currentThread();
|
||||||
try {
|
try {
|
||||||
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
|
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(
|
||||||
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
|
shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
|
||||||
@Override
|
@Override
|
||||||
public void innerOnResponse(FirstResult result) {
|
public void innerOnResponse(FirstResult result) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -36,6 +36,7 @@ class ScrollIdForNode {
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
public String getClusterAlias() {
|
public String getClusterAlias() {
|
||||||
return clusterAlias;
|
return clusterAlias;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
|
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
|
||||||
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
|
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
|
||||||
|
|
||||||
|
private final String localClusterAlias;
|
||||||
|
|
||||||
private SearchType searchType = SearchType.DEFAULT;
|
private SearchType searchType = SearchType.DEFAULT;
|
||||||
|
|
||||||
private String[] indices = Strings.EMPTY_ARRAY;
|
private String[] indices = Strings.EMPTY_ARRAY;
|
||||||
|
@ -92,6 +94,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
|
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
|
||||||
|
|
||||||
public SearchRequest() {
|
public SearchRequest() {
|
||||||
|
this.localClusterAlias = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -111,6 +114,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
this.searchType = searchRequest.searchType;
|
this.searchType = searchRequest.searchType;
|
||||||
this.source = searchRequest.source;
|
this.source = searchRequest.source;
|
||||||
this.types = searchRequest.types;
|
this.types = searchRequest.types;
|
||||||
|
this.localClusterAlias = searchRequest.localClusterAlias;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -125,6 +129,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
* Constructs a new search request against the provided indices with the given search source.
|
* Constructs a new search request against the provided indices with the given search source.
|
||||||
*/
|
*/
|
||||||
public SearchRequest(String[] indices, SearchSourceBuilder source) {
|
public SearchRequest(String[] indices, SearchSourceBuilder source) {
|
||||||
|
this();
|
||||||
if (source == null) {
|
if (source == null) {
|
||||||
throw new IllegalArgumentException("source must not be null");
|
throw new IllegalArgumentException("source must not be null");
|
||||||
}
|
}
|
||||||
|
@ -132,6 +137,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
this.source = source;
|
this.source = source;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
|
||||||
|
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
|
||||||
|
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
|
||||||
|
*/
|
||||||
|
SearchRequest(String localClusterAlias) {
|
||||||
|
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new search request from reading the specified stream.
|
* Constructs a new search request from reading the specified stream.
|
||||||
*
|
*
|
||||||
|
@ -158,6 +172,12 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
|
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||||
allowPartialSearchResults = in.readOptionalBoolean();
|
allowPartialSearchResults = in.readOptionalBoolean();
|
||||||
}
|
}
|
||||||
|
//TODO update version after backport
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||||
|
localClusterAlias = in.readOptionalString();
|
||||||
|
} else {
|
||||||
|
localClusterAlias = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -181,6 +201,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
|
||||||
out.writeOptionalBoolean(allowPartialSearchResults);
|
out.writeOptionalBoolean(allowPartialSearchResults);
|
||||||
}
|
}
|
||||||
|
//TODO update version after backport
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||||
|
out.writeOptionalString(localClusterAlias);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -209,6 +233,16 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
return validationException;
|
return validationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the alias of the cluster that this search request is being executed on. A non-null value indicates that this search request
|
||||||
|
* is being executed as part of a locally reduced cross-cluster search request. The cluster alias is used to prefix index names
|
||||||
|
* returned as part of search hits with the alias of the cluster they came from.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
String getLocalClusterAlias() {
|
||||||
|
return localClusterAlias;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the indices the search will be executed on.
|
* Sets the indices the search will be executed on.
|
||||||
*/
|
*/
|
||||||
|
@ -529,14 +563,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
|
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
|
||||||
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
|
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
|
||||||
Objects.equals(indicesOptions, that.indicesOptions) &&
|
Objects.equals(indicesOptions, that.indicesOptions) &&
|
||||||
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
|
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
|
||||||
|
Objects.equals(localClusterAlias, that.localClusterAlias);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
|
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
|
||||||
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
|
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
|
||||||
allowPartialSearchResults);
|
allowPartialSearchResults, localClusterAlias);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -554,6 +589,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
|
||||||
", batchedReduceSize=" + batchedReduceSize +
|
", batchedReduceSize=" + batchedReduceSize +
|
||||||
", preFilterShardSize=" + preFilterShardSize +
|
", preFilterShardSize=" + preFilterShardSize +
|
||||||
", allowPartialSearchResults=" + allowPartialSearchResults +
|
", allowPartialSearchResults=" + allowPartialSearchResults +
|
||||||
|
", localClusterAlias=" + localClusterAlias +
|
||||||
", source=" + source + '}';
|
", source=" + source + '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,28 +22,34 @@ package org.elasticsearch.action.search;
|
||||||
import org.elasticsearch.action.OriginalIndices;
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.search.SearchShardTarget;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
|
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
|
||||||
* of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices.
|
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
|
||||||
|
* the cluster alias.
|
||||||
|
* @see OriginalIndices
|
||||||
*/
|
*/
|
||||||
public final class SearchShardIterator extends PlainShardIterator {
|
public final class SearchShardIterator extends PlainShardIterator {
|
||||||
|
|
||||||
private final OriginalIndices originalIndices;
|
private final OriginalIndices originalIndices;
|
||||||
private String clusterAlias;
|
private final String clusterAlias;
|
||||||
private boolean skip = false;
|
private boolean skip = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
|
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
|
||||||
* this the a given <code>shardId</code>.
|
* this the a given <code>shardId</code>.
|
||||||
*
|
*
|
||||||
|
* @param clusterAlias the alias of the cluster where the shard is located
|
||||||
* @param shardId shard id of the group
|
* @param shardId shard id of the group
|
||||||
* @param shards shards to iterate
|
* @param shards shards to iterate
|
||||||
|
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
|
||||||
*/
|
*/
|
||||||
public SearchShardIterator(String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
|
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
|
||||||
super(shardId, shards);
|
super(shardId, shards);
|
||||||
this.originalIndices = originalIndices;
|
this.originalIndices = originalIndices;
|
||||||
this.clusterAlias = clusterAlias;
|
this.clusterAlias = clusterAlias;
|
||||||
|
@ -56,10 +62,22 @@ public final class SearchShardIterator extends PlainShardIterator {
|
||||||
return originalIndices;
|
return originalIndices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the alias of the cluster where the shard is located.
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
public String getClusterAlias() {
|
public String getClusterAlias() {
|
||||||
return clusterAlias;
|
return clusterAlias;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
|
||||||
|
* @see SearchShardTarget
|
||||||
|
*/
|
||||||
|
SearchShardTarget newSearchShardTarget(String nodeId) {
|
||||||
|
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the iterator and mark it as skippable
|
* Reset the iterator and mark it as skippable
|
||||||
* @see #skip()
|
* @see #skip()
|
||||||
|
|
|
@ -98,8 +98,8 @@ public class ShardSearchFailure extends ShardOperationFailedException {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
if (in.readBoolean()) {
|
shardTarget = in.readOptionalWriteable(SearchShardTarget::new);
|
||||||
shardTarget = new SearchShardTarget(in);
|
if (shardTarget != null) {
|
||||||
index = shardTarget.getFullyQualifiedIndexName();
|
index = shardTarget.getFullyQualifiedIndexName();
|
||||||
shardId = shardTarget.getShardId().getId();
|
shardId = shardTarget.getShardId().getId();
|
||||||
}
|
}
|
||||||
|
@ -110,12 +110,7 @@ public class ShardSearchFailure extends ShardOperationFailedException {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
if (shardTarget == null) {
|
out.writeOptionalWriteable(shardTarget);
|
||||||
out.writeBoolean(false);
|
|
||||||
} else {
|
|
||||||
out.writeBoolean(true);
|
|
||||||
shardTarget.writeTo(out);
|
|
||||||
}
|
|
||||||
out.writeString(reason);
|
out.writeString(reason);
|
||||||
RestStatus.writeTo(out, status);
|
RestStatus.writeTo(out, status);
|
||||||
out.writeException(cause);
|
out.writeException(cause);
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
@ -60,6 +61,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
|
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
|
||||||
|
@ -311,7 +313,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
|
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
|
||||||
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
|
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
|
||||||
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
|
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
|
||||||
remoteShardIterators);
|
searchRequest.getLocalClusterAlias(), remoteShardIterators);
|
||||||
|
|
||||||
failIfOverShardCountLimit(clusterService, shardIterators.size());
|
failIfOverShardCountLimit(clusterService, shardIterators.size());
|
||||||
|
|
||||||
|
@ -338,19 +340,34 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
}
|
}
|
||||||
|
|
||||||
final DiscoveryNodes nodes = clusterState.nodes();
|
final DiscoveryNodes nodes = clusterState.nodes();
|
||||||
BiFunction<String, String, Transport.Connection> connectionLookup = (clusterName, nodeId) -> {
|
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
|
||||||
final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId);
|
nodes::get, remoteConnections, searchTransportService::getConnection);
|
||||||
if (discoveryNode == null) {
|
|
||||||
throw new IllegalStateException("no node found for id: " + nodeId);
|
|
||||||
}
|
|
||||||
return searchTransportService.getConnection(clusterName, discoveryNode);
|
|
||||||
};
|
|
||||||
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
|
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
|
||||||
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
|
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
|
||||||
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
|
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
|
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
|
||||||
|
Function<String, DiscoveryNode> localNodes,
|
||||||
|
BiFunction<String, String, DiscoveryNode> remoteNodes,
|
||||||
|
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
|
||||||
|
return (clusterAlias, nodeId) -> {
|
||||||
|
final DiscoveryNode discoveryNode;
|
||||||
|
if (clusterAlias == null || requestClusterAlias != null) {
|
||||||
|
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
|
||||||
|
discoveryNode = localNodes.apply(nodeId);
|
||||||
|
} else {
|
||||||
|
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
|
||||||
|
}
|
||||||
|
if (discoveryNode == null) {
|
||||||
|
throw new IllegalStateException("no node found for id: " + nodeId);
|
||||||
|
}
|
||||||
|
return nodeToConnection.apply(clusterAlias, discoveryNode);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
|
||||||
|
GroupShardsIterator<SearchShardIterator> shardIterators) {
|
||||||
SearchSourceBuilder source = searchRequest.source();
|
SearchSourceBuilder source = searchRequest.source();
|
||||||
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
|
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
|
||||||
SearchService.canRewriteToMatchNone(source) &&
|
SearchService.canRewriteToMatchNone(source) &&
|
||||||
|
@ -359,10 +376,11 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
||||||
|
|
||||||
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
|
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
|
||||||
OriginalIndices localIndices,
|
OriginalIndices localIndices,
|
||||||
|
@Nullable String localClusterAlias,
|
||||||
List<SearchShardIterator> remoteShardIterators) {
|
List<SearchShardIterator> remoteShardIterators) {
|
||||||
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
|
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
|
||||||
for (ShardIterator shardIterator : localShardsIterator) {
|
for (ShardIterator shardIterator : localShardsIterator) {
|
||||||
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
|
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
|
||||||
}
|
}
|
||||||
return new GroupShardsIterator<>(shards);
|
return new GroupShardsIterator<>(shards);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,8 +44,8 @@ final class TransportSearchHelper {
|
||||||
out.writeLong(searchPhaseResult.getRequestId());
|
out.writeLong(searchPhaseResult.getRequestId());
|
||||||
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
|
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
|
||||||
if (searchShardTarget.getClusterAlias() != null) {
|
if (searchShardTarget.getClusterAlias() != null) {
|
||||||
out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(),
|
out.writeString(
|
||||||
searchShardTarget.getNodeId()));
|
RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()));
|
||||||
} else {
|
} else {
|
||||||
out.writeString(searchShardTarget.getNodeId());
|
out.writeString(searchShardTarget.getNodeId());
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext {
|
||||||
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
|
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
|
||||||
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
|
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
|
||||||
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
|
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
|
||||||
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
|
FetchPhase fetchPhase, Version minNodeVersion) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.fetchPhase = fetchPhase;
|
this.fetchPhase = fetchPhase;
|
||||||
|
@ -179,7 +179,7 @@ final class DefaultSearchContext extends SearchContext {
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
this.minNodeVersion = minNodeVersion;
|
this.minNodeVersion = minNodeVersion;
|
||||||
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
|
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
|
||||||
clusterAlias);
|
shardTarget.getClusterAlias());
|
||||||
queryShardContext.setTypes(request.types());
|
queryShardContext.setTypes(request.types());
|
||||||
queryBoost = request.indexBoost();
|
queryBoost = request.indexBoost();
|
||||||
}
|
}
|
||||||
|
|
|
@ -668,8 +668,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout)
|
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
|
||||||
throws IOException {
|
|
||||||
return createSearchContext(request, timeout, true, "search");
|
return createSearchContext(request, timeout, true, "search");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -684,7 +683,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
||||||
|
|
||||||
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
|
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
|
||||||
engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout,
|
engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout,
|
||||||
fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
|
fetchPhase, clusterService.state().nodes().getMinNodeVersion());
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
// we clone the query shard context here just for rewriting otherwise we
|
// we clone the query shard context here just for rewriting otherwise we
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.transport.RemoteClusterAware;
|
import org.elasticsearch.transport.RemoteClusterAware;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The target that the search request was executed on.
|
* The target that the search request was executed on.
|
||||||
|
@ -54,7 +55,7 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
||||||
clusterAlias = in.readOptionalString();
|
clusterAlias = in.readOptionalString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public SearchShardTarget(String nodeId, ShardId shardId, String clusterAlias, OriginalIndices originalIndices) {
|
public SearchShardTarget(String nodeId, ShardId shardId, @Nullable String clusterAlias, OriginalIndices originalIndices) {
|
||||||
this.nodeId = nodeId == null ? null : new Text(nodeId);
|
this.nodeId = nodeId == null ? null : new Text(nodeId);
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
this.originalIndices = originalIndices;
|
this.originalIndices = originalIndices;
|
||||||
|
@ -87,15 +88,16 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
||||||
return originalIndices;
|
return originalIndices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
public String getClusterAlias() {
|
public String getClusterAlias() {
|
||||||
return clusterAlias;
|
return clusterAlias;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the fully qualified index name, including the cluster alias.
|
* Returns the fully qualified index name, including the index prefix that indicates which cluster results come from.
|
||||||
*/
|
*/
|
||||||
public String getFullyQualifiedIndexName() {
|
public String getFullyQualifiedIndexName() {
|
||||||
return RemoteClusterAware.buildRemoteIndexName(getClusterAlias(), getIndex());
|
return RemoteClusterAware.buildRemoteIndexName(clusterAlias, getIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -121,28 +123,27 @@ public final class SearchShardTarget implements Writeable, Comparable<SearchShar
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) return true;
|
if (this == o) {
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
|
||||||
SearchShardTarget that = (SearchShardTarget) o;
|
|
||||||
if (shardId.equals(that.shardId) == false) return false;
|
|
||||||
if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false;
|
|
||||||
if (clusterAlias != null ? !clusterAlias.equals(that.clusterAlias) : that.clusterAlias != null) return false;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SearchShardTarget that = (SearchShardTarget) o;
|
||||||
|
return Objects.equals(nodeId, that.nodeId) &&
|
||||||
|
Objects.equals(shardId, that.shardId) &&
|
||||||
|
Objects.equals(clusterAlias, that.clusterAlias);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
int result = nodeId != null ? nodeId.hashCode() : 0;
|
return Objects.hash(nodeId, shardId, clusterAlias);
|
||||||
result = 31 * result + (shardId.getIndexName() != null ? shardId.getIndexName().hashCode() : 0);
|
|
||||||
result = 31 * result + shardId.hashCode();
|
|
||||||
result = 31 * result + (clusterAlias != null ? clusterAlias.hashCode() : 0);
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
String shardToString = "[" + RemoteClusterAware.buildRemoteIndexName(clusterAlias, shardId.getIndexName()) + "][" + shardId.getId()
|
String shardToString = "[" + RemoteClusterAware.buildRemoteIndexName(
|
||||||
+ "]";
|
clusterAlias, shardId.getIndexName()) + "][" + shardId.getId() + "]";
|
||||||
if (nodeId == null) {
|
if (nodeId == null) {
|
||||||
return "[_na_]" + shardToString;
|
return "[_na_]" + shardToString;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search.internal;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
@ -76,8 +77,8 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
||||||
ShardSearchLocalRequest() {
|
ShardSearchLocalRequest() {
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost,
|
||||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis, String clusterAlias, String[] indexRoutings) {
|
long nowInMillis, @Nullable String clusterAlias, String[] indexRoutings) {
|
||||||
this(shardId, numberOfShards, searchRequest.searchType(),
|
this(shardId, numberOfShards, searchRequest.searchType(),
|
||||||
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
|
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost,
|
||||||
searchRequest.allowPartialSearchResults(), indexRoutings, searchRequest.preference());
|
searchRequest.allowPartialSearchResults(), indexRoutings, searchRequest.preference());
|
||||||
|
@ -113,7 +114,6 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
|
||||||
this.preference = preference;
|
this.preference = preference;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ShardId shardId() {
|
public ShardId shardId() {
|
||||||
return shardId;
|
return shardId;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.CheckedFunction;
|
import org.elasticsearch.common.CheckedFunction;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
|
@ -151,9 +152,9 @@ public interface ShardSearchRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the cluster alias if this request is for a remote cluster or <code>null</code> if the request if targeted to the local
|
* Returns the cluster alias in case the request is part of a cross-cluster search request, <code>null</code> otherwise.
|
||||||
* cluster.
|
|
||||||
*/
|
*/
|
||||||
|
@Nullable
|
||||||
String getClusterAlias();
|
String getClusterAlias();
|
||||||
|
|
||||||
Rewriteable<Rewriteable> getRewriteable();
|
Rewriteable<Rewriteable> getRewriteable();
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchTask;
|
import org.elasticsearch.action.search.SearchTask;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -51,12 +52,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
|
||||||
|
|
||||||
private ShardSearchLocalRequest shardSearchLocalRequest;
|
private ShardSearchLocalRequest shardSearchLocalRequest;
|
||||||
|
|
||||||
public ShardSearchTransportRequest(){
|
|
||||||
}
|
|
||||||
|
|
||||||
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
|
||||||
AliasFilter aliasFilter, float indexBoost, long nowInMillis,
|
AliasFilter aliasFilter, float indexBoost, long nowInMillis,
|
||||||
String clusterAlias, String[] indexRoutings) {
|
@Nullable String clusterAlias, String[] indexRoutings) {
|
||||||
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
|
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
|
||||||
nowInMillis, clusterAlias, indexRoutings);
|
nowInMillis, clusterAlias, indexRoutings);
|
||||||
this.originalIndices = originalIndices;
|
this.originalIndices = originalIndices;
|
||||||
|
|
|
@ -116,7 +116,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
|
||||||
public void testBuildShardSearchTransportRequest() {
|
public void testBuildShardSearchTransportRequest() {
|
||||||
final AtomicLong expected = new AtomicLong();
|
final AtomicLong expected = new AtomicLong();
|
||||||
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(false, expected);
|
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(false, expected);
|
||||||
SearchShardIterator iterator = new SearchShardIterator("test-cluster", new ShardId(new Index("name", "foo"), 1),
|
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
|
||||||
|
SearchShardIterator iterator = new SearchShardIterator(clusterAlias, new ShardId(new Index("name", "foo"), 1),
|
||||||
Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand()));
|
Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand()));
|
||||||
ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator);
|
ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator);
|
||||||
assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions());
|
assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions());
|
||||||
|
@ -126,5 +127,6 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
|
||||||
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
|
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
|
||||||
assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings());
|
assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings());
|
||||||
assertEquals("_shards:1,3", shardSearchTransportRequest.preference());
|
assertEquals("_shards:1,3", shardSearchTransportRequest.preference());
|
||||||
|
assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,12 +29,14 @@ import org.apache.lucene.search.TotalHits;
|
||||||
import org.apache.lucene.search.TotalHits.Relation;
|
import org.apache.lucene.search.TotalHits.Relation;
|
||||||
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
|
import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
|
||||||
import org.elasticsearch.common.text.Text;
|
import org.elasticsearch.common.text.Text;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.search.DocValueFormat;
|
import org.elasticsearch.search.DocValueFormat;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.SearchHits;
|
import org.elasticsearch.search.SearchHits;
|
||||||
|
@ -157,14 +159,18 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
for (boolean trackTotalHits : new boolean[] {true, false}) {
|
for (boolean trackTotalHits : new boolean[] {true, false}) {
|
||||||
SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
|
SearchPhaseController.ReducedQueryPhase reducedQueryPhase =
|
||||||
searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
|
searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits);
|
||||||
AtomicArray<SearchPhaseResult> searchPhaseResultAtomicArray = generateFetchResults(nShards,
|
AtomicArray<SearchPhaseResult> fetchResults = generateFetchResults(nShards,
|
||||||
reducedQueryPhase.sortedTopDocs.scoreDocs, reducedQueryPhase.suggest);
|
reducedQueryPhase.sortedTopDocs.scoreDocs, reducedQueryPhase.suggest);
|
||||||
InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
|
InternalSearchResponse mergedResponse = searchPhaseController.merge(false,
|
||||||
reducedQueryPhase,
|
reducedQueryPhase,
|
||||||
searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get);
|
fetchResults.asList(), fetchResults::get);
|
||||||
if (trackTotalHits == false) {
|
if (trackTotalHits == false) {
|
||||||
assertNull(mergedResponse.hits.getTotalHits());
|
assertNull(mergedResponse.hits.getTotalHits());
|
||||||
}
|
}
|
||||||
|
for (SearchHit hit : mergedResponse.hits().getHits()) {
|
||||||
|
SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id());
|
||||||
|
assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard());
|
||||||
|
}
|
||||||
int suggestSize = 0;
|
int suggestSize = 0;
|
||||||
for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
|
for (Suggest.Suggestion s : reducedQueryPhase.suggest) {
|
||||||
Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
|
Stream<CompletionSuggestion.Entry> stream = s.getEntries().stream();
|
||||||
|
@ -182,6 +188,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
|
assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size()));
|
||||||
for (CompletionSuggestion.Entry.Option option : options) {
|
for (CompletionSuggestion.Entry.Option option : options) {
|
||||||
assertNotNull(option.getHit());
|
assertNotNull(option.getHit());
|
||||||
|
SearchPhaseResult searchPhaseResult = fetchResults.get(option.getHit().getShard().getShardId().id());
|
||||||
|
assertSame(searchPhaseResult.getSearchShardTarget(), option.getHit().getShard());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -193,8 +201,10 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
int searchHitsSize, boolean useConstantScore) {
|
int searchHitsSize, boolean useConstantScore) {
|
||||||
AtomicArray<SearchPhaseResult> queryResults = new AtomicArray<>(nShards);
|
AtomicArray<SearchPhaseResult> queryResults = new AtomicArray<>(nShards);
|
||||||
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
||||||
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
|
String clusterAlias = randomBoolean() ? null : "remote";
|
||||||
new SearchShardTarget("", new Index("", ""), shardIndex, null));
|
SearchShardTarget searchShardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex),
|
||||||
|
clusterAlias, OriginalIndices.NONE);
|
||||||
|
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, searchShardTarget);
|
||||||
final TopDocs topDocs;
|
final TopDocs topDocs;
|
||||||
float maxScore = 0;
|
float maxScore = 0;
|
||||||
if (searchHitsSize == 0) {
|
if (searchHitsSize == 0) {
|
||||||
|
@ -237,7 +247,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
return queryResults;
|
return queryResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getTotalQueryHits(AtomicArray<SearchPhaseResult> results) {
|
private static int getTotalQueryHits(AtomicArray<SearchPhaseResult> results) {
|
||||||
int resultCount = 0;
|
int resultCount = 0;
|
||||||
for (SearchPhaseResult shardResult : results.asList()) {
|
for (SearchPhaseResult shardResult : results.asList()) {
|
||||||
TopDocs topDocs = shardResult.queryResult().topDocs().topDocs;
|
TopDocs topDocs = shardResult.queryResult().topDocs().topDocs;
|
||||||
|
@ -247,7 +257,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
return resultCount;
|
return resultCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Suggest reducedSuggest(AtomicArray<SearchPhaseResult> results) {
|
private static Suggest reducedSuggest(AtomicArray<SearchPhaseResult> results) {
|
||||||
Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>();
|
Map<String, List<Suggest.Suggestion<CompletionSuggestion.Entry>>> groupedSuggestion = new HashMap<>();
|
||||||
for (SearchPhaseResult entry : results.asList()) {
|
for (SearchPhaseResult entry : results.asList()) {
|
||||||
for (Suggest.Suggestion<?> suggestion : entry.queryResult().suggest()) {
|
for (Suggest.Suggestion<?> suggestion : entry.queryResult().suggest()) {
|
||||||
|
@ -260,11 +270,12 @@ public class SearchPhaseControllerTests extends ESTestCase {
|
||||||
.collect(Collectors.toList()));
|
.collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
|
private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) {
|
||||||
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
|
AtomicArray<SearchPhaseResult> fetchResults = new AtomicArray<>(nShards);
|
||||||
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
|
||||||
float maxScore = -1F;
|
float maxScore = -1F;
|
||||||
SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex, null);
|
String clusterAlias = randomBoolean() ? null : "remote";
|
||||||
|
SearchShardTarget shardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex), clusterAlias, OriginalIndices.NONE);
|
||||||
FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
|
FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
|
||||||
List<SearchHit> searchHits = new ArrayList<>();
|
List<SearchHit> searchHits = new ArrayList<>();
|
||||||
for (ScoreDoc scoreDoc : mergedSearchDocs) {
|
for (ScoreDoc scoreDoc : mergedSearchDocs) {
|
||||||
|
|
|
@ -17,27 +17,43 @@
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.search;
|
package org.elasticsearch.action.search;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchType;
|
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.ArrayUtils;
|
import org.elasticsearch.common.util.ArrayUtils;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.search.AbstractSearchTestCase;
|
||||||
|
import org.elasticsearch.search.RandomSearchRequestGenerator;
|
||||||
|
import org.elasticsearch.search.Scroll;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
|
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.test.VersionUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
|
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
|
||||||
|
|
||||||
public class SearchRequestTests extends AbstractSearchTestCase {
|
public class SearchRequestTests extends AbstractSearchTestCase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SearchRequest createSearchRequest() throws IOException {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
return super.createSearchRequest();
|
||||||
|
}
|
||||||
|
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
|
||||||
|
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
|
||||||
|
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
|
||||||
|
return searchRequest;
|
||||||
|
}
|
||||||
|
|
||||||
public void testSerialization() throws Exception {
|
public void testSerialization() throws Exception {
|
||||||
SearchRequest searchRequest = createSearchRequest();
|
SearchRequest searchRequest = createSearchRequest();
|
||||||
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
|
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
|
||||||
|
@ -46,6 +62,28 @@ public class SearchRequestTests extends AbstractSearchTestCase {
|
||||||
assertNotSame(deserializedRequest, searchRequest);
|
assertNotSame(deserializedRequest, searchRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testClusterAliasSerialization() throws IOException {
|
||||||
|
SearchRequest searchRequest = createSearchRequest();
|
||||||
|
Version version = VersionUtils.randomVersion(random());
|
||||||
|
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version);
|
||||||
|
//TODO update version after backport
|
||||||
|
if (version.before(Version.V_7_0_0)) {
|
||||||
|
assertNull(deserializedRequest.getLocalClusterAlias());
|
||||||
|
} else {
|
||||||
|
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO rename and update version after backport
|
||||||
|
public void testReadFromPre7_0_0() throws IOException {
|
||||||
|
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
|
||||||
|
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
|
||||||
|
SearchRequest searchRequest = new SearchRequest(in);
|
||||||
|
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
|
||||||
|
assertNull(searchRequest.getLocalClusterAlias());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testIllegalArguments() {
|
public void testIllegalArguments() {
|
||||||
SearchRequest searchRequest = new SearchRequest();
|
SearchRequest searchRequest = new SearchRequest();
|
||||||
assertNotNull(searchRequest.indices());
|
assertNotNull(searchRequest.indices());
|
||||||
|
@ -140,11 +178,11 @@ public class SearchRequestTests extends AbstractSearchTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEqualsAndHashcode() throws IOException {
|
public void testEqualsAndHashcode() throws IOException {
|
||||||
checkEqualsAndHashCode(createSearchRequest(), SearchRequestTests::copyRequest, this::mutate);
|
checkEqualsAndHashCode(createSearchRequest(), SearchRequest::new, this::mutate);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SearchRequest mutate(SearchRequest searchRequest) {
|
private SearchRequest mutate(SearchRequest searchRequest) {
|
||||||
SearchRequest mutation = copyRequest(searchRequest);
|
SearchRequest mutation = new SearchRequest(searchRequest);
|
||||||
List<Runnable> mutators = new ArrayList<>();
|
List<Runnable> mutators = new ArrayList<>();
|
||||||
mutators.add(() -> mutation.indices(ArrayUtils.concat(searchRequest.indices(), new String[] { randomAlphaOfLength(10) })));
|
mutators.add(() -> mutation.indices(ArrayUtils.concat(searchRequest.indices(), new String[] { randomAlphaOfLength(10) })));
|
||||||
mutators.add(() -> mutation.indicesOptions(randomValueOtherThan(searchRequest.indicesOptions(),
|
mutators.add(() -> mutation.indicesOptions(randomValueOtherThan(searchRequest.indicesOptions(),
|
||||||
|
@ -161,8 +199,4 @@ public class SearchRequestTests extends AbstractSearchTestCase {
|
||||||
randomFrom(mutators).run();
|
randomFrom(mutators).run();
|
||||||
return mutation;
|
return mutation;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SearchRequest copyRequest(SearchRequest searchRequest) {
|
|
||||||
return new SearchRequest(searchRequest);
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.action.search;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.search.SearchShardTarget;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class SearchShardIteratorTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testShardId() {
|
||||||
|
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
|
||||||
|
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE);
|
||||||
|
assertSame(shardId, searchShardIterator.shardId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetOriginalIndices() {
|
||||||
|
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
|
||||||
|
OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)},
|
||||||
|
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
|
||||||
|
SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), originalIndices);
|
||||||
|
assertSame(originalIndices, searchShardIterator.getOriginalIndices());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetClusterAlias() {
|
||||||
|
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
|
||||||
|
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
|
||||||
|
SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(),
|
||||||
|
OriginalIndices.NONE);
|
||||||
|
assertEquals(clusterAlias, searchShardIterator.getClusterAlias());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNewSearchShardTarget() {
|
||||||
|
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
|
||||||
|
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
|
||||||
|
OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)},
|
||||||
|
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
|
||||||
|
SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(), originalIndices);
|
||||||
|
String nodeId = randomAlphaOfLengthBetween(3, 10);
|
||||||
|
SearchShardTarget searchShardTarget = searchShardIterator.newSearchShardTarget(nodeId);
|
||||||
|
assertEquals(clusterAlias, searchShardTarget.getClusterAlias());
|
||||||
|
assertSame(shardId, searchShardTarget.getShardId());
|
||||||
|
assertEquals(nodeId, searchShardTarget.getNodeId());
|
||||||
|
assertSame(originalIndices, searchShardTarget.getOriginalIndices());
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.action.search;
|
package org.elasticsearch.action.search;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.OriginalIndices;
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
|
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
|
||||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
||||||
|
@ -33,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||||
import org.elasticsearch.index.query.TermsQueryBuilder;
|
import org.elasticsearch.index.query.TermsQueryBuilder;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
@ -42,6 +44,10 @@ import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.RemoteClusterService;
|
import org.elasticsearch.transport.RemoteClusterService;
|
||||||
|
import org.elasticsearch.transport.Transport;
|
||||||
|
import org.elasticsearch.transport.TransportException;
|
||||||
|
import org.elasticsearch.transport.TransportRequest;
|
||||||
|
import org.elasticsearch.transport.TransportRequestOptions;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -50,8 +56,11 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||||
|
import static org.hamcrest.CoreMatchers.startsWith;
|
||||||
|
|
||||||
public class TransportSearchActionTests extends ESTestCase {
|
public class TransportSearchActionTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -109,8 +118,9 @@ public class TransportSearchActionTests extends ESTestCase {
|
||||||
remoteShardIterators.add(remoteShardIterator3);
|
remoteShardIterators.add(remoteShardIterator3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String localClusterAlias = randomBoolean() ? null : "local";
|
||||||
GroupShardsIterator<SearchShardIterator> searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator,
|
GroupShardsIterator<SearchShardIterator> searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator,
|
||||||
localIndices, remoteShardIterators);
|
localIndices, localClusterAlias, remoteShardIterators);
|
||||||
|
|
||||||
assertEquals(searchShardIterators.size(), 5);
|
assertEquals(searchShardIterators.size(), 5);
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -120,26 +130,31 @@ public class TransportSearchActionTests extends ESTestCase {
|
||||||
assertEquals("local_index", searchShardIterator.shardId().getIndexName());
|
assertEquals("local_index", searchShardIterator.shardId().getIndexName());
|
||||||
assertEquals(0, searchShardIterator.shardId().getId());
|
assertEquals(0, searchShardIterator.shardId().getId());
|
||||||
assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
||||||
|
assertEquals(localClusterAlias, searchShardIterator.getClusterAlias());
|
||||||
break;
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
assertEquals("local_index_2", searchShardIterator.shardId().getIndexName());
|
assertEquals("local_index_2", searchShardIterator.shardId().getIndexName());
|
||||||
assertEquals(1, searchShardIterator.shardId().getId());
|
assertEquals(1, searchShardIterator.shardId().getId());
|
||||||
assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
||||||
|
assertEquals(localClusterAlias, searchShardIterator.getClusterAlias());
|
||||||
break;
|
break;
|
||||||
case 2:
|
case 2:
|
||||||
assertEquals("remote_index", searchShardIterator.shardId().getIndexName());
|
assertEquals("remote_index", searchShardIterator.shardId().getIndexName());
|
||||||
assertEquals(2, searchShardIterator.shardId().getId());
|
assertEquals(2, searchShardIterator.shardId().getId());
|
||||||
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
||||||
|
assertEquals("remote", searchShardIterator.getClusterAlias());
|
||||||
break;
|
break;
|
||||||
case 3:
|
case 3:
|
||||||
assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName());
|
assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName());
|
||||||
assertEquals(3, searchShardIterator.shardId().getId());
|
assertEquals(3, searchShardIterator.shardId().getId());
|
||||||
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
||||||
|
assertEquals("remote", searchShardIterator.getClusterAlias());
|
||||||
break;
|
break;
|
||||||
case 4:
|
case 4:
|
||||||
assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName());
|
assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName());
|
||||||
assertEquals(4, searchShardIterator.shardId().getId());
|
assertEquals(4, searchShardIterator.shardId().getId());
|
||||||
assertSame(remoteIndices2, searchShardIterator.getOriginalIndices());
|
assertSame(remoteIndices2, searchShardIterator.getOriginalIndices());
|
||||||
|
assertEquals("remote", searchShardIterator.getClusterAlias());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -239,6 +254,56 @@ public class TransportSearchActionTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testBuildConnectionLookup() {
|
||||||
|
Function<String, DiscoveryNode> localNodes = (nodeId) -> new DiscoveryNode("local-" + nodeId,
|
||||||
|
new TransportAddress(TransportAddress.META_ADDRESS, 1024), Version.CURRENT);
|
||||||
|
BiFunction<String, String, DiscoveryNode> remoteNodes = (clusterAlias, nodeId) -> new DiscoveryNode("remote-" + nodeId,
|
||||||
|
new TransportAddress(TransportAddress.META_ADDRESS, 2048), Version.CURRENT);
|
||||||
|
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection = (clusterAlias, node) -> new Transport.Connection() {
|
||||||
|
@Override
|
||||||
|
public DiscoveryNode getNode() {
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||||
|
throws TransportException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addCloseListener(ActionListener<Void> listener) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isClosed() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
BiFunction<String, String, Transport.Connection> connectionLookup = TransportSearchAction.buildConnectionLookup(
|
||||||
|
null, localNodes, remoteNodes, nodeToConnection);
|
||||||
|
|
||||||
|
Transport.Connection localConnection = connectionLookup.apply(null, randomAlphaOfLengthBetween(5, 10));
|
||||||
|
assertThat(localConnection.getNode().getId(), startsWith("local-"));
|
||||||
|
Transport.Connection remoteConnection = connectionLookup.apply(randomAlphaOfLengthBetween(5, 10),
|
||||||
|
randomAlphaOfLengthBetween(5, 10));
|
||||||
|
assertThat(remoteConnection.getNode().getId(), startsWith("remote-"));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
String requestClusterAlias = randomAlphaOfLengthBetween(5, 10);
|
||||||
|
BiFunction<String, String, Transport.Connection> connectionLookup = TransportSearchAction.buildConnectionLookup(
|
||||||
|
requestClusterAlias, localNodes, remoteNodes, nodeToConnection);
|
||||||
|
|
||||||
|
Transport.Connection localConnection = connectionLookup.apply(requestClusterAlias, randomAlphaOfLengthBetween(5, 10));
|
||||||
|
assertThat(localConnection.getNode().getId(), startsWith("local-"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testBuildClusters() {
|
public void testBuildClusters() {
|
||||||
OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
|
OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
|
||||||
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
|
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
|
||||||
|
|
|
@ -45,7 +45,6 @@ public class TransportSearchHelperTests extends ESTestCase {
|
||||||
array.setOnce(1, testSearchPhaseResult2);
|
array.setOnce(1, testSearchPhaseResult2);
|
||||||
array.setOnce(2, testSearchPhaseResult3);
|
array.setOnce(2, testSearchPhaseResult3);
|
||||||
|
|
||||||
|
|
||||||
String scrollId = TransportSearchHelper.buildScrollId(array);
|
String scrollId = TransportSearchHelper.buildScrollId(array);
|
||||||
ParsedScrollId parseScrollId = TransportSearchHelper.parseScrollId(scrollId);
|
ParsedScrollId parseScrollId = TransportSearchHelper.parseScrollId(scrollId);
|
||||||
assertEquals(3, parseScrollId.getContext().length);
|
assertEquals(3, parseScrollId.getContext().length);
|
||||||
|
|
|
@ -43,11 +43,11 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.endsWith;
|
import static org.hamcrest.Matchers.endsWith;
|
||||||
import static org.hamcrest.Matchers.hasToString;
|
import static org.hamcrest.Matchers.hasToString;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
public class SearchSlowLogTests extends ESSingleNodeTestCase {
|
public class SearchSlowLogTests extends ESSingleNodeTestCase {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.search.QueryCachingPolicy;
|
||||||
import org.apache.lucene.search.Sort;
|
import org.apache.lucene.search.Sort;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -110,10 +111,12 @@ public class DefaultSearchContextTests extends ESTestCase {
|
||||||
try (Directory dir = newDirectory();
|
try (Directory dir = newDirectory();
|
||||||
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
|
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
|
||||||
IndexReader reader = w.getReader();
|
IndexReader reader = w.getReader();
|
||||||
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) {
|
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader)) {
|
||||||
|
|
||||||
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
|
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
|
||||||
indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
|
||||||
|
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, target, searcher, null, indexService,
|
||||||
|
indexShard, bigArrays, null, timeout, null, Version.CURRENT);
|
||||||
context1.from(300);
|
context1.from(300);
|
||||||
|
|
||||||
// resultWindow greater than maxResultWindow and scrollContext is null
|
// resultWindow greater than maxResultWindow and scrollContext is null
|
||||||
|
@ -153,8 +156,8 @@ public class DefaultSearchContextTests extends ESTestCase {
|
||||||
+ "] index level setting."));
|
+ "] index level setting."));
|
||||||
|
|
||||||
// rescore is null but sliceBuilder is not null
|
// rescore is null but sliceBuilder is not null
|
||||||
DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher,
|
DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, target, searcher,
|
||||||
null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
|
||||||
|
|
||||||
SliceBuilder sliceBuilder = mock(SliceBuilder.class);
|
SliceBuilder sliceBuilder = mock(SliceBuilder.class);
|
||||||
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
|
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
|
||||||
|
@ -170,8 +173,8 @@ public class DefaultSearchContextTests extends ESTestCase {
|
||||||
when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY);
|
when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY);
|
||||||
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);
|
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);
|
||||||
|
|
||||||
DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, null,
|
DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, target, searcher, null,
|
||||||
indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
|
indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
|
||||||
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
|
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
|
||||||
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
|
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
|
||||||
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
|
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
|
||||||
|
|
|
@ -21,13 +21,12 @@ package org.elasticsearch.search;
|
||||||
|
|
||||||
import org.apache.lucene.search.Explanation;
|
import org.apache.lucene.search.Explanation;
|
||||||
import org.apache.lucene.search.TotalHits;
|
import org.apache.lucene.search.TotalHits;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.OriginalIndices;
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.document.DocumentField;
|
import org.elasticsearch.common.document.DocumentField;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
||||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
|
||||||
import org.elasticsearch.common.text.Text;
|
import org.elasticsearch.common.text.Text;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
@ -42,9 +41,9 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
|
||||||
import org.elasticsearch.search.fetch.subphase.highlight.HighlightFieldTests;
|
import org.elasticsearch.search.fetch.subphase.highlight.HighlightFieldTests;
|
||||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||||
import org.elasticsearch.test.RandomObjects;
|
import org.elasticsearch.test.RandomObjects;
|
||||||
|
import org.elasticsearch.test.VersionUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -214,7 +213,8 @@ public class SearchHitTests extends AbstractStreamableTestCase<SearchHit> {
|
||||||
|
|
||||||
public void testSerializeShardTarget() throws Exception {
|
public void testSerializeShardTarget() throws Exception {
|
||||||
String clusterAlias = randomBoolean() ? null : "cluster_alias";
|
String clusterAlias = randomBoolean() ? null : "cluster_alias";
|
||||||
SearchShardTarget target = new SearchShardTarget("_node_id", new Index("_index", "_na_"), 0, clusterAlias);
|
SearchShardTarget target = new SearchShardTarget("_node_id", new ShardId(new Index("_index", "_na_"), 0),
|
||||||
|
clusterAlias, OriginalIndices.NONE);
|
||||||
|
|
||||||
Map<String, SearchHits> innerHits = new HashMap<>();
|
Map<String, SearchHits> innerHits = new HashMap<>();
|
||||||
SearchHit innerHit1 = new SearchHit(0, "_id", new Text("_type"), null);
|
SearchHit innerHit1 = new SearchHit(0, "_id", new Text("_type"), null);
|
||||||
|
@ -240,12 +240,10 @@ public class SearchHitTests extends AbstractStreamableTestCase<SearchHit> {
|
||||||
|
|
||||||
SearchHits hits = new SearchHits(new SearchHit[]{hit1, hit2}, new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1f);
|
SearchHits hits = new SearchHits(new SearchHit[]{hit1, hit2}, new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1f);
|
||||||
|
|
||||||
|
Version version = VersionUtils.randomVersion(random());
|
||||||
BytesStreamOutput output = new BytesStreamOutput();
|
SearchHits results = copyStreamable(hits, getNamedWriteableRegistry(), SearchHits::new, version);
|
||||||
hits.writeTo(output);
|
SearchShardTarget deserializedTarget = results.getAt(0).getShard();
|
||||||
InputStream input = output.bytes().streamInput();
|
assertThat(deserializedTarget, equalTo(target));
|
||||||
SearchHits results = SearchHits.readSearchHits(new InputStreamStreamInput(input));
|
|
||||||
assertThat(results.getAt(0).getShard(), equalTo(target));
|
|
||||||
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue());
|
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue());
|
||||||
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue());
|
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue());
|
||||||
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).getShard(), notNullValue());
|
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).getShard(), notNullValue());
|
||||||
|
@ -260,7 +258,6 @@ public class SearchHitTests extends AbstractStreamableTestCase<SearchHit> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(results.getAt(1).getShard(), equalTo(target));
|
assertThat(results.getAt(1).getShard(), equalTo(target));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,12 +23,14 @@ import org.apache.lucene.search.Query;
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.OriginalIndices;
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
|
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||||
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.search.SearchTask;
|
import org.elasticsearch.action.search.SearchTask;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
|
@ -71,14 +73,16 @@ import org.elasticsearch.search.fetch.ShardFetchRequest;
|
||||||
import org.elasticsearch.search.internal.AliasFilter;
|
import org.elasticsearch.search.internal.AliasFilter;
|
||||||
import org.elasticsearch.search.internal.SearchContext;
|
import org.elasticsearch.search.internal.SearchContext;
|
||||||
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
|
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
|
||||||
|
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
||||||
import org.elasticsearch.search.suggest.SuggestBuilder;
|
import org.elasticsearch.search.suggest.SuggestBuilder;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
@ -114,7 +118,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
static final String DUMMY_SCRIPT = "dummyScript";
|
static final String DUMMY_SCRIPT = "dummyScript";
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
|
||||||
return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy");
|
return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy");
|
||||||
|
@ -151,7 +154,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -637,4 +639,28 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
|
||||||
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
|
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCreateSearchContext() throws IOException {
|
||||||
|
String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
|
||||||
|
IndexService indexService = createIndex(index);
|
||||||
|
final SearchService service = getInstanceFromNode(SearchService.class);
|
||||||
|
ShardId shardId = new ShardId(indexService.index(), 0);
|
||||||
|
long nowInMillis = System.currentTimeMillis();
|
||||||
|
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10);
|
||||||
|
SearchRequest searchRequest = new SearchRequest();
|
||||||
|
searchRequest.allowPartialSearchResults(randomBoolean());
|
||||||
|
ShardSearchTransportRequest request = new ShardSearchTransportRequest(OriginalIndices.NONE, searchRequest, shardId,
|
||||||
|
indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY);
|
||||||
|
DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()));
|
||||||
|
SearchShardTarget searchShardTarget = searchContext.shardTarget();
|
||||||
|
QueryShardContext queryShardContext = searchContext.getQueryShardContext();
|
||||||
|
String expectedIndexName = clusterAlias == null ? index : clusterAlias + ":" + index;
|
||||||
|
assertEquals(expectedIndexName, queryShardContext.getFullyQualifiedIndex().getName());
|
||||||
|
assertEquals(expectedIndexName, searchShardTarget.getFullyQualifiedIndexName());
|
||||||
|
assertEquals(clusterAlias, searchShardTarget.getClusterAlias());
|
||||||
|
assertEquals(shardId, searchShardTarget.getShardId());
|
||||||
|
assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget());
|
||||||
|
assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget());
|
||||||
|
assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,9 +28,6 @@ import org.elasticsearch.common.CheckedFunction;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.compress.CompressedXContent;
|
import org.elasticsearch.common.compress.CompressedXContent;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
@ -59,10 +56,8 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
||||||
|
|
||||||
public void testSerialization() throws Exception {
|
public void testSerialization() throws Exception {
|
||||||
ShardSearchTransportRequest shardSearchTransportRequest = createShardSearchTransportRequest();
|
ShardSearchTransportRequest shardSearchTransportRequest = createShardSearchTransportRequest();
|
||||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
ShardSearchTransportRequest deserializedRequest =
|
||||||
shardSearchTransportRequest.writeTo(output);
|
copyWriteable(shardSearchTransportRequest, namedWriteableRegistry, ShardSearchTransportRequest::new);
|
||||||
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
|
|
||||||
ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest(in);
|
|
||||||
assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll());
|
assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll());
|
||||||
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
||||||
assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices());
|
assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices());
|
||||||
|
@ -74,14 +69,13 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
||||||
assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType());
|
assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType());
|
||||||
assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId());
|
assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId());
|
||||||
assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards());
|
assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards());
|
||||||
assertEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings());
|
assertArrayEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings());
|
||||||
assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference());
|
assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference());
|
||||||
assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey());
|
assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey());
|
||||||
assertNotSame(deserializedRequest, shardSearchTransportRequest);
|
assertNotSame(deserializedRequest, shardSearchTransportRequest);
|
||||||
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
|
||||||
assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f);
|
assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f);
|
||||||
}
|
assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias());
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ShardSearchTransportRequest createShardSearchTransportRequest() throws IOException {
|
private ShardSearchTransportRequest createShardSearchTransportRequest() throws IOException {
|
||||||
|
@ -97,7 +91,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
||||||
final String[] routings = generateRandomStringArray(5, 10, false, true);
|
final String[] routings = generateRandomStringArray(5, 10, false, true);
|
||||||
return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
|
return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
|
||||||
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(),
|
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(),
|
||||||
Math.abs(randomLong()), null, routings);
|
Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFilteringAliases() throws Exception {
|
public void testFilteringAliases() throws Exception {
|
||||||
|
@ -154,8 +148,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexMetaData remove(IndexMetaData indexMetaData, String alias) {
|
private IndexMetaData remove(IndexMetaData indexMetaData, String alias) {
|
||||||
IndexMetaData build = IndexMetaData.builder(indexMetaData).removeAlias(alias).build();
|
return IndexMetaData.builder(indexMetaData).removeAlias(alias).build();
|
||||||
return build;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private IndexMetaData add(IndexMetaData indexMetaData, String alias, @Nullable CompressedXContent filter) {
|
private IndexMetaData add(IndexMetaData indexMetaData, String alias, @Nullable CompressedXContent filter) {
|
||||||
|
|
|
@ -83,7 +83,17 @@ public class RandomSearchRequestGenerator {
|
||||||
* {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}.
|
* {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}.
|
||||||
*/
|
*/
|
||||||
public static SearchRequest randomSearchRequest(Supplier<SearchSourceBuilder> randomSearchSourceBuilder) {
|
public static SearchRequest randomSearchRequest(Supplier<SearchSourceBuilder> randomSearchSourceBuilder) {
|
||||||
SearchRequest searchRequest = new SearchRequest();
|
return randomSearchRequest(new SearchRequest(), randomSearchSourceBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set random fields to the provided search request.
|
||||||
|
*
|
||||||
|
* @param searchRequest the search request
|
||||||
|
* @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use
|
||||||
|
* {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}.
|
||||||
|
*/
|
||||||
|
public static SearchRequest randomSearchRequest(SearchRequest searchRequest, Supplier<SearchSourceBuilder> randomSearchSourceBuilder) {
|
||||||
searchRequest.allowPartialSearchResults(true);
|
searchRequest.allowPartialSearchResults(true);
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
searchRequest.indices(generateRandomStringArray(10, 10, false, false));
|
searchRequest.indices(generateRandomStringArray(10, 10, false, false));
|
||||||
|
|
Loading…
Reference in New Issue