Move RemoteClusterService into TransportService (#24424)

TransportService and RemoteClusterService are closely coupled already today
and to simplify remote cluster integration down the road it can be a direct
dependency of TransportService. This change moves RemoteClusterService into
TransportService with the goal to make it a hidden implementation detail
of TransportService in followup changes.
This commit is contained in:
Simon Willnauer 2017-05-02 18:09:32 +02:00 committed by GitHub
parent 5de2bcc624
commit 2f9e9460d4
17 changed files with 228 additions and 221 deletions

View File

@ -147,7 +147,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseController.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportSearchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]DelegatingActionListener.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptions.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ToXContentToBytes.java" checks="LineLength" />
@ -454,7 +453,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineRequestParsingTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineResponseTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]WriteableIngestDocumentTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilderTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />

View File

@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.RemoteConnectionInfo;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -30,8 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {
private final RemoteClusterService remoteClusterService;

View File

@ -26,7 +26,7 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
@ -46,6 +46,7 @@ import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
@ -62,7 +63,7 @@ import java.util.function.Supplier;
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
*/
public class SearchTransportService extends AbstractLifecycleComponent {
public class SearchTransportService extends AbstractComponent {
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
@ -77,17 +78,10 @@ public class SearchTransportService extends AbstractLifecycleComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
private final TransportService transportService;
private final RemoteClusterService remoteClusterService;
private final boolean connectToRemote;
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
this.connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.transportService = transportService;
this.remoteClusterService = new RemoteClusterService(settings, transportService);
if (connectToRemote) {
remoteClusterService.listenForUpdates(clusterSettings);
}
}
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
@ -181,7 +175,7 @@ public class SearchTransportService extends AbstractLifecycleComponent {
}
public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
return transportService.getRemoteClusterService();
}
static class ScrollFreeContextRequest extends TransportRequest {
@ -399,20 +393,4 @@ public class SearchTransportService extends AbstractLifecycleComponent {
Transport.Connection getConnection(DiscoveryNode node) {
return transportService.getConnection(node);
}
@Override
protected void doStart() {
if (connectToRemote) {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
}
}
@Override
protected void doStop() {}
@Override
protected void doClose() throws IOException {
remoteClusterService.close();
}
}

View File

@ -19,8 +19,11 @@
package org.elasticsearch.action.search;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
@ -37,15 +40,19 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -203,7 +210,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
Function<String, Transport.Connection> connectionFunction = processRemoteShards(remoteClusterService,
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
@ -211,6 +218,51 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
static Function<String, Transport.Connection> processRemoteShards(RemoteClusterService remoteClusterService,
Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias));
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index remoteIndex = shardId.getIndex();
Index index = new Index(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(),
remoteIndex.getUUID());
OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null;
SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null;
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
}
}
return (nodeId) -> {
Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
if (supplier == null) {
throw new IllegalArgumentException("unknown remote node: " + nodeId);
}
return supplier.get();
};
}
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
@ -234,9 +286,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators);
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
@ -297,7 +350,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider, Function<String, Transport.Connection> connectionLookup,
SearchTimeProvider timeProvider,
Function<String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
@ -306,13 +360,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");

View File

@ -19,8 +19,8 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.RemoteClusterAware;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;

View File

@ -736,10 +736,6 @@ public class Node implements Closeable {
// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
// starts connecting to remote clusters if any cluster is configured
SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
searchTransportService.start();
logger.info("started");
return this;
@ -773,7 +769,6 @@ public class Node implements Closeable {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
injector.getInstance(SearchTransportService.class).stop();
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
@ -835,8 +830,6 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(() -> stopWatch.stop().start("search_transport_service"));
toClose.add(injector.getInstance(SearchTransportService.class));
for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
@ -51,8 +51,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
protected static final String LOCAL_CLUSTER_GROUP_KEY = "";
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
@ -91,7 +91,7 @@ public abstract class RemoteClusterAware extends AbstractComponent {
*
* @return a map of grouped remote and local indices
*/
protected Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@ -42,17 +43,6 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -38,9 +40,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
@ -169,7 +168,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
/**
* Returns <code>true</code> if at least one remote cluster is configured
*/
boolean isCrossClusterSearchEnabled() {
public boolean isCrossClusterSearchEnabled() {
return remoteClusters.isEmpty() == false;
}
@ -184,7 +183,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.containsKey(clusterName);
}
void collectSearchShards(SearchRequest searchRequest, Map<String, OriginalIndices> remoteIndicesByCluster,
public void collectSearchShards(SearchRequest searchRequest, Map<String, OriginalIndices> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
@ -229,54 +228,11 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
}
}
Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterAlias));
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index remoteIndex = shardId.getIndex();
Index index = new Index(clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID());
OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null;
SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null;
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
}
}
return (nodeId) -> {
Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
if (supplier == null) {
throw new IllegalArgumentException("unknown remote node: " + nodeId);
}
return supplier.get();
};
}
/**
* Returns a connection to the given node on the given remote cluster
* @throws IllegalArgumentException if the remote cluster is unknown
*/
private Transport.Connection getConnection(DiscoveryNode node, String cluster) {
public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
@ -82,6 +83,7 @@ public class TransportService extends AbstractLifecycleComponent {
protected final TaskManager taskManager;
private final TransportInterceptor.AsyncSender asyncSender;
private final Function<BoundTransportAddress, DiscoveryNode> localNodeFactory;
private final boolean connectToRemoteCluster;
volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object();
@ -119,6 +121,8 @@ public class TransportService extends AbstractLifecycleComponent {
volatile String[] tracerLogInclude;
volatile String[] tracerLogExclude;
private final RemoteClusterService remoteClusterService;
/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@ -158,12 +162,21 @@ public class TransportService extends AbstractLifecycleComponent {
taskManager = createTaskManager();
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
remoteClusterService = new RemoteClusterService(settings, this);
if (clusterSettings != null) {
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
if (connectToRemoteCluster) {
remoteClusterService.listenForUpdates(clusterSettings);
}
}
}
public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
}
public DiscoveryNode getLocalNode() {
return localNode;
}
@ -209,6 +222,10 @@ public class TransportService extends AbstractLifecycleComponent {
false, false,
(request, channel) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
if (connectToRemoteCluster) {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
}
}
@Override
@ -253,8 +270,8 @@ public class TransportService extends AbstractLifecycleComponent {
}
@Override
protected void doClose() {
transport.close();
protected void doClose() throws IOException {
IOUtils.close(remoteClusterService, transport);
}
/**

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;

View File

@ -19,25 +19,52 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class TransportSearchActionTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
public void testMergeShardsIterators() throws IOException {
List<ShardIterator> localShardIterators = new ArrayList<>();
{
@ -119,4 +146,93 @@ public class TransportSearchActionTests extends ESTestCase {
}
}
}
public void testProcessRemoteShards() throws IOException {
try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
RemoteClusterService service = transportService.getRemoteClusterService();
assertFalse(service.isCrossClusterSearchEnabled());
List<SearchShardIterator> iteratorList = new ArrayList<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
DiscoveryNode[] nodes = new DiscoveryNode[] {
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)
};
Map<String, AliasFilter> indicesAndAliases = new HashMap<>();
indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY));
indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY));
ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases));
DiscoveryNode[] nodes2 = new DiscoveryNode[] {
new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT)
};
ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null));
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
remoteIndicesByCluster.put("test_cluster_1",
new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
remoteIndicesByCluster.put("test_cluster_2",
new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
TransportSearchAction.processRemoteShards(service, searchShardsResponseMap, remoteIndicesByCluster, iteratorList,
remoteAliases);
assertEquals(4, iteratorList.size());
for (SearchShardIterator iterator : iteratorList) {
if (iterator.shardId().getIndexName().endsWith("foo")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("bar")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("xyz")) {
assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "xyz");
assertNull(iterator.nextOrNull());
}
}
assertEquals(3, remoteAliases.size());
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id"));
assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder());
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
assertNull(remoteAliases.get("xyz_id").getQueryBuilder());
}
}
}

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Build;
@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -53,6 +54,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;

View File

@ -16,26 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -44,10 +32,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@ -144,7 +130,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, i -> false);
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"},
i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
@ -202,90 +189,6 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
public void testProcessRemoteShards() throws IOException {
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) {
assertFalse(service.isCrossClusterSearchEnabled());
List<SearchShardIterator> iteratorList = new ArrayList<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
DiscoveryNode[] nodes = new DiscoveryNode[] {
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)
};
Map<String, AliasFilter> indicesAndAliases = new HashMap<>();
indicesAndAliases.put("foo", new AliasFilter(new TermsQueryBuilder("foo", "bar"), Strings.EMPTY_ARRAY));
indicesAndAliases.put("bar", new AliasFilter(new MatchAllQueryBuilder(), Strings.EMPTY_ARRAY));
ClusterSearchShardsGroup[] groups = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 0, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("foo", "foo_id", 1),
new ShardRouting[] {TestShardRouting.newShardRouting("foo", 0, "node1", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("foo", 1, "node2", false, ShardRoutingState.STARTED)}),
new ClusterSearchShardsGroup(new ShardId("bar", "bar_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("bar", 0, "node2", true, ShardRoutingState.STARTED),
TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases));
DiscoveryNode[] nodes2 = new DiscoveryNode[] {
new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT)
};
ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null));
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
remoteIndicesByCluster.put("test_cluster_1",
new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
remoteIndicesByCluster.put("test_cluster_2",
new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
service.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, remoteAliases);
assertEquals(4, iteratorList.size());
for (SearchShardIterator iterator : iteratorList) {
if (iterator.shardId().getIndexName().endsWith("foo")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("bar")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("xyz")) {
assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "xyz");
assertNull(iterator.nextOrNull());
}
}
assertEquals(3, remoteAliases.size());
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id"));
assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder());
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
assertNull(remoteAliases.get("xyz_id").getQueryBuilder());
}
}
public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("search.remote.node.attr", "gateway").build();

View File

@ -819,7 +819,7 @@ public final class MockTransportService extends TransportService {
}
@Override
protected void doClose() {
protected void doClose() throws IOException {
super.doClose();
synchronized (openConnections) {
assert openConnections.size() == 0 : "still open connections: " + openConnections;