From 149629fec657e9948488195087f30583cd561f11 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 26 Apr 2017 21:45:49 +0200 Subject: [PATCH] Cross Cluster Search: propagate original indices per cluster (#24328) In case of a Cross Cluster Search, the coordinating node should split the original indices per cluster, and send over to each cluster only its own set of original indices, rather than the set taken from the original search request which contains all the indices. In fact, each remote cluster should not be aware of the indices belonging to other remote clusters. --- .../elasticsearch/action/OriginalIndices.java | 8 +- .../TransportClusterSearchShardsAction.java | 2 +- .../search/AbstractSearchAsyncAction.java | 15 ++- .../action/search/DfsQueryPhase.java | 5 +- .../action/search/FetchSearchPhase.java | 20 +-- .../action/search/InitialSearchPhase.java | 17 +-- .../search/RemoteClusterConnection.java | 6 +- .../action/search/RemoteClusterService.java | 27 ++-- .../action/search/SearchActionListener.java | 1 - .../SearchDfsQueryThenFetchAsyncAction.java | 5 +- .../action/search/SearchPhaseContext.java | 8 +- .../SearchQueryThenFetchAsyncAction.java | 8 +- .../action/search/SearchShardIterator.java | 55 ++++++++ .../action/search/SearchTransportService.java | 8 +- .../action/search/ShardSearchFailure.java | 4 +- .../action/search/TransportSearchAction.java | 60 +++++---- .../broadcast/TransportBroadcastAction.java | 4 +- .../node/TransportBroadcastByNodeAction.java | 2 +- .../TransportTermVectorsAction.java | 2 +- .../cluster/routing/GroupShardsIterator.java | 10 +- .../cluster/routing/IndexRoutingTable.java | 32 ----- .../cluster/routing/OperationRouting.java | 4 +- .../cluster/routing/PlainShardIterator.java | 1 - .../cluster/routing/PlainShardsIterator.java | 11 +- .../cluster/routing/RoutingTable.java | 17 ++- .../cluster/routing/ShardsIterator.java | 9 +- .../org/elasticsearch/search/SearchHit.java | 3 +- .../elasticsearch/search/SearchService.java | 6 +- .../search/SearchShardTarget.java | 15 ++- .../search/fetch/ShardFetchSearchRequest.java | 5 +- .../internal/ShardSearchTransportRequest.java | 5 +- .../search/query/QuerySearchRequest.java | 5 +- .../snapshots/SnapshotsService.java | 1 + .../ElasticsearchExceptionTests.java | 5 +- .../AbstractSearchAsyncActionTookTests.java | 50 +------ .../action/search/MockSearchPhaseContext.java | 9 +- .../search/RemoteClusterConnectionTests.java | 2 +- .../search/RemoteClusterServiceTests.java | 41 +++++- .../action/search/SearchAsyncActionTests.java | 24 ++-- .../search/ShardSearchFailureTests.java | 5 +- .../search/TransportSearchActionTests.java | 122 ++++++++++++++++++ .../TransportBroadcastByNodeActionTests.java | 10 +- ...rdFailedClusterStateTaskExecutorTests.java | 5 +- .../routing/GroupShardsIteratorTests.java | 4 +- .../structure/RoutingIteratorTests.java | 4 +- .../index/store/CorruptedFileIT.java | 7 +- .../index/suggest/stats/SuggestStatsIT.java | 4 +- .../elasticsearch/search/SearchHitTests.java | 4 +- .../search/SearchServiceTests.java | 42 +++--- .../ShardSearchTransportRequestTests.java | 3 +- .../search/stats/SearchStatsIT.java | 4 +- 51 files changed, 443 insertions(+), 283 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/OriginalIndices.java b/core/src/main/java/org/elasticsearch/action/OriginalIndices.java index cc299f544b3..39cf5c63242 100644 --- a/core/src/main/java/org/elasticsearch/action/OriginalIndices.java +++ b/core/src/main/java/org/elasticsearch/action/OriginalIndices.java @@ -28,7 +28,10 @@ import java.io.IOException; /** * Used to keep track of original indices within internal (e.g. shard level) requests */ -public class OriginalIndices implements IndicesRequest { +public final class OriginalIndices implements IndicesRequest { + + //constant to use when original indices are not applicable and will not be serialized across the wire + public static final OriginalIndices NONE = new OriginalIndices(null, null); private final String[] indices; private final IndicesOptions indicesOptions; @@ -39,7 +42,6 @@ public class OriginalIndices implements IndicesRequest { public OriginalIndices(String[] indices, IndicesOptions indicesOptions) { this.indices = indices; - assert indicesOptions != null; this.indicesOptions = indicesOptions; } @@ -57,8 +59,8 @@ public class OriginalIndices implements IndicesRequest { return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in)); } - public static void writeOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException { + assert originalIndices != NONE; out.writeStringArrayNullable(originalIndices.indices); originalIndices.indicesOptions.writeIndicesOptions(out); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 01aafc0b0a9..8825a426768 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -88,7 +88,7 @@ public class TransportClusterSearchShardsAction extends } Set nodeIds = new HashSet<>(); - GroupShardsIterator groupShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, + GroupShardsIterator groupShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); ShardRouting shard; ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()]; diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c2137803411..0abebebdb18 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -75,8 +74,9 @@ abstract class AbstractSearchAsyncAction exten Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Executor executor, SearchRequest request, - ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, - long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer) { + ActionListener listener, GroupShardsIterator shardsIts, + TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, + SearchTask task, SearchPhaseResults resultConsumer) { super(name, request, shardsIts, logger); this.timeProvider = timeProvider; this.logger = logger; @@ -209,8 +209,9 @@ abstract class AbstractSearchAsyncAction exten private void raisePhaseFailure(SearchPhaseExecutionException exception) { results.getSuccessfulResults().forEach((entry) -> { try { - Transport.Connection connection = nodeIdToConnection.apply(entry.getSearchShardTarget().getNodeId()); - sendReleaseSearchContext(entry.getRequestId(), connection); + SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); + Transport.Connection connection = nodeIdToConnection.apply(searchShardTarget.getNodeId()); + sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices()); } catch (Exception inner) { inner.addSuppressed(exception); logger.trace("failed to release context", inner); @@ -296,11 +297,11 @@ abstract class AbstractSearchAsyncAction exten listener.onFailure(e); } - public final ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) { + public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) { AliasFilter filter = aliasFilter.get(shard.index().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST); - return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(), + return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis()); } diff --git a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 353baf11750..66a88ce2fee 100644 --- a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -73,7 +73,8 @@ final class DfsQueryPhase extends SearchPhase { for (final DfsSearchResult dfsResult : resultList) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); - QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.getRequestId(), dfs); + QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(), + dfsResult.getRequestId(), dfs); final int shardIndex = dfsResult.getShardIndex(); searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(), new SearchActionListener(searchShardTarget, shardIndex) { @@ -95,7 +96,7 @@ final class DfsQueryPhase extends SearchPhase { // the query might not have been executed at all (for example because thread pool rejected // execution) and the search context that was created in dfs phase might not be released. // release it again to be in the safe side - context.sendReleaseSearchContext(querySearchRequest.id(), connection); + context.sendReleaseSearchContext(querySearchRequest.id(), connection, searchShardTarget.getOriginalIndices()); } } }); diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index a0e313f1d73..25231efe49b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -73,7 +74,6 @@ final class FetchSearchPhase extends SearchPhase { this.context = context; this.logger = context.getLogger(); this.resultConsumer = resultConsumer; - } @Override @@ -112,7 +112,7 @@ final class FetchSearchPhase extends SearchPhase { final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs); if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return phaseResults.stream() - .map(e -> e.queryResult()) + .map(SearchPhaseResult::queryResult) .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources finishPhase.run(); } else { @@ -135,10 +135,11 @@ final class FetchSearchPhase extends SearchPhase { // in any case we count down this result since we don't talk to this shard anymore counter.countDown(); } else { - Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId()); + SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); + Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry, - lastEmittedDocPerShard); - executeFetch(i, queryResult.getSearchShardTarget(), counter, fetchSearchRequest, queryResult.queryResult(), + lastEmittedDocPerShard, searchShardTarget.getOriginalIndices()); + executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection); } } @@ -147,9 +148,9 @@ final class FetchSearchPhase extends SearchPhase { } protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry, - ScoreDoc[] lastEmittedDocPerShard) { + ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) { final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null; - return new ShardFetchSearchRequest(context.getRequest(), queryId, entry, lastEmittedDoc); + return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc); } private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, @@ -189,8 +190,9 @@ final class FetchSearchPhase extends SearchPhase { // and if it has at lease one hit that didn't make it to the global topDocs if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) { try { - Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId()); - context.sendReleaseSearchContext(queryResult.getRequestId(), connection); + SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); + Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); + context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices()); } catch (Exception e) { context.getLogger().trace("failed to release context", e); } diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index be91cebe501..2453e2b80b5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -46,12 +46,12 @@ import java.util.stream.Stream; */ abstract class InitialSearchPhase extends SearchPhase { private final SearchRequest request; - private final GroupShardsIterator shardsIts; + private final GroupShardsIterator shardsIts; private final Logger logger; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); - InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { + InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) { super(name); this.request = request; this.shardsIts = shardsIts; @@ -64,10 +64,10 @@ abstract class InitialSearchPhase extends } private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, - final ShardIterator shardIt, Exception e) { + final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId()); + SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getOriginalIndices()); onShardFailure(shardIndex, shardTarget, e); if (totalOps.incrementAndGet() == expectedTotalOps) { @@ -124,7 +124,7 @@ abstract class InitialSearchPhase extends @Override public final void run() throws IOException { int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { + for (final SearchShardIterator shardIt : shardsIts) { shardIndex++; final ShardRouting shard = shardIt.nextOrNull(); if (shard != null) { @@ -136,7 +136,7 @@ abstract class InitialSearchPhase extends } } - private void performPhaseOnShard(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { + private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) { if (shard == null) { // TODO upgrade this to an assert... // no more active shards... (we should not really get here, but just for safety) @@ -144,7 +144,7 @@ abstract class InitialSearchPhase extends } else { try { executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), - shardIt.shardId()), shardIndex) { + shardIt.shardId(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { onShardResult(result, shardIt); @@ -213,7 +213,8 @@ abstract class InitialSearchPhase extends * @param shard the shard routing to send the request for * @param listener the listener to notify on response */ - protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener listener); + protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, + SearchActionListener listener); /** * This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index a9739cfe21a..a3f3f3a9612 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -162,7 +162,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo /** * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end. */ - public void fetchSearchShards(SearchRequest searchRequest, final List indices, + public void fetchSearchShards(SearchRequest searchRequest, final String[] indices, ActionListener listener) { if (connectedNodes.isEmpty()) { // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener @@ -176,10 +176,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo } } - private void fetchShardsInternal(SearchRequest searchRequest, List indices, + private void fetchShardsInternal(SearchRequest searchRequest, String[] indices, final ActionListener listener) { final DiscoveryNode node = nodeSupplier.get(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()])) + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) .indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference()) .routing(searchRequest.routing()); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java index 34cb5a84da7..40fed0299b3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java @@ -22,14 +22,13 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; @@ -243,18 +242,18 @@ public final class RemoteClusterService extends AbstractComponent implements Clo return remoteClusters.containsKey(clusterName); } - void collectSearchShards(SearchRequest searchRequest, Map> remoteIndicesByCluster, + void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster, ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); final Map searchShardsResponses = new ConcurrentHashMap<>(); final AtomicReference transportException = new AtomicReference<>(); - for (Map.Entry> entry : remoteIndicesByCluster.entrySet()) { + for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterName = entry.getKey(); RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); if (remoteClusterConnection == null) { throw new IllegalArgumentException("no such remote cluster: " + clusterName); } - final List indices = entry.getValue(); + final String[] indices = entry.getValue().indices(); remoteClusterConnection.fetchSearchShards(searchRequest, indices, new ActionListener() { @Override @@ -288,16 +287,16 @@ public final class RemoteClusterService extends AbstractComponent implements Clo } } - Function processRemoteShards(Map searchShardsResponses, - List remoteShardIterators, - Map aliasFilterMap) { + Map remoteIndicesByCluster, + List remoteShardIterators, + Map aliasFilterMap) { Map> nodeToCluster = new HashMap<>(); for (Map.Entry entry : searchShardsResponses.entrySet()) { - String clusterName = entry.getKey(); + String clusterAlias = entry.getKey(); ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { - nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterName)); + nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterAlias)); } Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { @@ -305,9 +304,11 @@ public final class RemoteClusterService extends AbstractComponent implements Clo //this ends up in the hits returned with the search response ShardId shardId = clusterSearchShardsGroup.getShardId(); Index remoteIndex = shardId.getIndex(); - Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID()); - ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()), - Arrays.asList(clusterSearchShardsGroup.getShards())); + Index index = new Index(clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID()); + OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias); + assert originalIndices != null; + SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()), + Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices); remoteShardIterators.add(shardIterator); AliasFilter aliasFilter; if (indicesAndFilters == null) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java index 709d1e5e237..67de87b1bb1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchActionListener.java @@ -49,5 +49,4 @@ abstract class SearchActionListener implements Acti } protected abstract void innerOnResponse(T response); - } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 7151c8712ed..be8cb0cff01 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.internal.AliasFilter; @@ -46,7 +45,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction final Executor executor, final SearchRequest request, final ActionListener listener, - final GroupShardsIterator shardsIts, + final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final long clusterStateVersion, final SearchTask task) { @@ -70,7 +69,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction @Override protected void executePhaseOnShard( - final ShardIterator shardIt, + final SearchShardIterator shardIt, final ShardRouting shard, final SearchActionListener listener) { getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()), diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index 26c5403f4ab..a109ab96397 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.search.SearchShardTarget; @@ -97,16 +97,16 @@ interface SearchPhaseContext extends ActionListener, Executor { * @see org.elasticsearch.search.fetch.FetchSearchResult#getRequestId() * */ - default void sendReleaseSearchContext(long contextId, Transport.Connection connection) { + default void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) { if (connection != null) { - getSearchTransport().sendFreeContext(connection, contextId, getRequest()); + getSearchTransport().sendFreeContext(connection, contextId, originalIndices); } } /** * Builds an request for the initial search phase. */ - ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard); + ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard); /** * Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index fd1d1977029..855e0216284 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; @@ -32,8 +31,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; -final class SearchQueryThenFetchAsyncAction - extends AbstractSearchAsyncAction { +final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; @@ -47,7 +45,7 @@ final class SearchQueryThenFetchAsyncAction final Executor executor, final SearchRequest request, final ActionListener listener, - final GroupShardsIterator shardsIts, + final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task) { @@ -70,7 +68,7 @@ final class SearchQueryThenFetchAsyncAction } protected void executePhaseOnShard( - final ShardIterator shardIt, + final SearchShardIterator shardIt, final ShardRouting shard, final SearchActionListener listener) { getSearchTransport().sendExecuteQuery( diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java new file mode 100644 index 00000000000..ca78945a299 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.shard.ShardId; + +import java.util.List; + +/** + * Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices} + * of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices. + */ +public final class SearchShardIterator extends PlainShardIterator { + + private final OriginalIndices originalIndices; + + /** + * Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards + * this the a given shardId. + * + * @param shardId shard id of the group + * @param shards shards to iterate + */ + public SearchShardIterator(ShardId shardId, List shards, OriginalIndices originalIndices) { + super(shardId, shards); + this.originalIndices = originalIndices; + } + + /** + * Returns the original indices associated with this shard iterator, specifically with the cluster that this shard belongs to. + */ + public OriginalIndices getOriginalIndices() { + return originalIndices; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 80583e24c9c..436d8da95eb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -92,8 +92,8 @@ public class SearchTransportService extends AbstractLifecycleComponent { } } - public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) { - transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), + public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) { + transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener() { @Override public void onResponse(SearchFreeContextResponse response) { @@ -219,9 +219,9 @@ public class SearchTransportService extends AbstractLifecycleComponent { SearchFreeContextRequest() { } - SearchFreeContextRequest(SearchRequest request, long id) { + SearchFreeContextRequest(OriginalIndices originalIndices, long id) { super(id); - this.originalIndices = new OriginalIndices(request); + this.originalIndices = originalIndices; } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java index 2aa0ad3c7be..6d5b30fd9bd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java +++ b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; @@ -212,7 +213,8 @@ public class ShardSearchFailure implements ShardOperationFailedException { } } return new ShardSearchFailure(exception, - new SearchShardTarget(nodeId, new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId))); + new SearchShardTarget(nodeId, + new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), OriginalIndices.NONE)); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 63a3ad0b62d..6f7cc26e59e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; @@ -162,10 +163,8 @@ public class TransportSearchAction extends HandledTransportAction listener) { final long absoluteStartMillis = System.currentTimeMillis(); @@ -173,17 +172,27 @@ public class TransportSearchAction extends HandledTransportAction> remoteClusterIndices; + final OriginalIndices localIndices; + final Map remoteClusterIndices; final ClusterState clusterState = clusterService.state(); if (remoteClusterService.isCrossClusterSearchEnabled()) { - remoteClusterIndices = remoteClusterService.groupClusterIndices( searchRequest.indices(), // empty string is not allowed + final Map> groupedIndices = remoteClusterService.groupClusterIndices(searchRequest.indices(), + // empty string is not allowed idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState)); - List remove = remoteClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY); - localIndices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]); + List remove = groupedIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY); + String[] indices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]); + localIndices = new OriginalIndices(indices, searchRequest.indicesOptions()); + Map originalIndicesMap = new HashMap<>(); + for (Map.Entry> entry : groupedIndices.entrySet()) { + String clusterAlias = entry.getKey(); + List originalIndices = entry.getValue(); + originalIndicesMap.put(clusterAlias, + new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), searchRequest.indicesOptions())); + } + remoteClusterIndices = Collections.unmodifiableMap(originalIndicesMap); } else { remoteClusterIndices = Collections.emptyMap(); - localIndices = searchRequest.indices(); + localIndices = new OriginalIndices(searchRequest); } if (remoteClusterIndices.isEmpty()) { @@ -192,18 +201,18 @@ public class TransportSearchAction extends HandledTransportAction { - List remoteShardIterators = new ArrayList<>(); + List remoteShardIterators = new ArrayList<>(); Map remoteAliasFilters = new HashMap<>(); Function connectionFunction = remoteClusterService.processRemoteShards( - searchShardsResponses, remoteShardIterators, remoteAliasFilters); + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators, connectionFunction, clusterState, remoteAliasFilters, listener); }, listener::onFailure)); } } - private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, String[] localIndices, - List remoteShardIterators, Function remoteConnections, + private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, + List remoteShardIterators, Function remoteConnections, ClusterState clusterState, Map remoteAliasMap, ActionListener listener) { @@ -212,11 +221,11 @@ public class TransportSearchAction extends HandledTransportAction 0) { + if (localIndices.indices().length == 0 && remoteShardIterators.size() > 0) { indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified } else { indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), - timeProvider.getAbsoluteStartMillis(), localIndices); + timeProvider.getAbsoluteStartMillis(), localIndices.indices()); } Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), @@ -225,9 +234,9 @@ public class TransportSearchAction extends HandledTransportAction localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference()); - GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, remoteShardIterators); + GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); failIfOverShardCountLimit(clusterService, shardIterators.size()); @@ -268,19 +277,17 @@ public class TransportSearchAction extends HandledTransportAction remoteShardIterators) { - if (remoteShardIterators.isEmpty()) { - return localShardsIterator; - } - List shards = new ArrayList<>(); - for (ShardIterator shardIterator : remoteShardIterators) { + static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator, + OriginalIndices localIndices, + List remoteShardIterators) { + List shards = new ArrayList<>(); + for (SearchShardIterator shardIterator : remoteShardIterators) { shards.add(shardIterator); } for (ShardIterator shardIterator : localShardsIterator) { - shards.add(shardIterator); + shards.add(new SearchShardIterator(shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } - return new GroupShardsIterator(shards); + return new GroupShardsIterator<>(shards); } @Override @@ -288,7 +295,8 @@ public class TransportSearchAction extends HandledTransportAction shardIterators, SearchTimeProvider timeProvider, Function connectionLookup, long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 0408b04cc83..53764f4ee88 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -94,7 +94,7 @@ public abstract class TransportBroadcastAction shards(ClusterState clusterState, Request request, String[] concreteIndices); protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); @@ -107,7 +107,7 @@ public abstract class TransportBroadcastAction listener; private final ClusterState clusterState; private final DiscoveryNodes nodes; - private final GroupShardsIterator shardsIts; + private final GroupShardsIterator shardsIts; private final int expectedOps; private final AtomicInteger counterOps = new AtomicInteger(); private final AtomicReferenceArray shardsResponses; diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 412f54f4354..3ef967472a5 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -270,7 +270,7 @@ public abstract class TransportBroadcastByNodeAction(); - for (ShardRouting shard : shardIt.asUnordered()) { + for (ShardRouting shard : shardIt) { // send a request to the shard only if it is assigned to a node that is in the local node's cluster state // a scenario in which a shard can be assigned but to a node that is not in the local node's cluster state // is when the shard is assigned to the master node, the local node has detected the master as failed diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java index bb1e776f2e9..5ff55a6fa55 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TransportTermVectorsAction.java @@ -58,7 +58,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction groupShardsIter = clusterService.operationRouting().searchShards(state, new String[] { request.concreteIndex() }, null, request.request().preference()); return groupShardsIter.iterator().next(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index e8e752fda12..7b33c24d15f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -30,14 +30,14 @@ import java.util.List; * ShardsIterators are always returned in ascending order independently of their order at construction * time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs. */ -public final class GroupShardsIterator implements Iterable { +public final class GroupShardsIterator implements Iterable { - private final List iterators; + private final List iterators; /** * Constructs a enw GroupShardsIterator from the given list. */ - public GroupShardsIterator(List iterators) { + public GroupShardsIterator(List iterators) { CollectionUtil.timSort(iterators); this.iterators = iterators; } @@ -60,7 +60,7 @@ public final class GroupShardsIterator implements Iterable { */ public int totalSizeWith1ForEmpty() { int size = 0; - for (ShardIterator shard : iterators) { + for (ShardIt shard : iterators) { size += Math.max(1, shard.size()); } return size; @@ -75,7 +75,7 @@ public final class GroupShardsIterator implements Iterable { } @Override - public Iterator iterator() { + public Iterator iterator() { return iterators.iterator(); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index c587629ef0c..5a0bd0d4263 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntSet; import com.carrotsearch.hppc.cursors.IntCursor; import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.apache.lucene.util.CollectionUtil; -import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -265,37 +264,6 @@ public class IndexRoutingTable extends AbstractDiffable imple return new PlainShardsIterator(shuffler.shuffle(allActiveShards)); } - /** - * A group shards iterator where each group ({@link ShardIterator} - * is an iterator across shard replication group. - */ - public GroupShardsIterator groupByShardsIt() { - // use list here since we need to maintain identity across shards - ArrayList set = new ArrayList<>(shards.size()); - for (IndexShardRoutingTable indexShard : this) { - set.add(indexShard.shardsIt()); - } - return new GroupShardsIterator(set); - } - - /** - * A groups shards iterator where each groups is a single {@link ShardRouting} and a group - * is created for each shard routing. - *

- * This basically means that components that use the {@link GroupShardsIterator} will iterate - * over *all* the shards (all the replicas) within the index.

- */ - public GroupShardsIterator groupByAllIt() { - // use list here since we need to maintain identity across shards - ArrayList set = new ArrayList<>(); - for (IndexShardRoutingTable indexShard : this) { - for (ShardRouting shardRouting : indexShard) { - set.add(shardRouting.shardsIt()); - } - } - return new GroupShardsIterator(set); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 6881cc75657..52807251699 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -68,7 +68,7 @@ public class OperationRouting extends AbstractComponent { return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); } - public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { + public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { @@ -77,7 +77,7 @@ public class OperationRouting extends AbstractComponent { set.add(iterator); } } - return new GroupShardsIterator(new ArrayList<>(set)); + return new GroupShardsIterator<>(new ArrayList<>(set)); } private static final Map> EMPTY_ROUTING = Collections.emptyMap(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java b/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java index 5950bd35d37..bb45ca66956 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardIterator.java @@ -43,7 +43,6 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter this.shardId = shardId; } - @Override public ShardId shardId() { return this.shardId; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index c2ac9416079..6cb1989a8dd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.cluster.routing; +import java.util.Collections; +import java.util.Iterator; import java.util.List; /** @@ -74,7 +76,12 @@ public class PlainShardsIterator implements ShardsIterator { } @Override - public Iterable asUnordered() { - return shards; + public List getShardRoutings() { + return Collections.unmodifiableList(shards); + } + + @Override + public Iterator iterator() { + return shards.iterator(); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 0b1a0044567..a248d6a939a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -238,7 +238,7 @@ public class RoutingTable implements Iterable, Diffable allAssignedShardsGrouped(String[] indices, boolean includeEmpty) { return allAssignedShardsGrouped(indices, includeEmpty, false); } @@ -249,14 +249,14 @@ public class RoutingTable implements Iterable, Diffableextra shard iterator will be added for relocating shards. The extra * iterator contains a single ShardRouting pointing at the relocating target */ - public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) { + public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) { return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, includeRelocationTargets, ASSIGNED_PREDICATE); } - private static Predicate ACTIVE_PREDICATE = shardRouting -> shardRouting.active(); - private static Predicate ASSIGNED_PREDICATE = shardRouting -> shardRouting.assignedToNode(); + private static Predicate ACTIVE_PREDICATE = ShardRouting::active; + private static Predicate ASSIGNED_PREDICATE = ShardRouting::assignedToNode; - private GroupShardsIterator allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets, Predicate predicate) { + private GroupShardsIterator allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets, Predicate predicate) { // use list here since we need to maintain identity across shards ArrayList set = new ArrayList<>(); for (String index : indices) { @@ -278,7 +278,7 @@ public class RoutingTable implements Iterable, Diffable(set); } public ShardsIterator allShards(String[] indices) { @@ -320,9 +320,8 @@ public class RoutingTable implements Iterable, Diffable activePrimaryShardsGrouped(String[] indices, boolean includeEmpty) { // use list here since we need to maintain identity across shards ArrayList set = new ArrayList<>(); for (String index : indices) { @@ -339,7 +338,7 @@ public class RoutingTable implements Iterable, Diffable(set); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java index 024138e4db6..638875ea071 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java @@ -18,10 +18,12 @@ */ package org.elasticsearch.cluster.routing; +import java.util.List; + /** * Allows to iterate over unrelated shards. */ -public interface ShardsIterator { +public interface ShardsIterator extends Iterable { /** * Resets the iterator to its initial state. @@ -60,6 +62,9 @@ public interface ShardsIterator { @Override boolean equals(Object other); - Iterable asUnordered(); + /** + * Returns the {@link ShardRouting}s that this shards iterator holds. + */ + List getShardRoutings(); } diff --git a/core/src/main/java/org/elasticsearch/search/SearchHit.java b/core/src/main/java/org/elasticsearch/search/SearchHit.java index 71b0b9127b2..d0d5047863f 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/SearchHit.java @@ -21,6 +21,7 @@ package org.elasticsearch.search; import org.apache.lucene.search.Explanation; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParsingException; @@ -544,7 +545,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable asUnordered() { - return null; - } - }; - return new AbstractSearchAsyncAction( "test", null, @@ -108,7 +63,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { null, null, null, - new GroupShardsIterator(Collections.singletonList(it)), + new GroupShardsIterator<>(Collections.singletonList(new SearchShardIterator(null, Collections.emptyList(), null))), timeProvider, 0, null, @@ -123,7 +78,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { @Override protected void executePhaseOnShard( - final ShardIterator shardIt, + final SearchShardIterator shardIt, final ShardRouting shard, final SearchActionListener listener) { @@ -157,5 +112,4 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get()))); } } - } diff --git a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 4a5b65c0a0a..98b6d2e7527 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; -import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; @@ -29,14 +29,11 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.transport.Transport; import org.junit.Assert; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -114,7 +111,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { } @Override - public ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) { + public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) { Assert.fail("should not be called"); return null; } @@ -145,7 +142,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { } @Override - public void sendReleaseSearchContext(long contextId, Transport.Connection connection) { + public void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) { releasedSearchContexts.add(contextId); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java index d73b6709121..8cf6d7d48c7 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java @@ -382,7 +382,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { failReference.set(x); responseLatch.countDown(); }); - connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener); + connection.fetchSearchShards(request, new String[]{"test-index"}, shardsListener); responseLatch.await(); assertNull(failReference.get()); assertNotNull(reference.get()); diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java index 81ee9141e2b..63f6e8aa5a6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java @@ -20,10 +20,11 @@ package org.elasticsearch.action.search; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; @@ -204,7 +205,7 @@ public class RemoteClusterServiceTests extends ESTestCase { public void testProcessRemoteShards() throws IOException { try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) { assertFalse(service.isCrossClusterSearchEnabled()); - List iteratorList = new ArrayList<>(); + List iteratorList = new ArrayList<>(); Map searchShardsResponseMap = new HashMap<>(); DiscoveryNode[] nodes = new DiscoveryNode[] { new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), @@ -225,11 +226,26 @@ public class RemoteClusterServiceTests extends ESTestCase { TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)}) }; searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases)); + DiscoveryNode[] nodes2 = new DiscoveryNode[] { + new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT) + }; + ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] { + new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)}) + }; + searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null)); + + Map remoteIndicesByCluster = new HashMap<>(); + remoteIndicesByCluster.put("test_cluster_1", + new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); + remoteIndicesByCluster.put("test_cluster_2", + new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); Map remoteAliases = new HashMap<>(); - service.processRemoteShards(searchShardsResponseMap, iteratorList, remoteAliases); - assertEquals(3, iteratorList.size()); - for (ShardIterator iterator : iteratorList) { + service.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, remoteAliases); + assertEquals(4, iteratorList.size()); + for (SearchShardIterator iterator : iteratorList) { if (iterator.shardId().getIndexName().endsWith("foo")) { + assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1); assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName()); ShardRouting shardRouting = iterator.nextOrNull(); @@ -239,7 +255,8 @@ public class RemoteClusterServiceTests extends ESTestCase { assertNotNull(shardRouting); assertEquals(shardRouting.getIndexName(), "foo"); assertNull(iterator.nextOrNull()); - } else { + } else if (iterator.shardId().getIndexName().endsWith("bar")) { + assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); assertEquals(0, iterator.shardId().getId()); assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName()); ShardRouting shardRouting = iterator.nextOrNull(); @@ -249,13 +266,23 @@ public class RemoteClusterServiceTests extends ESTestCase { assertNotNull(shardRouting); assertEquals(shardRouting.getIndexName(), "bar"); assertNull(iterator.nextOrNull()); + } else if (iterator.shardId().getIndexName().endsWith("xyz")) { + assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices()); + assertEquals(0, iterator.shardId().getId()); + assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "xyz"); + assertNull(iterator.nextOrNull()); } } - assertEquals(2, remoteAliases.size()); + assertEquals(3, remoteAliases.size()); assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id")); assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id")); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id")); assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder()); assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder()); + assertNull(remoteAliases.get("xyz_id").getQueryBuilder()); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 4813dc8ae7d..2d94fe2edd0 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -20,11 +20,11 @@ package org.elasticsearch.action.search; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.RecoverySource; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; @@ -76,12 +76,14 @@ public class SearchAsyncActionTests extends ESTestCase { Map> nodeToContextMap = new HashMap<>(); AtomicInteger contextIdGenerator = new AtomicInteger(0); - GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); + GroupShardsIterator shardsIter = getShardsIter("idx", + new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), + randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); AtomicInteger numFreedContext = new AtomicInteger(); SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) { @Override - public void sendFreeContext(Transport.Connection connection, long contextId, SearchRequest request) { + public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) { numFreedContext.incrementAndGet(); assertTrue(nodeToContextMap.containsKey(connection.getNode())); assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId)); @@ -110,7 +112,7 @@ public class SearchAsyncActionTests extends ESTestCase { TestSearchResponse response = new TestSearchResponse(); @Override - protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener + protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener listener) { assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); Transport.Connection connection = getConnection(shard.currentNodeId()); @@ -133,7 +135,7 @@ public class SearchAsyncActionTests extends ESTestCase { for (int i = 0; i < results.getNumShards(); i++) { TestSearchPhaseResult result = results.results.get(i); assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId()); - sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node)); + sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE); } responseListener.onResponse(response); latch.countDown(); @@ -154,9 +156,9 @@ public class SearchAsyncActionTests extends ESTestCase { } } - private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode, - DiscoveryNode replicaNode) { - ArrayList list = new ArrayList<>(); + private static GroupShardsIterator getShardsIter(String index, OriginalIndices originalIndices, int numShards, + boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) { + ArrayList list = new ArrayList<>(); for (int i = 0; i < numShards; i++) { ArrayList started = new ArrayList<>(); ArrayList initializing = new ArrayList<>(); @@ -184,9 +186,9 @@ public class SearchAsyncActionTests extends ESTestCase { } Collections.shuffle(started, random()); started.addAll(initializing); - list.add(new PlainShardIterator(new ShardId(new Index(index, "_na_"), i), started)); + list.add(new SearchShardIterator(new ShardId(new Index(index, "_na_"), i), started, originalIndices)); } - return new GroupShardsIterator(list); + return new GroupShardsIterator<>(list); } public static class TestSearchResponse extends SearchResponse { diff --git a/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java b/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java index 96afbb276d5..eac949c7753 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentParser; @@ -42,7 +43,7 @@ public class ShardSearchFailureTests extends ESTestCase { String indexUuid = randomAlphaOfLengthBetween(5, 10); int shardId = randomInt(); return new ShardSearchFailure(ex, - new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId))); + new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null)); } public void testFromXContent() throws IOException { @@ -73,7 +74,7 @@ public class ShardSearchFailureTests extends ESTestCase { public void testToXContent() throws IOException { ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(0, 0, "some message", null), - new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123))); + new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), OriginalIndices.NONE)); BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean()); assertEquals( "{\"shard\":123," diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java new file mode 100644 index 00000000000..696e25de75e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; + +public class TransportSearchActionTests extends ESTestCase { + + public void testMergeShardsIterators() throws IOException { + List localShardIterators = new ArrayList<>(); + { + ShardId shardId = new ShardId("local_index", "local_index_uuid", 0); + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, "local_node", true, STARTED); + ShardIterator shardIterator = new PlainShardIterator(shardId, Collections.singletonList(shardRouting)); + localShardIterators.add(shardIterator); + } + { + ShardId shardId2 = new ShardId("local_index_2", "local_index_2_uuid", 1); + ShardRouting shardRouting2 = TestShardRouting.newShardRouting(shardId2, "local_node", true, STARTED); + ShardIterator shardIterator2 = new PlainShardIterator(shardId2, Collections.singletonList(shardRouting2)); + localShardIterators.add(shardIterator2); + } + GroupShardsIterator localShardsIterator = new GroupShardsIterator<>(localShardIterators); + + OriginalIndices localIndices = new OriginalIndices(new String[]{"local_alias", "local_index_2"}, + IndicesOptions.strictExpandOpenAndForbidClosed()); + + OriginalIndices remoteIndices = new OriginalIndices(new String[]{"remote_alias", "remote_index_2"}, + IndicesOptions.strictExpandOpen()); + List remoteShardIterators = new ArrayList<>(); + { + ShardId remoteShardId = new ShardId("remote_index", "remote_index_uuid", 2); + ShardRouting remoteShardRouting = TestShardRouting.newShardRouting(remoteShardId, "remote_node", true, STARTED); + SearchShardIterator remoteShardIterator = new SearchShardIterator(remoteShardId, + Collections.singletonList(remoteShardRouting), remoteIndices); + remoteShardIterators.add(remoteShardIterator); + } + { + ShardId remoteShardId2 = new ShardId("remote_index_2", "remote_index_2_uuid", 3); + ShardRouting remoteShardRouting2 = TestShardRouting.newShardRouting(remoteShardId2, "remote_node", true, STARTED); + SearchShardIterator remoteShardIterator2 = new SearchShardIterator(remoteShardId2, + Collections.singletonList(remoteShardRouting2), remoteIndices); + remoteShardIterators.add(remoteShardIterator2); + } + OriginalIndices remoteIndices2 = new OriginalIndices(new String[]{"remote_index_3"}, IndicesOptions.strictExpand()); + + { + ShardId remoteShardId3 = new ShardId("remote_index_3", "remote_index_3_uuid", 4); + ShardRouting remoteShardRouting3 = TestShardRouting.newShardRouting(remoteShardId3, "remote_node", true, STARTED); + SearchShardIterator remoteShardIterator3 = new SearchShardIterator(remoteShardId3, + Collections.singletonList(remoteShardRouting3), remoteIndices2); + remoteShardIterators.add(remoteShardIterator3); + } + + GroupShardsIterator searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator, + localIndices, remoteShardIterators); + + assertEquals(searchShardIterators.size(), 5); + int i = 0; + for (SearchShardIterator searchShardIterator : searchShardIterators) { + switch(i++) { + case 0: + assertEquals("local_index", searchShardIterator.shardId().getIndexName()); + assertEquals(0, searchShardIterator.shardId().getId()); + assertSame(localIndices, searchShardIterator.getOriginalIndices()); + break; + case 1: + assertEquals("local_index_2", searchShardIterator.shardId().getIndexName()); + assertEquals(1, searchShardIterator.shardId().getId()); + assertSame(localIndices, searchShardIterator.getOriginalIndices()); + break; + case 2: + assertEquals("remote_index", searchShardIterator.shardId().getIndexName()); + assertEquals(2, searchShardIterator.shardId().getId()); + assertSame(remoteIndices, searchShardIterator.getOriginalIndices()); + break; + case 3: + assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName()); + assertEquals(3, searchShardIterator.shardId().getId()); + assertSame(remoteIndices, searchShardIterator.getOriginalIndices()); + break; + case 4: + assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName()); + assertEquals(4, searchShardIterator.shardId().getId()); + assertSame(remoteIndices2, searchShardIterator.getOriginalIndices()); + break; + } + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index ef025e3c37b..93d8be990de 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -296,7 +296,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); Set set = new HashSet<>(); - for (ShardRouting shard : shardIt.asUnordered()) { + for (ShardRouting shard : shardIt) { set.add(shard.currentNodeId()); } @@ -332,7 +332,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { // the master should not be in the list of nodes that requests were sent to ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); Set set = new HashSet<>(); - for (ShardRouting shard : shardIt.asUnordered()) { + for (ShardRouting shard : shardIt) { if (!shard.currentNodeId().equals(masterNode.getId())) { set.add(shard.currentNodeId()); } @@ -352,8 +352,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { public void testOperationExecution() throws Exception { ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); Set shards = new HashSet<>(); - String nodeId = shardIt.asUnordered().iterator().next().currentNodeId(); - for (ShardRouting shard : shardIt.asUnordered()) { + String nodeId = shardIt.iterator().next().currentNodeId(); + for (ShardRouting shard : shardIt) { if (nodeId.equals(shard.currentNodeId())) { shards.add(shard); } @@ -417,7 +417,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX}); Map> map = new HashMap<>(); - for (ShardRouting shard : shardIt.asUnordered()) { + for (ShardRouting shard : shardIt) { if (!map.containsKey(shard.currentNodeId())) { map.put(shard.currentNodeId(), new ArrayList<>()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 91420fa227a..fe11cc9cd5f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -165,10 +165,9 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa private List createExistingShards(ClusterState currentState, String reason) { List shards = new ArrayList<>(); - GroupShardsIterator shardGroups = - currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); + GroupShardsIterator shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); for (ShardIterator shardIt : shardGroups) { - for (ShardRouting shard : shardIt.asUnordered()) { + for (ShardRouting shard : shardIt) { shards.add(shard); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java index 72cf0391fd4..f2571fce339 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java @@ -43,7 +43,7 @@ public class GroupShardsIteratorTests extends ESTestCase { list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true)))); list.add(new PlainShardIterator(new ShardId(index, 1), Arrays.asList(newRouting(index, 1, true)))); - GroupShardsIterator iter = new GroupShardsIterator(list); + GroupShardsIterator iter = new GroupShardsIterator<>(list); assertEquals(7, iter.totalSizeWith1ForEmpty()); assertEquals(5, iter.size()); assertEquals(6, iter.totalSize()); @@ -67,7 +67,7 @@ public class GroupShardsIteratorTests extends ESTestCase { Collections.shuffle(list, random()); ArrayList actualIterators = new ArrayList<>(); - GroupShardsIterator iter = new GroupShardsIterator(list); + GroupShardsIterator iter = new GroupShardsIterator<>(list); for (ShardIterator shardsIterator : iter) { actualIterators.add(shardsIterator); } diff --git a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java index 86fa25872e0..172bcd6bd55 100644 --- a/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/structure/RoutingIteratorTests.java @@ -376,7 +376,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0"); + GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0"); assertThat(shardIterators.size(), equalTo(1)); assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0)); @@ -443,7 +443,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase { clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); // When replicas haven't initialized, it comes back with the primary first, then initializing replicas - GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first"); + GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first"); assertThat(shardIterators.size(), equalTo(2)); // two potential shards ShardIterator iter = shardIterators.iterator().next(); assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 91cfba0c70d..9bb180c9818 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; @@ -52,7 +51,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.PrimaryShardAllocator; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; @@ -73,9 +71,6 @@ import org.elasticsearch.test.MockIndexEventListener; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.ConnectionProfile; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -292,7 +287,7 @@ public class CorruptedFileIT extends ESIntegTestCase { } assertThat(response.getStatus(), is(ClusterHealthStatus.RED)); ClusterState state = client().admin().cluster().prepareState().get().getState(); - GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false); + GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false); for (ShardIterator iterator : shardIterators) { ShardRouting routing; while ((routing = iterator.nextOrNull()) != null) { diff --git a/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java b/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java index 925bf56fe70..25eb6df4c27 100644 --- a/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java +++ b/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java @@ -150,10 +150,10 @@ public class SuggestStatsIT extends ESIntegTestCase { private Set nodeIdsWithIndex(String... indices) { ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); - GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true); + GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true); Set nodes = new HashSet<>(); for (ShardIterator shardIterator : allAssignedShardsGrouped) { - for (ShardRouting routing : shardIterator.asUnordered()) { + for (ShardRouting routing : shardIterator) { if (routing.active()) { nodes.add(routing.currentNodeId()); } diff --git a/core/src/test/java/org/elasticsearch/search/SearchHitTests.java b/core/src/test/java/org/elasticsearch/search/SearchHitTests.java index 51fffc3e95f..a2c11e8a641 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchHitTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchHitTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.search; import org.apache.lucene.search.Explanation; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -128,7 +129,8 @@ public class SearchHitTests extends ESTestCase { } if (randomBoolean()) { hit.shard(new SearchShardTarget(randomAlphaOfLengthBetween(5, 10), - new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()))); + new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()), + OriginalIndices.NONE)); } return hit; } diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 6fc795a8825..31edc3ac808 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -184,8 +184,8 @@ public class SearchServiceTests extends ESSingleNodeTestCase { for (int i = 0; i < rounds; i++) { try { SearchPhaseResult searchPhaseResult = service.executeQueryPhase( - new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), + new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, + new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f), new SearchTask(123L, "", "", "", null)); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); @@ -213,16 +213,16 @@ public class SearchServiceTests extends ESSingleNodeTestCase { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); final SearchContext contextWithDefaultTimeout = service.createContext( - new ShardSearchLocalRequest( - indexShard.shardId(), - 1, - SearchType.DEFAULT, - new SearchSourceBuilder(), - new String[0], - false, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f), - null); + new ShardSearchLocalRequest( + indexShard.shardId(), + 1, + SearchType.DEFAULT, + new SearchSourceBuilder(), + new String[0], + false, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f), + null); try { // the search context should inherit the default timeout assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5))); @@ -233,15 +233,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase { final long seconds = randomIntBetween(6, 10); final SearchContext context = service.createContext( - new ShardSearchLocalRequest( - indexShard.shardId(), - 1, - SearchType.DEFAULT, - new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)), - new String[0], - false, - new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f), + new ShardSearchLocalRequest( + indexShard.shardId(), + 1, + SearchType.DEFAULT, + new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)), + new String[0], + false, + new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f), null); try { // the search context should inherit the query timeout diff --git a/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java b/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java index b93b3795cb5..7a0e10af99c 100644 --- a/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java +++ b/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.internal; import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -95,7 +96,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase { } else { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } - return new ShardSearchTransportRequest(searchRequest, shardId, + return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId, randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong())); } diff --git a/core/src/test/java/org/elasticsearch/search/stats/SearchStatsIT.java b/core/src/test/java/org/elasticsearch/search/stats/SearchStatsIT.java index 83fb38f18a2..11806a1cea9 100644 --- a/core/src/test/java/org/elasticsearch/search/stats/SearchStatsIT.java +++ b/core/src/test/java/org/elasticsearch/search/stats/SearchStatsIT.java @@ -165,10 +165,10 @@ public class SearchStatsIT extends ESIntegTestCase { private Set nodeIdsWithIndex(String... indices) { ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); - GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true); + GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true); Set nodes = new HashSet<>(); for (ShardIterator shardIterator : allAssignedShardsGrouped) { - for (ShardRouting routing : shardIterator.asUnordered()) { + for (ShardRouting routing : shardIterator) { if (routing.active()) { nodes.add(routing.currentNodeId()); }