diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 27293e8e50f..2da609e906c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -318,8 +318,8 @@ abstract class AbstractSearchAsyncAction exten listener.onFailure(e); } + @Override public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) { - String clusterAlias = shardIt.getClusterAlias(); AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST); @@ -327,7 +327,7 @@ abstract class AbstractSearchAsyncAction exten final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) .toArray(new String[0]); return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), - filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings); + filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings); } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 4e0db464478..30ad0529f5c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -90,8 +90,7 @@ abstract class InitialSearchPhase extends final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(), - shardIt.getOriginalIndices()); + SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId); onShardFailure(shardIndex, shardTarget, e); if (totalOps.incrementAndGet() == expectedTotalOps) { @@ -257,8 +256,8 @@ abstract class InitialSearchPhase extends Runnable r = () -> { final Thread thread = Thread.currentThread(); try { - executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), - shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { + executePhaseOnShard(shardIt, shard, new SearchActionListener( + shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { try { diff --git a/server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java index fc6585054dd..18b61516897 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java @@ -36,6 +36,7 @@ class ScrollIdForNode { return node; } + @Nullable public String getClusterAlias() { return clusterAlias; } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 68968c071f4..9789e03c836 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -62,6 +62,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128; public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512; + private final String localClusterAlias; + private SearchType searchType = SearchType.DEFAULT; private String[] indices = Strings.EMPTY_ARRAY; @@ -92,6 +94,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; public SearchRequest() { + this.localClusterAlias = null; } /** @@ -111,6 +114,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest this.searchType = searchRequest.searchType; this.source = searchRequest.source; this.types = searchRequest.types; + this.localClusterAlias = searchRequest.localClusterAlias; } /** @@ -125,6 +129,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest * Constructs a new search request against the provided indices with the given search source. */ public SearchRequest(String[] indices, SearchSourceBuilder source) { + this(); if (source == null) { throw new IllegalArgumentException("source must not be null"); } @@ -132,6 +137,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest this.source = source; } + /** + * Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest} + * is created and executed as part of a cross-cluster search request performing local reduction on each cluster. + * The coordinating CCS node provides the alias to prefix index names with in the returned search results. + */ + SearchRequest(String localClusterAlias) { + this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null"); + } + /** * Constructs a new search request from reading the specified stream. * @@ -158,6 +172,12 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest if (in.getVersion().onOrAfter(Version.V_6_3_0)) { allowPartialSearchResults = in.readOptionalBoolean(); } + //TODO update version after backport + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + localClusterAlias = in.readOptionalString(); + } else { + localClusterAlias = null; + } } @Override @@ -181,6 +201,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeOptionalBoolean(allowPartialSearchResults); } + //TODO update version after backport + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(localClusterAlias); + } } @Override @@ -209,6 +233,16 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest return validationException; } + /** + * Returns the alias of the cluster that this search request is being executed on. A non-null value indicates that this search request + * is being executed as part of a locally reduced cross-cluster search request. The cluster alias is used to prefix index names + * returned as part of search hits with the alias of the cluster they came from. + */ + @Nullable + String getLocalClusterAlias() { + return localClusterAlias; + } + /** * Sets the indices the search will be executed on. */ @@ -529,14 +563,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) && Objects.equals(preFilterShardSize, that.preFilterShardSize) && Objects.equals(indicesOptions, that.indicesOptions) && - Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults); + Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && + Objects.equals(localClusterAlias, that.localClusterAlias); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults); + allowPartialSearchResults, localClusterAlias); } @Override @@ -554,6 +589,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest ", batchedReduceSize=" + batchedReduceSize + ", preFilterShardSize=" + preFilterShardSize + ", allowPartialSearchResults=" + allowPartialSearchResults + + ", localClusterAlias=" + localClusterAlias + ", source=" + source + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index c36d2b7908f..be3b5d7a9c2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -22,28 +22,34 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchShardTarget; import java.util.List; /** * Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices} - * of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices. + * of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as + * the cluster alias. + * @see OriginalIndices */ public final class SearchShardIterator extends PlainShardIterator { private final OriginalIndices originalIndices; - private String clusterAlias; + private final String clusterAlias; private boolean skip = false; /** * Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards * this the a given shardId. * + * @param clusterAlias the alias of the cluster where the shard is located * @param shardId shard id of the group * @param shards shards to iterate + * @param originalIndices the indices that the search request originally related to (before any rewriting happened) */ - public SearchShardIterator(String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { + public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { super(shardId, shards); this.originalIndices = originalIndices; this.clusterAlias = clusterAlias; @@ -56,10 +62,22 @@ public final class SearchShardIterator extends PlainShardIterator { return originalIndices; } + /** + * Returns the alias of the cluster where the shard is located. + */ + @Nullable public String getClusterAlias() { return clusterAlias; } + /** + * Creates a new shard target from this iterator, pointing at the node identified by the provided identifier. + * @see SearchShardTarget + */ + SearchShardTarget newSearchShardTarget(String nodeId) { + return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices); + } + /** * Reset the iterator and mark it as skippable * @see #skip() diff --git a/server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java index ddfadfa57e3..451ceda70fd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java +++ b/server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java @@ -98,8 +98,8 @@ public class ShardSearchFailure extends ShardOperationFailedException { @Override public void readFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - shardTarget = new SearchShardTarget(in); + shardTarget = in.readOptionalWriteable(SearchShardTarget::new); + if (shardTarget != null) { index = shardTarget.getFullyQualifiedIndexName(); shardId = shardTarget.getShardId().getId(); } @@ -110,12 +110,7 @@ public class ShardSearchFailure extends ShardOperationFailedException { @Override public void writeTo(StreamOutput out) throws IOException { - if (shardTarget == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - shardTarget.writeTo(out); - } + out.writeOptionalWriteable(shardTarget); out.writeString(reason); RestStatus.writeTo(out, status); out.writeException(cause); @@ -175,7 +170,7 @@ public class ShardSearchFailure extends ShardOperationFailedException { SearchShardTarget searchShardTarget = null; if (nodeId != null) { searchShardTarget = new SearchShardTarget(nodeId, - new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE); + new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE); } return new ShardSearchFailure(exception, searchShardTarget); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4c1a953965f..9df930544e6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; @@ -60,6 +61,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongSupplier; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; @@ -311,7 +313,7 @@ public class TransportSearchAction extends HandledTransportAction localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, - remoteShardIterators); + searchRequest.getLocalClusterAlias(), remoteShardIterators); failIfOverShardCountLimit(clusterService, shardIterators.size()); @@ -338,19 +340,34 @@ public class TransportSearchAction extends HandledTransportAction connectionLookup = (clusterName, nodeId) -> { - final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId); - if (discoveryNode == null) { - throw new IllegalStateException("no node found for id: " + nodeId); - } - return searchTransportService.getConnection(clusterName, discoveryNode); - }; + BiFunction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(), + nodes::get, remoteConnections, searchTransportService::getConnection); boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators); searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start(); } - private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator shardIterators) { + static BiFunction buildConnectionLookup(String requestClusterAlias, + Function localNodes, + BiFunction remoteNodes, + BiFunction nodeToConnection) { + return (clusterAlias, nodeId) -> { + final DiscoveryNode discoveryNode; + if (clusterAlias == null || requestClusterAlias != null) { + assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias); + discoveryNode = localNodes.apply(nodeId); + } else { + discoveryNode = remoteNodes.apply(clusterAlias, nodeId); + } + if (discoveryNode == null) { + throw new IllegalStateException("no node found for id: " + nodeId); + } + return nodeToConnection.apply(clusterAlias, discoveryNode); + }; + } + + private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest, + GroupShardsIterator shardIterators) { SearchSourceBuilder source = searchRequest.source(); return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time SearchService.canRewriteToMatchNone(source) && @@ -359,10 +376,11 @@ public class TransportSearchAction extends HandledTransportAction mergeShardsIterators(GroupShardsIterator localShardsIterator, OriginalIndices localIndices, + @Nullable String localClusterAlias, List remoteShardIterators) { List shards = new ArrayList<>(remoteShardIterators); for (ShardIterator shardIterator : localShardsIterator) { - shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); + shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } return new GroupShardsIterator<>(shards); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java index 7a0bb63478c..c848e227af4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java @@ -44,8 +44,8 @@ final class TransportSearchHelper { out.writeLong(searchPhaseResult.getRequestId()); SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget(); if (searchShardTarget.getClusterAlias() != null) { - out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), - searchShardTarget.getNodeId())); + out.writeString( + RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId())); } else { out.writeString(searchShardTarget.getNodeId()); } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 091fd5f8c85..05e90214e15 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext { DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout, - FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) { + FetchPhase fetchPhase, Version minNodeVersion) { this.id = id; this.request = request; this.fetchPhase = fetchPhase; @@ -179,7 +179,7 @@ final class DefaultSearchContext extends SearchContext { this.timeout = timeout; this.minNodeVersion = minNodeVersion; queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis, - clusterAlias); + shardTarget.getClusterAlias()); queryShardContext.setTypes(request.types()); queryBoost = request.indexBoost(); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 98f2e1d2e7e..500e70a65b4 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -668,8 +668,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv return context; } - public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) - throws IOException { + public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { return createSearchContext(request, timeout, true, "search"); } @@ -684,7 +683,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, - fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion()); + fetchPhase, clusterService.state().nodes().getMinNodeVersion()); boolean success = false; try { // we clone the query shard context here just for rewriting otherwise we diff --git a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java index 4a46c7202d1..42f3b67e358 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java +++ b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.Objects; /** * The target that the search request was executed on. @@ -54,7 +55,7 @@ public final class SearchShardTarget implements Writeable, Comparablenull if the request if targeted to the local - * cluster. + * Returns the cluster alias in case the request is part of a cross-cluster search request, null otherwise. */ + @Nullable String getClusterAlias(); Rewriteable getRewriteable(); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index e7aad0bd517..59d1c2e089e 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -51,12 +52,9 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha private ShardSearchLocalRequest shardSearchLocalRequest; - public ShardSearchTransportRequest(){ - } - public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis, - String clusterAlias, String[] indexRoutings) { + @Nullable String clusterAlias, String[] indexRoutings) { this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost, nowInMillis, clusterAlias, indexRoutings); this.originalIndices = originalIndices; diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 193878e2f5e..70f70268a0a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -116,7 +116,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { public void testBuildShardSearchTransportRequest() { final AtomicLong expected = new AtomicLong(); AbstractSearchAsyncAction action = createAction(false, expected); - SearchShardIterator iterator = new SearchShardIterator("test-cluster", new ShardId(new Index("name", "foo"), 1), + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator iterator = new SearchShardIterator(clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand())); ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); @@ -126,5 +127,6 @@ public class AbstractSearchAsyncActionTests extends ESTestCase { assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings()); assertEquals("_shards:1,3", shardSearchTransportRequest.preference()); + assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias()); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 4a8afe22b18..585108fef8a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -29,12 +29,14 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.apache.lucene.search.grouping.CollapseTopFieldDocs; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -157,14 +159,18 @@ public class SearchPhaseControllerTests extends ESTestCase { for (boolean trackTotalHits : new boolean[] {true, false}) { SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(), false, trackTotalHits); - AtomicArray searchPhaseResultAtomicArray = generateFetchResults(nShards, + AtomicArray fetchResults = generateFetchResults(nShards, reducedQueryPhase.sortedTopDocs.scoreDocs, reducedQueryPhase.suggest); InternalSearchResponse mergedResponse = searchPhaseController.merge(false, reducedQueryPhase, - searchPhaseResultAtomicArray.asList(), searchPhaseResultAtomicArray::get); + fetchResults.asList(), fetchResults::get); if (trackTotalHits == false) { assertNull(mergedResponse.hits.getTotalHits()); } + for (SearchHit hit : mergedResponse.hits().getHits()) { + SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id()); + assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard()); + } int suggestSize = 0; for (Suggest.Suggestion s : reducedQueryPhase.suggest) { Stream stream = s.getEntries().stream(); @@ -182,6 +188,8 @@ public class SearchPhaseControllerTests extends ESTestCase { assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size())); for (CompletionSuggestion.Entry.Option option : options) { assertNotNull(option.getHit()); + SearchPhaseResult searchPhaseResult = fetchResults.get(option.getHit().getShard().getShardId().id()); + assertSame(searchPhaseResult.getSearchShardTarget(), option.getHit().getShard()); } } } @@ -193,8 +201,10 @@ public class SearchPhaseControllerTests extends ESTestCase { int searchHitsSize, boolean useConstantScore) { AtomicArray queryResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { - QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, - new SearchShardTarget("", new Index("", ""), shardIndex, null)); + String clusterAlias = randomBoolean() ? null : "remote"; + SearchShardTarget searchShardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex), + clusterAlias, OriginalIndices.NONE); + QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, searchShardTarget); final TopDocs topDocs; float maxScore = 0; if (searchHitsSize == 0) { @@ -237,7 +247,7 @@ public class SearchPhaseControllerTests extends ESTestCase { return queryResults; } - private int getTotalQueryHits(AtomicArray results) { + private static int getTotalQueryHits(AtomicArray results) { int resultCount = 0; for (SearchPhaseResult shardResult : results.asList()) { TopDocs topDocs = shardResult.queryResult().topDocs().topDocs; @@ -247,7 +257,7 @@ public class SearchPhaseControllerTests extends ESTestCase { return resultCount; } - private Suggest reducedSuggest(AtomicArray results) { + private static Suggest reducedSuggest(AtomicArray results) { Map>> groupedSuggestion = new HashMap<>(); for (SearchPhaseResult entry : results.asList()) { for (Suggest.Suggestion suggestion : entry.queryResult().suggest()) { @@ -260,11 +270,12 @@ public class SearchPhaseControllerTests extends ESTestCase { .collect(Collectors.toList())); } - private AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { + private static AtomicArray generateFetchResults(int nShards, ScoreDoc[] mergedSearchDocs, Suggest mergedSuggest) { AtomicArray fetchResults = new AtomicArray<>(nShards); for (int shardIndex = 0; shardIndex < nShards; shardIndex++) { float maxScore = -1F; - SearchShardTarget shardTarget = new SearchShardTarget("", new Index("", ""), shardIndex, null); + String clusterAlias = randomBoolean() ? null : "remote"; + SearchShardTarget shardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex), clusterAlias, OriginalIndices.NONE); FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget); List searchHits = new ArrayList<>(); for (ScoreDoc scoreDoc : mergedSearchDocs) { diff --git a/server/src/test/java/org/elasticsearch/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java similarity index 80% rename from server/src/test/java/org/elasticsearch/search/SearchRequestTests.java rename to server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index ced279a1bab..719a14491ae 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -17,27 +17,43 @@ * under the License. */ -package org.elasticsearch.search; +package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.AbstractSearchTestCase; +import org.elasticsearch.search.RandomSearchRequestGenerator; +import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.rescore.QueryRescorerBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode; public class SearchRequestTests extends AbstractSearchTestCase { + @Override + protected SearchRequest createSearchRequest() throws IOException { + if (randomBoolean()) { + return super.createSearchRequest(); + } + //clusterAlias does not have public getter/setter hence we randomize it only in this test specifically. + SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10)); + RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder); + return searchRequest; + } + public void testSerialization() throws Exception { SearchRequest searchRequest = createSearchRequest(); SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new); @@ -46,6 +62,28 @@ public class SearchRequestTests extends AbstractSearchTestCase { assertNotSame(deserializedRequest, searchRequest); } + public void testClusterAliasSerialization() throws IOException { + SearchRequest searchRequest = createSearchRequest(); + Version version = VersionUtils.randomVersion(random()); + SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version); + //TODO update version after backport + if (version.before(Version.V_7_0_0)) { + assertNull(deserializedRequest.getLocalClusterAlias()); + } else { + assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias()); + } + } + + //TODO rename and update version after backport + public void testReadFromPre7_0_0() throws IOException { + String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA=="; + try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) { + SearchRequest searchRequest = new SearchRequest(in); + assertArrayEquals(new String[]{"index"}, searchRequest.indices()); + assertNull(searchRequest.getLocalClusterAlias()); + } + } + public void testIllegalArguments() { SearchRequest searchRequest = new SearchRequest(); assertNotNull(searchRequest.indices()); @@ -140,11 +178,11 @@ public class SearchRequestTests extends AbstractSearchTestCase { } public void testEqualsAndHashcode() throws IOException { - checkEqualsAndHashCode(createSearchRequest(), SearchRequestTests::copyRequest, this::mutate); + checkEqualsAndHashCode(createSearchRequest(), SearchRequest::new, this::mutate); } private SearchRequest mutate(SearchRequest searchRequest) { - SearchRequest mutation = copyRequest(searchRequest); + SearchRequest mutation = new SearchRequest(searchRequest); List mutators = new ArrayList<>(); mutators.add(() -> mutation.indices(ArrayUtils.concat(searchRequest.indices(), new String[] { randomAlphaOfLength(10) }))); mutators.add(() -> mutation.indicesOptions(randomValueOtherThan(searchRequest.indicesOptions(), @@ -161,8 +199,4 @@ public class SearchRequestTests extends AbstractSearchTestCase { randomFrom(mutators).run(); return mutation; } - - private static SearchRequest copyRequest(SearchRequest searchRequest) { - return new SearchRequest(searchRequest); - } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java new file mode 100644 index 00000000000..09595650932 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; + +public class SearchShardIteratorTests extends ESTestCase { + + public void testShardId() { + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + assertSame(shardId, searchShardIterator.shardId()); + } + + public void testGetOriginalIndices() { + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)}, + IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), originalIndices); + assertSame(originalIndices, searchShardIterator.getOriginalIndices()); + } + + public void testGetClusterAlias() { + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(), + OriginalIndices.NONE); + assertEquals(clusterAlias, searchShardIterator.getClusterAlias()); + } + + public void testNewSearchShardTarget() { + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)}, + IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(), originalIndices); + String nodeId = randomAlphaOfLengthBetween(3, 10); + SearchShardTarget searchShardTarget = searchShardIterator.newSearchShardTarget(nodeId); + assertEquals(clusterAlias, searchShardTarget.getClusterAlias()); + assertSame(shardId, searchShardTarget.getShardId()); + assertEquals(nodeId, searchShardTarget.getNodeId()); + assertSame(originalIndices, searchShardTarget.getOriginalIndices()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 3e4747a4db7..16ff4389d7c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -42,6 +44,10 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -50,8 +56,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.hamcrest.CoreMatchers.startsWith; public class TransportSearchActionTests extends ESTestCase { @@ -109,8 +118,9 @@ public class TransportSearchActionTests extends ESTestCase { remoteShardIterators.add(remoteShardIterator3); } + String localClusterAlias = randomBoolean() ? null : "local"; GroupShardsIterator searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator, - localIndices, remoteShardIterators); + localIndices, localClusterAlias, remoteShardIterators); assertEquals(searchShardIterators.size(), 5); int i = 0; @@ -120,26 +130,31 @@ public class TransportSearchActionTests extends ESTestCase { assertEquals("local_index", searchShardIterator.shardId().getIndexName()); assertEquals(0, searchShardIterator.shardId().getId()); assertSame(localIndices, searchShardIterator.getOriginalIndices()); + assertEquals(localClusterAlias, searchShardIterator.getClusterAlias()); break; case 1: assertEquals("local_index_2", searchShardIterator.shardId().getIndexName()); assertEquals(1, searchShardIterator.shardId().getId()); assertSame(localIndices, searchShardIterator.getOriginalIndices()); + assertEquals(localClusterAlias, searchShardIterator.getClusterAlias()); break; case 2: assertEquals("remote_index", searchShardIterator.shardId().getIndexName()); assertEquals(2, searchShardIterator.shardId().getId()); assertSame(remoteIndices, searchShardIterator.getOriginalIndices()); + assertEquals("remote", searchShardIterator.getClusterAlias()); break; case 3: assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName()); assertEquals(3, searchShardIterator.shardId().getId()); assertSame(remoteIndices, searchShardIterator.getOriginalIndices()); + assertEquals("remote", searchShardIterator.getClusterAlias()); break; case 4: assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName()); assertEquals(4, searchShardIterator.shardId().getId()); assertSame(remoteIndices2, searchShardIterator.getOriginalIndices()); + assertEquals("remote", searchShardIterator.getClusterAlias()); break; } } @@ -239,6 +254,56 @@ public class TransportSearchActionTests extends ESTestCase { } } + public void testBuildConnectionLookup() { + Function localNodes = (nodeId) -> new DiscoveryNode("local-" + nodeId, + new TransportAddress(TransportAddress.META_ADDRESS, 1024), Version.CURRENT); + BiFunction remoteNodes = (clusterAlias, nodeId) -> new DiscoveryNode("remote-" + nodeId, + new TransportAddress(TransportAddress.META_ADDRESS, 2048), Version.CURRENT); + BiFunction nodeToConnection = (clusterAlias, node) -> new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + } + + @Override + public void addCloseListener(ActionListener listener) { + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() { + } + }; + + { + BiFunction connectionLookup = TransportSearchAction.buildConnectionLookup( + null, localNodes, remoteNodes, nodeToConnection); + + Transport.Connection localConnection = connectionLookup.apply(null, randomAlphaOfLengthBetween(5, 10)); + assertThat(localConnection.getNode().getId(), startsWith("local-")); + Transport.Connection remoteConnection = connectionLookup.apply(randomAlphaOfLengthBetween(5, 10), + randomAlphaOfLengthBetween(5, 10)); + assertThat(remoteConnection.getNode().getId(), startsWith("remote-")); + } + { + String requestClusterAlias = randomAlphaOfLengthBetween(5, 10); + BiFunction connectionLookup = TransportSearchAction.buildConnectionLookup( + requestClusterAlias, localNodes, remoteNodes, nodeToConnection); + + Transport.Connection localConnection = connectionLookup.apply(requestClusterAlias, randomAlphaOfLengthBetween(5, 10)); + assertThat(localConnection.getNode().getId(), startsWith("local-")); + } + } + public void testBuildClusters() { OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices(); Map remoteIndices = new HashMap<>(); diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java index 49d7450096b..a666f45c343 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java @@ -45,7 +45,6 @@ public class TransportSearchHelperTests extends ESTestCase { array.setOnce(1, testSearchPhaseResult2); array.setOnce(2, testSearchPhaseResult3); - String scrollId = TransportSearchHelper.buildScrollId(array); ParsedScrollId parseScrollId = TransportSearchHelper.parseScrollId(scrollId); assertEquals(3, parseScrollId.getContext().length); diff --git a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java index 8d547c617e5..fc24bdf9691 100644 --- a/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/elasticsearch/index/SearchSlowLogTests.java @@ -43,11 +43,11 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.startsWith; public class SearchSlowLogTests extends ESSingleNodeTestCase { diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 4b86ff668c0..189929171a5 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -26,6 +26,7 @@ import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.Sort; import org.apache.lucene.store.Directory; import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; @@ -110,10 +111,12 @@ public class DefaultSearchContextTests extends ESTestCase { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir); IndexReader reader = w.getReader(); - Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) { + Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader)) { - DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService, - indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); + SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); + + DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, target, searcher, null, indexService, + indexShard, bigArrays, null, timeout, null, Version.CURRENT); context1.from(300); // resultWindow greater than maxResultWindow and scrollContext is null @@ -153,8 +156,8 @@ public class DefaultSearchContextTests extends ESTestCase { + "] index level setting.")); // rescore is null but sliceBuilder is not null - DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, null, searcher, - null, indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); + DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, target, searcher, + null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT); SliceBuilder sliceBuilder = mock(SliceBuilder.class); int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100); @@ -170,8 +173,8 @@ public class DefaultSearchContextTests extends ESTestCase { when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY); when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST); - DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, null, searcher, null, - indexService, indexShard, bigArrays, null, timeout, null, null, Version.CURRENT); + DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, target, searcher, null, + indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT); ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery(); context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false); assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query())); diff --git a/server/src/test/java/org/elasticsearch/search/SearchHitTests.java b/server/src/test/java/org/elasticsearch/search/SearchHitTests.java index 3ad39404afe..fee55f1e22f 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchHitTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchHitTests.java @@ -21,13 +21,12 @@ package org.elasticsearch.search; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.document.DocumentField; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -42,9 +41,9 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.fetch.subphase.highlight.HighlightFieldTests; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.RandomObjects; +import org.elasticsearch.test.VersionUtils; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -214,7 +213,8 @@ public class SearchHitTests extends AbstractStreamableTestCase { public void testSerializeShardTarget() throws Exception { String clusterAlias = randomBoolean() ? null : "cluster_alias"; - SearchShardTarget target = new SearchShardTarget("_node_id", new Index("_index", "_na_"), 0, clusterAlias); + SearchShardTarget target = new SearchShardTarget("_node_id", new ShardId(new Index("_index", "_na_"), 0), + clusterAlias, OriginalIndices.NONE); Map innerHits = new HashMap<>(); SearchHit innerHit1 = new SearchHit(0, "_id", new Text("_type"), null); @@ -240,12 +240,10 @@ public class SearchHitTests extends AbstractStreamableTestCase { SearchHits hits = new SearchHits(new SearchHit[]{hit1, hit2}, new TotalHits(2, TotalHits.Relation.EQUAL_TO), 1f); - - BytesStreamOutput output = new BytesStreamOutput(); - hits.writeTo(output); - InputStream input = output.bytes().streamInput(); - SearchHits results = SearchHits.readSearchHits(new InputStreamStreamInput(input)); - assertThat(results.getAt(0).getShard(), equalTo(target)); + Version version = VersionUtils.randomVersion(random()); + SearchHits results = copyStreamable(hits, getNamedWriteableRegistry(), SearchHits::new, version); + SearchShardTarget deserializedTarget = results.getAt(0).getShard(); + assertThat(deserializedTarget, equalTo(target)); assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue()); assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).getShard(), notNullValue()); assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).getShard(), notNullValue()); @@ -260,7 +258,6 @@ public class SearchHitTests extends AbstractStreamableTestCase { } } } - assertThat(results.getAt(1).getShard(), equalTo(target)); } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 30598311ad5..894a4fa9d4a 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -23,12 +23,14 @@ import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; @@ -71,14 +73,16 @@ import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchLocalRequest; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.LinkedList; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -114,7 +118,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase { static final String DUMMY_SCRIPT = "dummyScript"; - @Override protected Map, Object>> pluginScripts() { return Collections.singletonMap(DUMMY_SCRIPT, vars -> "dummy"); @@ -151,7 +154,6 @@ public class SearchServiceTests extends ESSingleNodeTestCase { } }); } - } @Override @@ -637,4 +639,28 @@ public class SearchServiceTests extends ESSingleNodeTestCase { reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1); } } + + public void testCreateSearchContext() throws IOException { + String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); + IndexService indexService = createIndex(index); + final SearchService service = getInstanceFromNode(SearchService.class); + ShardId shardId = new ShardId(indexService.index(), 0); + long nowInMillis = System.currentTimeMillis(); + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.allowPartialSearchResults(randomBoolean()); + ShardSearchTransportRequest request = new ShardSearchTransportRequest(OriginalIndices.NONE, searchRequest, shardId, + indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY); + DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis())); + SearchShardTarget searchShardTarget = searchContext.shardTarget(); + QueryShardContext queryShardContext = searchContext.getQueryShardContext(); + String expectedIndexName = clusterAlias == null ? index : clusterAlias + ":" + index; + assertEquals(expectedIndexName, queryShardContext.getFullyQualifiedIndex().getName()); + assertEquals(expectedIndexName, searchShardTarget.getFullyQualifiedIndexName()); + assertEquals(clusterAlias, searchShardTarget.getClusterAlias()); + assertEquals(shardId, searchShardTarget.getShardId()); + assertSame(searchShardTarget, searchContext.dfsResult().getSearchShardTarget()); + assertSame(searchShardTarget, searchContext.queryResult().getSearchShardTarget()); + assertSame(searchShardTarget, searchContext.fetchResult().getSearchShardTarget()); + } } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java index 21a4f099f5a..da987a65726 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java @@ -28,9 +28,6 @@ import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.ToXContent; @@ -59,29 +56,26 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase { public void testSerialization() throws Exception { ShardSearchTransportRequest shardSearchTransportRequest = createShardSearchTransportRequest(); - try (BytesStreamOutput output = new BytesStreamOutput()) { - shardSearchTransportRequest.writeTo(output); - try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) { - ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest(in); - assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll()); - assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); - assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices()); - assertArrayEquals(deserializedRequest.types(), shardSearchTransportRequest.types()); - assertEquals(deserializedRequest.indicesOptions(), shardSearchTransportRequest.indicesOptions()); - assertEquals(deserializedRequest.isProfile(), shardSearchTransportRequest.isProfile()); - assertEquals(deserializedRequest.nowInMillis(), shardSearchTransportRequest.nowInMillis()); - assertEquals(deserializedRequest.source(), shardSearchTransportRequest.source()); - assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType()); - assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId()); - assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards()); - assertEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings()); - assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference()); - assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey()); - assertNotSame(deserializedRequest, shardSearchTransportRequest); - assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); - assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); - } - } + ShardSearchTransportRequest deserializedRequest = + copyWriteable(shardSearchTransportRequest, namedWriteableRegistry, ShardSearchTransportRequest::new); + assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll()); + assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); + assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices()); + assertArrayEquals(deserializedRequest.types(), shardSearchTransportRequest.types()); + assertEquals(deserializedRequest.indicesOptions(), shardSearchTransportRequest.indicesOptions()); + assertEquals(deserializedRequest.isProfile(), shardSearchTransportRequest.isProfile()); + assertEquals(deserializedRequest.nowInMillis(), shardSearchTransportRequest.nowInMillis()); + assertEquals(deserializedRequest.source(), shardSearchTransportRequest.source()); + assertEquals(deserializedRequest.searchType(), shardSearchTransportRequest.searchType()); + assertEquals(deserializedRequest.shardId(), shardSearchTransportRequest.shardId()); + assertEquals(deserializedRequest.numberOfShards(), shardSearchTransportRequest.numberOfShards()); + assertArrayEquals(deserializedRequest.indexRoutings(), shardSearchTransportRequest.indexRoutings()); + assertEquals(deserializedRequest.preference(), shardSearchTransportRequest.preference()); + assertEquals(deserializedRequest.cacheKey(), shardSearchTransportRequest.cacheKey()); + assertNotSame(deserializedRequest, shardSearchTransportRequest); + assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); + assertEquals(deserializedRequest.indexBoost(), shardSearchTransportRequest.indexBoost(), 0.0f); + assertEquals(deserializedRequest.getClusterAlias(), shardSearchTransportRequest.getClusterAlias()); } private ShardSearchTransportRequest createShardSearchTransportRequest() throws IOException { @@ -97,7 +91,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase { final String[] routings = generateRandomStringArray(5, 10, false, true); return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), - Math.abs(randomLong()), null, routings); + Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings); } public void testFilteringAliases() throws Exception { @@ -154,8 +148,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase { } private IndexMetaData remove(IndexMetaData indexMetaData, String alias) { - IndexMetaData build = IndexMetaData.builder(indexMetaData).removeAlias(alias).build(); - return build; + return IndexMetaData.builder(indexMetaData).removeAlias(alias).build(); } private IndexMetaData add(IndexMetaData indexMetaData, String alias, @Nullable CompressedXContent filter) { diff --git a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java index d534af57894..279bddebc4a 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/elasticsearch/search/RandomSearchRequestGenerator.java @@ -83,7 +83,17 @@ public class RandomSearchRequestGenerator { * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. */ public static SearchRequest randomSearchRequest(Supplier randomSearchSourceBuilder) { - SearchRequest searchRequest = new SearchRequest(); + return randomSearchRequest(new SearchRequest(), randomSearchSourceBuilder); + } + + /** + * Set random fields to the provided search request. + * + * @param searchRequest the search request + * @param randomSearchSourceBuilder builds a random {@link SearchSourceBuilder}. You can use + * {@link #randomSearchSourceBuilder(Supplier, Supplier, Supplier, Supplier, Supplier)}. + */ + public static SearchRequest randomSearchRequest(SearchRequest searchRequest, Supplier randomSearchSourceBuilder) { searchRequest.allowPartialSearchResults(true); if (randomBoolean()) { searchRequest.indices(generateRandomStringArray(10, 10, false, false));