diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 0abebebdb18..d5ee044782b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; @@ -44,7 +43,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.stream.Collectors; abstract class AbstractSearchAsyncAction extends InitialSearchPhase @@ -58,7 +57,7 @@ abstract class AbstractSearchAsyncAction exten /** * Used by subclasses to resolve node ids to DiscoveryNodes. **/ - private final Function nodeIdToConnection; + private final BiFunction nodeIdToConnection; private final SearchTask task; private final SearchPhaseResults results; private final long clusterStateVersion; @@ -71,7 +70,7 @@ abstract class AbstractSearchAsyncAction exten protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, - Function nodeIdToConnection, + BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, @@ -210,7 +209,7 @@ abstract class AbstractSearchAsyncAction exten results.getSuccessfulResults().forEach((entry) -> { try { SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); - Transport.Connection connection = nodeIdToConnection.apply(searchShardTarget.getNodeId()); + Transport.Connection connection = getConnection(null, searchShardTarget.getNodeId()); sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices()); } catch (Exception inner) { inner.addSuppressed(exception); @@ -273,8 +272,8 @@ abstract class AbstractSearchAsyncAction exten } @Override - public final Transport.Connection getConnection(String nodeId) { - return nodeIdToConnection.apply(nodeId); + public final Transport.Connection getConnection(String clusterAlias, String nodeId) { + return nodeIdToConnection.apply(clusterAlias, nodeId); } @Override @@ -297,10 +296,10 @@ abstract class AbstractSearchAsyncAction exten listener.onFailure(e); } - public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) { - AliasFilter filter = aliasFilter.get(shard.index().getUUID()); + public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) { + AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID()); assert filter != null; - float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST); + float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST); return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis()); } diff --git a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 66a88ce2fee..a72dcac4f24 100644 --- a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -72,7 +72,7 @@ final class DfsQueryPhase extends SearchPhase { () -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context); for (final DfsSearchResult dfsResult : resultList) { final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget(); - Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); + Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(), dfsResult.getRequestId(), dfs); final int shardIndex = dfsResult.getShardIndex(); diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 25231efe49b..b824a46c50f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -136,7 +136,8 @@ final class FetchSearchPhase extends SearchPhase { counter.countDown(); } else { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); - Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); + Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), + searchShardTarget.getNodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry, lastEmittedDocPerShard, searchShardTarget.getOriginalIndices()); executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), @@ -191,7 +192,7 @@ final class FetchSearchPhase extends SearchPhase { if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) { try { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); - Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId()); + Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices()); } catch (Exception e) { context.getLogger().trace("failed to release context", e); diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index 2453e2b80b5..de58b190642 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -67,7 +67,8 @@ abstract class InitialSearchPhase extends final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard - SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getOriginalIndices()); + SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(), + shardIt.getOriginalIndices()); onShardFailure(shardIndex, shardTarget, e); if (totalOps.incrementAndGet() == expectedTotalOps) { @@ -144,7 +145,7 @@ abstract class InitialSearchPhase extends } else { try { executePhaseOnShard(shardIt, shard, new SearchActionListener(new SearchShardTarget(shard.currentNodeId(), - shardIt.shardId(), shardIt.getOriginalIndices()), shardIndex) { + shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) { @Override public void innerOnResponse(FirstResult result) { onShardResult(result, shardIt); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index be8cb0cff01..a87b58c4e67 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -29,62 +29,33 @@ import org.elasticsearch.transport.Transport; import java.util.Map; import java.util.concurrent.Executor; -import java.util.function.Function; +import java.util.function.BiFunction; final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; - SearchDfsQueryThenFetchAsyncAction( - final Logger logger, - final SearchTransportService searchTransportService, - final Function nodeIdToConnection, - final Map aliasFilter, - final Map concreteIndexBoosts, - final SearchPhaseController searchPhaseController, - final Executor executor, - final SearchRequest request, - final ActionListener listener, - final GroupShardsIterator shardsIts, - final TransportSearchAction.SearchTimeProvider timeProvider, - final long clusterStateVersion, - final SearchTask task) { - super( - "dfs", - logger, - searchTransportService, - nodeIdToConnection, - aliasFilter, - concreteIndexBoosts, - executor, - request, - listener, - shardsIts, - timeProvider, - clusterStateVersion, - task, - new SearchPhaseResults<>(shardsIts.size())); + SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, + final BiFunction nodeIdToConnection, final Map aliasFilter, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, + final SearchRequest request, final ActionListener listener, + final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, + final long clusterStateVersion, final SearchTask task) { + super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, + shardsIts, timeProvider, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size())); this.searchPhaseController = searchPhaseController; } @Override - protected void executePhaseOnShard( - final SearchShardIterator shardIt, - final ShardRouting shard, - final SearchActionListener listener) { - getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()), - buildShardSearchRequest(shardIt, shard) , getTask(), listener); + protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard, + final SearchActionListener listener) { + getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), + buildShardSearchRequest(shardIt) , getTask(), listener); } @Override - protected SearchPhase getNextPhase( - final SearchPhaseResults results, final SearchPhaseContext context) { - return new DfsQueryPhase( - results.results, - searchPhaseController, - (queryResults) -> - new FetchSearchPhase(queryResults, searchPhaseController, context), - context); + protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { + return new DfsQueryPhase(results.results, searchPhaseController, (queryResults) -> + new FetchSearchPhase(queryResults, searchPhaseController, context), context); } - } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index a109ab96397..9829ff6a983 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -84,7 +83,7 @@ interface SearchPhaseContext extends ActionListener, Executor { * Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be * thrown. */ - Transport.Connection getConnection(String nodeId); + Transport.Connection getConnection(String clusterAlias, String nodeId); /** * Returns the {@link SearchTransportService} to send shard request to other nodes @@ -106,7 +105,7 @@ interface SearchPhaseContext extends ActionListener, Executor { /** * Builds an request for the initial search phase. */ - ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard); + ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt); /** * Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 855e0216284..de8109aadd8 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -29,59 +29,31 @@ import org.elasticsearch.transport.Transport; import java.util.Map; import java.util.concurrent.Executor; -import java.util.function.Function; +import java.util.function.BiFunction; final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { private final SearchPhaseController searchPhaseController; - SearchQueryThenFetchAsyncAction( - final Logger logger, - final SearchTransportService searchTransportService, - final Function nodeIdToConnection, - final Map aliasFilter, - final Map concreteIndexBoosts, - final SearchPhaseController searchPhaseController, - final Executor executor, - final SearchRequest request, - final ActionListener listener, - final GroupShardsIterator shardsIts, - final TransportSearchAction.SearchTimeProvider timeProvider, - long clusterStateVersion, - SearchTask task) { - super( - "query", - logger, - searchTransportService, - nodeIdToConnection, - aliasFilter, - concreteIndexBoosts, - executor, - request, - listener, - shardsIts, - timeProvider, - clusterStateVersion, - task, - searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); + SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, + final BiFunction nodeIdToConnection, final Map aliasFilter, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, + final SearchRequest request, final ActionListener listener, + final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, + long clusterStateVersion, SearchTask task) { + super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, + shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); this.searchPhaseController = searchPhaseController; } - protected void executePhaseOnShard( - final SearchShardIterator shardIt, - final ShardRouting shard, - final SearchActionListener listener) { - getSearchTransport().sendExecuteQuery( - getConnection(shard.currentNodeId()), - buildShardSearchRequest(shardIt, shard), - getTask(), - listener); + protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard, + final SearchActionListener listener) { + getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), + buildShardSearchRequest(shardIt), getTask(), listener); } @Override - protected SearchPhase getNextPhase( - final SearchPhaseResults results, - final SearchPhaseContext context) { + protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index ca78945a299..d3d707771b8 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -33,6 +33,7 @@ import java.util.List; public final class SearchShardIterator extends PlainShardIterator { private final OriginalIndices originalIndices; + private String clusterAlias; /** * Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards @@ -41,9 +42,10 @@ public final class SearchShardIterator extends PlainShardIterator { * @param shardId shard id of the group * @param shards shards to iterate */ - public SearchShardIterator(ShardId shardId, List shards, OriginalIndices originalIndices) { + public SearchShardIterator(String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { super(shardId, shards); this.originalIndices = originalIndices; + this.clusterAlias = clusterAlias; } /** @@ -52,4 +54,8 @@ public final class SearchShardIterator extends PlainShardIterator { public OriginalIndices getOriginalIndices() { return originalIndices; } + + public String getClusterAlias() { + return clusterAlias; + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 9e858a4ccaf..221641097d3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; @@ -79,7 +78,7 @@ public class SearchTransportService extends AbstractComponent { private final TransportService transportService; - public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) { + public SearchTransportService(Settings settings, TransportService transportService) { super(settings); this.transportService = transportService; } @@ -390,7 +389,18 @@ public class SearchTransportService extends AbstractComponent { TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); } - Transport.Connection getConnection(DiscoveryNode node) { - return transportService.getConnection(node); + /** + * Returns a connection to the given node on the provided cluster. If the cluster alias is null the node will be resolved + * against the local cluster. + * @param clusterAlias the cluster alias the node should be resolve against + * @param node the node to resolve + * @return a connection to the given node belonging to the cluster with the provided alias. + */ + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + if (clusterAlias == null) { + return transportService.getConnection(node); + } else { + return transportService.getRemoteClusterService().getConnection(node, clusterAlias); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java index 6d5b30fd9bd..8fed61af294 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java +++ b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java @@ -214,7 +214,7 @@ public class ShardSearchFailure implements ShardOperationFailedException { } return new ShardSearchFailure(exception, new SearchShardTarget(nodeId, - new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), OriginalIndices.NONE)); + new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), null, OriginalIndices.NONE)); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ae18caa50f0..91a23bf6a6f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; @@ -59,7 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.LongSupplier; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; @@ -204,31 +203,32 @@ public class TransportSearchAction extends HandledTransportAction null, clusterState, Collections.emptyMap(), listener); + (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener); } else { remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { List remoteShardIterators = new ArrayList<>(); Map remoteAliasFilters = new HashMap<>(); - Function connectionFunction = processRemoteShards(remoteClusterService, - searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, + remoteClusterIndices, remoteShardIterators, remoteAliasFilters); executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators, - connectionFunction, clusterState, remoteAliasFilters, listener); + clusterNodeLookup, clusterState, remoteAliasFilters, listener); }, listener::onFailure)); } } - static Function processRemoteShards(RemoteClusterService remoteClusterService, - Map searchShardsResponses, + static BiFunction processRemoteShards(Map searchShardsResponses, Map remoteIndicesByCluster, List remoteShardIterators, Map aliasFilterMap) { - Map> nodeToCluster = new HashMap<>(); + Map> clusterToNode = new HashMap<>(); for (Map.Entry entry : searchShardsResponses.entrySet()) { String clusterAlias = entry.getKey(); ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); + HashMap idToDiscoveryNode = new HashMap<>(); + clusterToNode.put(clusterAlias, idToDiscoveryNode); for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { - nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias)); + idToDiscoveryNode.put(remoteNode.getId(), remoteNode); } Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { @@ -240,7 +240,7 @@ public class TransportSearchAction extends HandledTransportAction { - Supplier supplier = nodeToCluster.get(nodeId); - if (supplier == null) { - throw new IllegalArgumentException("unknown remote node: " + nodeId); - } - return supplier.get(); + return (clusterAlias, nodeId) -> { + Map clusterNodes = clusterToNode.get(clusterAlias); + if (clusterNodes == null) { + throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias); + } + return clusterNodes.get(nodeId); }; } private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, - List remoteShardIterators, Function remoteConnections, + List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, Map remoteAliasMap, ActionListener listener) { @@ -312,18 +312,12 @@ public class TransportSearchAction extends HandledTransportAction connectionLookup = (nodeId) -> { - final DiscoveryNode discoveryNode = nodes.get(nodeId); - final Transport.Connection connection; - if (discoveryNode != null) { - connection = searchTransportService.getConnection(discoveryNode); - } else { - connection = remoteConnections.apply(nodeId); - } - if (connection == null) { + BiFunction connectionLookup = (clusterName, nodeId) -> { + final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId); + if (discoveryNode == null) { throw new IllegalStateException("no node found for id: " + nodeId); } - return connection; + return searchTransportService.getConnection(clusterName, discoveryNode); }; searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), @@ -338,7 +332,7 @@ public class TransportSearchAction extends HandledTransportAction(shards); } @@ -351,7 +345,7 @@ public class TransportSearchAction extends HandledTransportAction shardIterators, SearchTimeProvider timeProvider, - Function connectionLookup, + BiFunction connectionLookup, long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 7d175916c8e..0efb99a1fab 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -933,5 +933,4 @@ public abstract class StreamInput extends InputStream { * be a no-op depending on the underlying implementation if the information of the remaining bytes is not present. */ protected abstract void ensureCanReadBytes(int length) throws EOFException; - } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 54ce2bf7a20..c2d72efec68 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -412,7 +412,7 @@ public class Node implements Closeable { final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); final SearchTransportService searchTransportService = new SearchTransportService(settings, - settingsModule.getClusterSettings(), transportService); + transportService); final Consumer httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { diff --git a/core/src/main/java/org/elasticsearch/search/SearchHit.java b/core/src/main/java/org/elasticsearch/search/SearchHit.java index d0d5047863f..6172f974b14 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/SearchHit.java @@ -545,7 +545,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable { +public final class SearchShardTarget implements Writeable, Comparable { private final Text nodeId; private final ShardId shardId; - //original indices are only needed in the coordinating node throughout the search request execution. + //original indices and cluster alias are only needed in the coordinating node throughout the search request execution. //no need to serialize them as part of SearchShardTarget. private final transient OriginalIndices originalIndices; + private final transient String clusterAlias; public SearchShardTarget(StreamInput in) throws IOException { if (in.readBoolean()) { @@ -49,17 +50,19 @@ public class SearchShardTarget implements Writeable, Comparable createAction( final boolean controlled, @@ -53,35 +60,19 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { System::nanoTime); } - return new AbstractSearchAsyncAction( - "test", - null, - null, - null, - null, - null, - null, - null, - null, - new GroupShardsIterator<>(Collections.singletonList(new SearchShardIterator(null, Collections.emptyList(), null))), - timeProvider, - 0, - null, - null - ) { + return new AbstractSearchAsyncAction("test", null, null, null, + Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, + new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList( + new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, + new InitialSearchPhase.SearchPhaseResults<>(10)) { @Override - protected SearchPhase getNextPhase( - final SearchPhaseResults results, - final SearchPhaseContext context) { + protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; } @Override - protected void executePhaseOnShard( - final SearchShardIterator shardIt, - final ShardRouting shard, - final SearchActionListener listener) { - + protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard, + final SearchActionListener listener) { } @Override @@ -112,4 +103,16 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase { assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get()))); } } + + public void testBuildShardSearchTransportRequest() { + final AtomicLong expected = new AtomicLong(); + AbstractSearchAsyncAction action = createAction(false, expected); + SearchShardIterator iterator = new SearchShardIterator("test-cluster", new ShardId(new Index("name", "foo"), 1), + Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand())); + ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator); + assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); + assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); + assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.filteringAliases()); + assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f); + } } diff --git a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index c2f21a7cc2c..76ba4f6dcc8 100644 --- a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -23,7 +23,6 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.MockDirectoryWrapper; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -59,7 +58,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -114,7 +113,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, @@ -169,7 +168,7 @@ public class DfsQueryPhaseTests extends ESTestCase { SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task, diff --git a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java index 20e295561bb..b7f0e0785f9 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java @@ -57,7 +57,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { .collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz")))); mockSearchPhaseContext.getRequest().source().query(originalQuery); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -126,7 +126,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder() .collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz")))); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { @@ -168,7 +168,7 @@ public class ExpandSearchPhaseTests extends ESTestCase { public void testSkipPhase() throws IOException { MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); mockSearchPhaseContext.searchTransport = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 239f8f10a41..be42455a80a 100644 --- a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -103,7 +103,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -157,7 +157,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -210,7 +210,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); } SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -271,7 +271,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); AtomicInteger numFetches = new AtomicInteger(0); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { @@ -324,7 +324,7 @@ public class FetchSearchPhaseTests extends ESTestCase { results.consumeResult(queryResult); SearchTransportService searchTransportService = new SearchTransportService( - Settings.builder().put("search.remote.connect", false).build(), null, null) { + Settings.builder().put("search.remote.connect", false).build(), null) { @Override public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task, SearchActionListener listener) { diff --git a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 98b6d2e7527..9135778479e 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.search.SearchShardTarget; @@ -100,7 +99,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { } @Override - public Transport.Connection getConnection(String nodeId) { + public Transport.Connection getConnection(String clusterAlias, String nodeId) { return null; // null is ok here for this test } @@ -111,7 +110,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { } @Override - public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) { + public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) { Assert.fail("should not be called"); return null; } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index b6d79b6ead9..39890038f2a 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -30,14 +30,12 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -81,8 +79,7 @@ public class SearchAsyncActionTests extends ESTestCase { new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()), randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); AtomicInteger numFreedContext = new AtomicInteger(); - SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, - Collections.singleton(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS)), null) { + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) { @Override public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) { numFreedContext.incrementAndGet(); @@ -99,7 +96,9 @@ public class SearchAsyncActionTests extends ESTestCase { "test", logger, transportService, - lookup::get, + (cluster, node) -> { + assert cluster == null : "cluster was not null: " + cluster; + return lookup.get(node); }, aliasFilters, Collections.emptyMap(), null, @@ -116,7 +115,7 @@ public class SearchAsyncActionTests extends ESTestCase { protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener listener) { assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId())); - Transport.Connection connection = getConnection(shard.currentNodeId()); + Transport.Connection connection = getConnection(null, shard.currentNodeId()); TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), connection.getNode()); Set ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>()); @@ -187,7 +186,7 @@ public class SearchAsyncActionTests extends ESTestCase { } Collections.shuffle(started, random()); started.addAll(initializing); - list.add(new SearchShardIterator(new ShardId(new Index(index, "_na_"), i), started, originalIndices)); + list.add(new SearchShardIterator(null, new ShardId(new Index(index, "_na_"), i), started, originalIndices)); } return new GroupShardsIterator<>(list); } diff --git a/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java b/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java index eac949c7753..81a6e7a70c6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/ShardSearchFailureTests.java @@ -43,7 +43,7 @@ public class ShardSearchFailureTests extends ESTestCase { String indexUuid = randomAlphaOfLengthBetween(5, 10); int shardId = randomInt(); return new ShardSearchFailure(ex, - new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null)); + new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null, null)); } public void testFromXContent() throws IOException { @@ -74,7 +74,7 @@ public class ShardSearchFailureTests extends ESTestCase { public void testToXContent() throws IOException { ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(0, 0, "some message", null), - new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), OriginalIndices.NONE)); + new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), null, OriginalIndices.NONE)); BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean()); assertEquals( "{\"shard\":123," diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index fbd622f878d..f34f4313fd6 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -90,14 +90,14 @@ public class TransportSearchActionTests extends ESTestCase { { ShardId remoteShardId = new ShardId("remote_index", "remote_index_uuid", 2); ShardRouting remoteShardRouting = TestShardRouting.newShardRouting(remoteShardId, "remote_node", true, STARTED); - SearchShardIterator remoteShardIterator = new SearchShardIterator(remoteShardId, + SearchShardIterator remoteShardIterator = new SearchShardIterator("remote", remoteShardId, Collections.singletonList(remoteShardRouting), remoteIndices); remoteShardIterators.add(remoteShardIterator); } { ShardId remoteShardId2 = new ShardId("remote_index_2", "remote_index_2_uuid", 3); ShardRouting remoteShardRouting2 = TestShardRouting.newShardRouting(remoteShardId2, "remote_node", true, STARTED); - SearchShardIterator remoteShardIterator2 = new SearchShardIterator(remoteShardId2, + SearchShardIterator remoteShardIterator2 = new SearchShardIterator("remote", remoteShardId2, Collections.singletonList(remoteShardRouting2), remoteIndices); remoteShardIterators.add(remoteShardIterator2); } @@ -106,7 +106,7 @@ public class TransportSearchActionTests extends ESTestCase { { ShardId remoteShardId3 = new ShardId("remote_index_3", "remote_index_3_uuid", 4); ShardRouting remoteShardRouting3 = TestShardRouting.newShardRouting(remoteShardId3, "remote_node", true, STARTED); - SearchShardIterator remoteShardIterator3 = new SearchShardIterator(remoteShardId3, + SearchShardIterator remoteShardIterator3 = new SearchShardIterator("remote", remoteShardId3, Collections.singletonList(remoteShardRouting3), remoteIndices2); remoteShardIterators.add(remoteShardIterator3); } @@ -188,7 +188,7 @@ public class TransportSearchActionTests extends ESTestCase { remoteIndicesByCluster.put("test_cluster_2", new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); Map remoteAliases = new HashMap<>(); - TransportSearchAction.processRemoteShards(service, searchShardsResponseMap, remoteIndicesByCluster, iteratorList, + TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, remoteAliases); assertEquals(4, iteratorList.size()); for (SearchShardIterator iterator : iteratorList) { diff --git a/core/src/test/java/org/elasticsearch/search/SearchHitTests.java b/core/src/test/java/org/elasticsearch/search/SearchHitTests.java index a2c11e8a641..4f34d427c87 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchHitTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchHitTests.java @@ -129,7 +129,7 @@ public class SearchHitTests extends ESTestCase { } if (randomBoolean()) { hit.shard(new SearchShardTarget(randomAlphaOfLengthBetween(5, 10), - new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()), + new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()), null, OriginalIndices.NONE)); } return hit;