From 2f9e9460d4c0c656e4f0b41376c803cddecf28bf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 2 May 2017 18:09:32 +0200 Subject: [PATCH] Move RemoteClusterService into TransportService (#24424) TransportService and RemoteClusterService are closely coupled already today and to simplify remote cluster integration down the road it can be a direct dependency of TransportService. This change moves RemoteClusterService into TransportService with the goal to make it a hidden implementation detail of TransportService in followup changes. --- .../resources/checkstyle_suppressions.xml | 2 - .../cluster/remote/RemoteInfoResponse.java | 2 +- .../remote/TransportRemoteInfoAction.java | 4 +- .../action/search/SearchTransportService.java | 30 +---- .../action/search/TransportSearchAction.java | 72 +++++++++-- .../common/settings/ClusterSettings.java | 4 +- .../java/org/elasticsearch/node/Node.java | 7 -- .../RemoteClusterAware.java | 8 +- .../RemoteClusterConnection.java | 14 +-- .../RemoteClusterService.java | 56 +-------- .../RemoteConnectionInfo.java | 2 +- .../transport/TransportService.java | 21 +++- .../action/search/SearchAsyncActionTests.java | 1 + .../search/TransportSearchActionTests.java | 116 ++++++++++++++++++ .../RemoteClusterConnectionTests.java | 5 +- .../RemoteClusterServiceTests.java | 103 +--------------- .../test/transport/MockTransportService.java | 2 +- 17 files changed, 228 insertions(+), 221 deletions(-) rename core/src/main/java/org/elasticsearch/{action/search => transport}/RemoteClusterAware.java (96%) rename core/src/main/java/org/elasticsearch/{action/search => transport}/RemoteClusterConnection.java (97%) rename core/src/main/java/org/elasticsearch/{action/search => transport}/RemoteClusterService.java (81%) rename core/src/main/java/org/elasticsearch/{action/search => transport}/RemoteConnectionInfo.java (99%) rename core/src/test/java/org/elasticsearch/{action/search => transport}/RemoteClusterConnectionTests.java (99%) rename core/src/test/java/org/elasticsearch/{action/search => transport}/RemoteClusterServiceTests.java (68%) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 5deb52e53fd..82c0ce3b77c 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -147,7 +147,6 @@ - @@ -454,7 +453,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java index 6d79e230922..8e9360bdb12 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.admin.cluster.remote; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.search.RemoteConnectionInfo; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContentObject; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java index cdb79a82583..33254a9aed9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.admin.cluster.remote; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.RemoteClusterService; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -30,8 +30,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; - public final class TransportRemoteInfoAction extends HandledTransportAction { private final RemoteClusterService remoteClusterService; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index a221c6001a5..9e858a4ccaf 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -26,7 +26,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; @@ -46,6 +46,7 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TaskAwareTransportRequestHandler; @@ -62,7 +63,7 @@ import java.util.function.Supplier; * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through * transport. */ -public class SearchTransportService extends AbstractLifecycleComponent { +public class SearchTransportService extends AbstractComponent { public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; @@ -77,17 +78,10 @@ public class SearchTransportService extends AbstractLifecycleComponent { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; private final TransportService transportService; - private final RemoteClusterService remoteClusterService; - private final boolean connectToRemote; public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) { super(settings); - this.connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); this.transportService = transportService; - this.remoteClusterService = new RemoteClusterService(settings, transportService); - if (connectToRemote) { - remoteClusterService.listenForUpdates(clusterSettings); - } } public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) { @@ -181,7 +175,7 @@ public class SearchTransportService extends AbstractLifecycleComponent { } public RemoteClusterService getRemoteClusterService() { - return remoteClusterService; + return transportService.getRemoteClusterService(); } static class ScrollFreeContextRequest extends TransportRequest { @@ -399,20 +393,4 @@ public class SearchTransportService extends AbstractLifecycleComponent { Transport.Connection getConnection(DiscoveryNode node) { return transportService.getConnection(node); } - - @Override - protected void doStart() { - if (connectToRemote) { - // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); - } - } - - @Override - protected void doStop() {} - - @Override - protected void doClose() throws IOException { - remoteClusterService.close(); - } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 94803d771eb..ae18caa50f0 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -19,8 +19,11 @@ package org.elasticsearch.action.search; +import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; @@ -37,15 +40,19 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -203,7 +210,7 @@ public class TransportSearchAction extends HandledTransportAction { List remoteShardIterators = new ArrayList<>(); Map remoteAliasFilters = new HashMap<>(); - Function connectionFunction = remoteClusterService.processRemoteShards( + Function connectionFunction = processRemoteShards(remoteClusterService, searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators, connectionFunction, clusterState, remoteAliasFilters, listener); @@ -211,6 +218,51 @@ public class TransportSearchAction extends HandledTransportAction processRemoteShards(RemoteClusterService remoteClusterService, + Map searchShardsResponses, + Map remoteIndicesByCluster, + List remoteShardIterators, + Map aliasFilterMap) { + Map> nodeToCluster = new HashMap<>(); + for (Map.Entry entry : searchShardsResponses.entrySet()) { + String clusterAlias = entry.getKey(); + ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); + for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { + nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias)); + } + Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); + for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { + //add the cluster name to the remote index names for indices disambiguation + //this ends up in the hits returned with the search response + ShardId shardId = clusterSearchShardsGroup.getShardId(); + Index remoteIndex = shardId.getIndex(); + Index index = new Index(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), + remoteIndex.getUUID()); + OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias); + assert originalIndices != null; + SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()), + Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices); + remoteShardIterators.add(shardIterator); + AliasFilter aliasFilter; + if (indicesAndFilters == null) { + aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY); + } else { + aliasFilter = indicesAndFilters.get(shardId.getIndexName()); + assert aliasFilter != null; + } + // here we have to map the filters to the UUID since from now on we use the uuid for the lookup + aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter); + } + } + return (nodeId) -> { + Supplier supplier = nodeToCluster.get(nodeId); + if (supplier == null) { + throw new IllegalArgumentException("unknown remote node: " + nodeId); + } + return supplier.get(); + }; + } + private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, List remoteShardIterators, Function remoteConnections, ClusterState clusterState, Map remoteAliasMap, @@ -234,9 +286,10 @@ public class TransportSearchAction extends HandledTransportAction localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, - searchRequest.preference()); - GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators); + GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, + concreteIndices, routingMap, searchRequest.preference()); + GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, + remoteShardIterators); failIfOverShardCountLimit(clusterService, shardIterators.size()); @@ -297,7 +350,8 @@ public class TransportSearchAction extends HandledTransportAction shardIterators, - SearchTimeProvider timeProvider, Function connectionLookup, + SearchTimeProvider timeProvider, + Function connectionLookup, long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { @@ -306,13 +360,13 @@ public class TransportSearchAction extends HandledTransportAction stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); - toClose.add(() -> stopWatch.stop().start("search_transport_service")); - toClose.add(injector.getInstance(SearchTransportService.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java similarity index 96% rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 2785a8efdb6..42ab7315234 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver; @@ -51,8 +51,8 @@ public abstract class RemoteClusterAware extends AbstractComponent { public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.", "seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress, Setting.Property.NodeScope, Setting.Property.Dynamic)); - protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; - protected static final String LOCAL_CLUSTER_GROUP_KEY = ""; + public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; + public static final String LOCAL_CLUSTER_GROUP_KEY = ""; protected final ClusterNameExpressionResolver clusterNameResolver; /** @@ -91,7 +91,7 @@ public abstract class RemoteClusterAware extends AbstractComponent { * * @return a map of grouped remote and local indices */ - protected Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) { + public Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) { Map> perClusterIndices = new HashMap<>(); Set remoteClusterNames = getRemoteClusterNames(); for (String index : requestIndices) { diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java similarity index 97% rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index a3f3f3a9612..5c7e072f650 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.component.AbstractComponent; @@ -42,17 +43,6 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionProfile; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportActionProxy; -import org.elasticsearch.transport.TransportConnectionListener; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java similarity index 81% rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index cf2be61ed05..92dce9d53f1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; @@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchShardIterator; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -38,9 +40,6 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportService; import java.io.Closeable; import java.io.IOException; @@ -169,7 +168,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl /** * Returns true if at least one remote cluster is configured */ - boolean isCrossClusterSearchEnabled() { + public boolean isCrossClusterSearchEnabled() { return remoteClusters.isEmpty() == false; } @@ -184,7 +183,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl return remoteClusters.containsKey(clusterName); } - void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster, + public void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster, ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); final Map searchShardsResponses = new ConcurrentHashMap<>(); @@ -229,54 +228,11 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl } } - Function processRemoteShards(Map searchShardsResponses, - Map remoteIndicesByCluster, - List remoteShardIterators, - Map aliasFilterMap) { - Map> nodeToCluster = new HashMap<>(); - for (Map.Entry entry : searchShardsResponses.entrySet()) { - String clusterAlias = entry.getKey(); - ClusterSearchShardsResponse searchShardsResponse = entry.getValue(); - for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) { - nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterAlias)); - } - Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters(); - for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) { - //add the cluster name to the remote index names for indices disambiguation - //this ends up in the hits returned with the search response - ShardId shardId = clusterSearchShardsGroup.getShardId(); - Index remoteIndex = shardId.getIndex(); - Index index = new Index(clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID()); - OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias); - assert originalIndices != null; - SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()), - Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices); - remoteShardIterators.add(shardIterator); - AliasFilter aliasFilter; - if (indicesAndFilters == null) { - aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY); - } else { - aliasFilter = indicesAndFilters.get(shardId.getIndexName()); - assert aliasFilter != null; - } - // here we have to map the filters to the UUID since from now on we use the uuid for the lookup - aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter); - } - } - return (nodeId) -> { - Supplier supplier = nodeToCluster.get(nodeId); - if (supplier == null) { - throw new IllegalArgumentException("unknown remote node: " + nodeId); - } - return supplier.get(); - }; - } - /** * Returns a connection to the given node on the given remote cluster * @throws IllegalArgumentException if the remote cluster is unknown */ - private Transport.Connection getConnection(DiscoveryNode node, String cluster) { + public Transport.Connection getConnection(DiscoveryNode node, String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { throw new IllegalArgumentException("no such remote cluster: " + cluster); diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java b/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java similarity index 99% rename from core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java rename to core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index ff3548d215b..53e6d220da1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteConnectionInfo.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index d956149b0df..7de96063615 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.ClusterName; @@ -82,6 +83,7 @@ public class TransportService extends AbstractLifecycleComponent { protected final TaskManager taskManager; private final TransportInterceptor.AsyncSender asyncSender; private final Function localNodeFactory; + private final boolean connectToRemoteCluster; volatile Map requestHandlers = Collections.emptyMap(); final Object requestHandlerMutex = new Object(); @@ -119,6 +121,8 @@ public class TransportService extends AbstractLifecycleComponent { volatile String[] tracerLogInclude; volatile String[] tracerLogExclude; + private final RemoteClusterService remoteClusterService; + /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; private final Transport.Connection localNodeConnection = new Transport.Connection() { @@ -158,12 +162,21 @@ public class TransportService extends AbstractLifecycleComponent { taskManager = createTaskManager(); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); + this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); + remoteClusterService = new RemoteClusterService(settings, this); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); + if (connectToRemoteCluster) { + remoteClusterService.listenForUpdates(clusterSettings); + } } } + public RemoteClusterService getRemoteClusterService() { + return remoteClusterService; + } + public DiscoveryNode getLocalNode() { return localNode; } @@ -209,6 +222,10 @@ public class TransportService extends AbstractLifecycleComponent { false, false, (request, channel) -> channel.sendResponse( new HandshakeResponse(localNode, clusterName, localNode.getVersion()))); + if (connectToRemoteCluster) { + // here we start to connect to the remote clusters + remoteClusterService.initializeRemoteClusters(); + } } @Override @@ -253,8 +270,8 @@ public class TransportService extends AbstractLifecycleComponent { } @Override - protected void doClose() { - transport.close(); + protected void doClose() throws IOException { + IOUtils.close(remoteClusterService, transport); } /** diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 1039e8e959d..b6d79b6ead9 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 696e25de75e..fbd622f878d 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -19,25 +19,52 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; public class TransportSearchActionTests extends ESTestCase { + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testMergeShardsIterators() throws IOException { List localShardIterators = new ArrayList<>(); { @@ -119,4 +146,93 @@ public class TransportSearchActionTests extends ESTestCase { } } } + + public void testProcessRemoteShards() throws IOException { + try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + RemoteClusterService service = transportService.getRemoteClusterService(); + assertFalse(service.isCrossClusterSearchEnabled()); + List iteratorList = new ArrayList<>(); + Map searchShardsResponseMap = new HashMap<>(); + DiscoveryNode[] nodes = new DiscoveryNode[] { + new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), + new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT) + }; + Map indicesAndAliases = new HashMap<>(); + indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY)); + indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY)); + ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] { + new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}), + new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1), + new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}), + new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED), + TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)}) + }; + searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases)); + DiscoveryNode[] nodes2 = new DiscoveryNode[] { + new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT) + }; + ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] { + new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0), + new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)}) + }; + searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null)); + + Map remoteIndicesByCluster = new HashMap<>(); + remoteIndicesByCluster.put("test_cluster_1", + new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); + remoteIndicesByCluster.put("test_cluster_2", + new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); + Map remoteAliases = new HashMap<>(); + TransportSearchAction.processRemoteShards(service, searchShardsResponseMap, remoteIndicesByCluster, iteratorList, + remoteAliases); + assertEquals(4, iteratorList.size()); + for (SearchShardIterator iterator : iteratorList) { + if (iterator.shardId().getIndexName().endsWith("foo")) { + assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); + assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1); + assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "foo"); + shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "foo"); + assertNull(iterator.nextOrNull()); + } else if (iterator.shardId().getIndexName().endsWith("bar")) { + assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); + assertEquals(0, iterator.shardId().getId()); + assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "bar"); + shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "bar"); + assertNull(iterator.nextOrNull()); + } else if (iterator.shardId().getIndexName().endsWith("xyz")) { + assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices()); + assertEquals(0, iterator.shardId().getId()); + assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName()); + ShardRouting shardRouting = iterator.nextOrNull(); + assertNotNull(shardRouting); + assertEquals(shardRouting.getIndexName(), "xyz"); + assertNull(iterator.nextOrNull()); + } + } + assertEquals(3, remoteAliases.size()); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id")); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id")); + assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id")); + assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder()); + assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder()); + assertNull(remoteAliases.get("xyz_id").getQueryBuilder()); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java similarity index 99% rename from core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java rename to core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 8cf6d7d48c7..1ce73aee905 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Build; @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -53,6 +54,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterConnection; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; diff --git a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java similarity index 68% rename from core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java rename to core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 909c85a5b60..32a672e1bbc 100644 --- a/core/src/test/java/org/elasticsearch/action/search/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -16,26 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.action.search; +package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.TermsQueryBuilder; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; @@ -44,10 +32,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -144,7 +130,8 @@ public class RemoteClusterServiceTests extends ESTestCase { assertTrue(service.isRemoteClusterRegistered("cluster_2")); assertFalse(service.isRemoteClusterRegistered("foo")); Map> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", - "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, i -> false); + "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, + i -> false); String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> Collections.emptyList()).toArray(new String[0]); assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)); @@ -202,90 +189,6 @@ public class RemoteClusterServiceTests extends ESTestCase { } } - public void testProcessRemoteShards() throws IOException { - try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) { - assertFalse(service.isCrossClusterSearchEnabled()); - List iteratorList = new ArrayList<>(); - Map searchShardsResponseMap = new HashMap<>(); - DiscoveryNode[] nodes = new DiscoveryNode[] { - new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT), - new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT) - }; - Map indicesAndAliases = new HashMap<>(); - indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY)); - indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY)); - ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] { - new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0), - new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), - TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}), - new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1), - new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED), - TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}), - new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0), - new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED), - TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)}) - }; - searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases)); - DiscoveryNode[] nodes2 = new DiscoveryNode[] { - new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT) - }; - ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] { - new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0), - new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)}) - }; - searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null)); - - Map remoteIndicesByCluster = new HashMap<>(); - remoteIndicesByCluster.put("test_cluster_1", - new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); - remoteIndicesByCluster.put("test_cluster_2", - new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed())); - Map remoteAliases = new HashMap<>(); - service.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, remoteAliases); - assertEquals(4, iteratorList.size()); - for (SearchShardIterator iterator : iteratorList) { - if (iterator.shardId().getIndexName().endsWith("foo")) { - assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); - assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1); - assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "foo"); - shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "foo"); - assertNull(iterator.nextOrNull()); - } else if (iterator.shardId().getIndexName().endsWith("bar")) { - assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices()); - assertEquals(0, iterator.shardId().getId()); - assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "bar"); - shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "bar"); - assertNull(iterator.nextOrNull()); - } else if (iterator.shardId().getIndexName().endsWith("xyz")) { - assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices()); - assertEquals(0, iterator.shardId().getId()); - assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName()); - ShardRouting shardRouting = iterator.nextOrNull(); - assertNotNull(shardRouting); - assertEquals(shardRouting.getIndexName(), "xyz"); - assertNull(iterator.nextOrNull()); - } - } - assertEquals(3, remoteAliases.size()); - assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id")); - assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id")); - assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id")); - assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder()); - assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder()); - assertNull(remoteAliases.get("xyz_id").getQueryBuilder()); - } - } - public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 117c168bc41..a25f435af2c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -819,7 +819,7 @@ public final class MockTransportService extends TransportService { } @Override - protected void doClose() { + protected void doClose() throws IOException { super.doClose(); synchronized (openConnections) { assert openConnections.size() == 0 : "still open connections: " + openConnections;