diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 5deb52e53fd..82c0ce3b77c 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -147,7 +147,6 @@
-
@@ -454,7 +453,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
index 6d79e230922..8e9360bdb12 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
@@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.search.RemoteConnectionInfo;
+import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
index cdb79a82583..33254a9aed9 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
@@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.RemoteClusterService;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@@ -30,8 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
-import java.util.ArrayList;
-
public final class TransportRemoteInfoAction extends HandledTransportAction {
private final RemoteClusterService remoteClusterService;
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index a221c6001a5..9e858a4ccaf 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -26,7 +26,7 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -46,6 +46,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
@@ -62,7 +63,7 @@ import java.util.function.Supplier;
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
*/
-public class SearchTransportService extends AbstractLifecycleComponent {
+public class SearchTransportService extends AbstractComponent {
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
@@ -77,17 +78,10 @@ public class SearchTransportService extends AbstractLifecycleComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
private final TransportService transportService;
- private final RemoteClusterService remoteClusterService;
- private final boolean connectToRemote;
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
- this.connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.transportService = transportService;
- this.remoteClusterService = new RemoteClusterService(settings, transportService);
- if (connectToRemote) {
- remoteClusterService.listenForUpdates(clusterSettings);
- }
}
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
@@ -181,7 +175,7 @@ public class SearchTransportService extends AbstractLifecycleComponent {
}
public RemoteClusterService getRemoteClusterService() {
- return remoteClusterService;
+ return transportService.getRemoteClusterService();
}
static class ScrollFreeContextRequest extends TransportRequest {
@@ -399,20 +393,4 @@ public class SearchTransportService extends AbstractLifecycleComponent {
Transport.Connection getConnection(DiscoveryNode node) {
return transportService.getConnection(node);
}
-
- @Override
- protected void doStart() {
- if (connectToRemote) {
- // here we start to connect to the remote clusters
- remoteClusterService.initializeRemoteClusters();
- }
- }
-
- @Override
- protected void doStop() {}
-
- @Override
- protected void doClose() throws IOException {
- remoteClusterService.close();
- }
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index 94803d771eb..ae18caa50f0 100644
--- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -19,8 +19,11 @@
package org.elasticsearch.action.search;
+import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
+import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
@@ -37,15 +40,19 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -203,7 +210,7 @@ public class TransportSearchAction extends HandledTransportAction {
List remoteShardIterators = new ArrayList<>();
Map remoteAliasFilters = new HashMap<>();
- Function connectionFunction = remoteClusterService.processRemoteShards(
+ Function connectionFunction = processRemoteShards(remoteClusterService,
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
@@ -211,6 +218,51 @@ public class TransportSearchAction extends HandledTransportAction processRemoteShards(RemoteClusterService remoteClusterService,
+ Map searchShardsResponses,
+ Map remoteIndicesByCluster,
+ List remoteShardIterators,
+ Map aliasFilterMap) {
+ Map> nodeToCluster = new HashMap<>();
+ for (Map.Entry entry : searchShardsResponses.entrySet()) {
+ String clusterAlias = entry.getKey();
+ ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
+ for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
+ nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias));
+ }
+ Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
+ for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
+ //add the cluster name to the remote index names for indices disambiguation
+ //this ends up in the hits returned with the search response
+ ShardId shardId = clusterSearchShardsGroup.getShardId();
+ Index remoteIndex = shardId.getIndex();
+ Index index = new Index(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(),
+ remoteIndex.getUUID());
+ OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
+ assert originalIndices != null;
+ SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
+ Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
+ remoteShardIterators.add(shardIterator);
+ AliasFilter aliasFilter;
+ if (indicesAndFilters == null) {
+ aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
+ } else {
+ aliasFilter = indicesAndFilters.get(shardId.getIndexName());
+ assert aliasFilter != null;
+ }
+ // here we have to map the filters to the UUID since from now on we use the uuid for the lookup
+ aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
+ }
+ }
+ return (nodeId) -> {
+ Supplier supplier = nodeToCluster.get(nodeId);
+ if (supplier == null) {
+ throw new IllegalArgumentException("unknown remote node: " + nodeId);
+ }
+ return supplier.get();
+ };
+ }
+
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List remoteShardIterators, Function remoteConnections,
ClusterState clusterState, Map remoteAliasMap,
@@ -234,9 +286,10 @@ public class TransportSearchAction extends HandledTransportAction localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
- searchRequest.preference());
- GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators);
+ GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
+ concreteIndices, routingMap, searchRequest.preference());
+ GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
+ remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
@@ -297,7 +350,8 @@ public class TransportSearchAction extends HandledTransportAction shardIterators,
- SearchTimeProvider timeProvider, Function connectionLookup,
+ SearchTimeProvider timeProvider,
+ Function connectionLookup,
long clusterStateVersion, Map aliasFilter,
Map concreteIndexBoosts,
ActionListener listener) {
@@ -306,13 +360,13 @@ public class TransportSearchAction extends HandledTransportAction stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
- toClose.add(() -> stopWatch.stop().start("search_transport_service"));
- toClose.add(injector.getInstance(SearchTransportService.class));
for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
similarity index 96%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
index 2785a8efdb6..42ab7315234 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
@@ -51,8 +51,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
- protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
- protected static final String LOCAL_CLUSTER_GROUP_KEY = "";
+ public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
+ public static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
@@ -91,7 +91,7 @@ public abstract class RemoteClusterAware extends AbstractComponent {
*
* @return a map of grouped remote and local indices
*/
- protected Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) {
+ public Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) {
Map> perClusterIndices = new HashMap<>();
Set remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
similarity index 97%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
index a3f3f3a9612..5c7e072f650 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@@ -42,17 +43,6 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.ConnectionProfile;
-import org.elasticsearch.transport.TcpTransport;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportActionProxy;
-import org.elasticsearch.transport.TransportConnectionListener;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponseHandler;
-import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
similarity index 81%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
index cf2be61ed05..92dce9d53f1 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -38,9 +40,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
@@ -169,7 +168,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
/**
* Returns true
if at least one remote cluster is configured
*/
- boolean isCrossClusterSearchEnabled() {
+ public boolean isCrossClusterSearchEnabled() {
return remoteClusters.isEmpty() == false;
}
@@ -184,7 +183,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.containsKey(clusterName);
}
- void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster,
+ public void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster,
ActionListener