Add remote cluster connections manager
This commit is contained in:
parent 3515f782d1
commit 3625d64b7f
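This change introduces RemoteClusterConnection, which owns the transport connections to a single remote cluster (bootstrapped from a list of seed nodes) and lets SearchTransportService resolve search shards on remote indices. A minimal sketch of how the pieces fit together, modelled on the test added at the end of this commit; the helper name, the "remote_cluster" label and the "test-index" index are illustrative, and the seed nodes, transport service, search request and result listener are assumed to be supplied by the caller:

    // Sketch only: would have to live in package org.elasticsearch.action.search, since the class is package-private.
    static void fetchRemoteShards(Settings settings, List<DiscoveryNode> seedNodes, TransportService transportService,
                                  SearchRequest searchRequest, ActionListener<ClusterSearchShardsResponse> shardsListener) {
        RemoteClusterConnection connection =
            new RemoteClusterConnection(settings, "remote_cluster", seedNodes, transportService);
        // Dial the first reachable seed, connect to the discovered remote data nodes, then resolve the shards.
        connection.connectWithSeeds(ActionListener.wrap(
            ignored -> connection.fetchSearchShards(searchRequest, Collections.singletonList("test-index"), shardsListener),
            shardsListener::onFailure));
    }
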
@@ -0,0 +1,239 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.action.search;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

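/**
 * Represents a connection to a single remote cluster. Connections are established lazily from a list of seed
 * nodes: the first reachable seed is handshaked and connected to, the remote cluster state is fetched from it,
 * and a connection is opened to every remote data node that is discovered. Known nodes are handed out round-robin
 * for shard lookups, and the connection re-bootstraps itself from the seeds once all known nodes have disconnected.
 */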
class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener {

    private final TransportService transportService;
    private final ConnectionProfile remoteProfile;
    private final CopyOnWriteArrayList<DiscoveryNode> clusterNodes = new CopyOnWriteArrayList<>();
    private final Supplier<DiscoveryNode> nodeSupplier;
    private final String clusterName;
    private final CountDownLatch connected;
    private volatile List<DiscoveryNode> seedNodes;

    RemoteClusterConnection(Settings settings, String clusterName, List<DiscoveryNode> seedNodes,
                            TransportService transportService) {
        super(settings);
        this.connected = new CountDownLatch(1);
        this.transportService = transportService;
        this.clusterName = clusterName;
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
        builder.addConnections(0, // we don't want this to be used for anything else but search
            TransportRequestOptions.Type.BULK,
            TransportRequestOptions.Type.STATE,
            TransportRequestOptions.Type.RECOVERY);
        remoteProfile = builder.build();
        nodeSupplier = new Supplier<DiscoveryNode>() {
            private volatile Iterator<DiscoveryNode> current;
            @Override
            public DiscoveryNode get() {
                if (current == null || current.hasNext() == false) {
                    current = clusterNodes.iterator();
                    if (current.hasNext() == false) {
                        throw new IllegalStateException("No node available for cluster: " + clusterName + " nodes: " + clusterNodes);
                    }
                }
                return current.next();
            }
        };
        this.seedNodes = seedNodes;
    }
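
    // Connects to the remote cluster through the configured seed nodes. Only the first caller (or a caller after
    // all previously known nodes have been removed) actually dials out; otherwise the listener completes immediately.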
    public synchronized void connectWithSeeds(ActionListener<Void> connectListener) {
        if (clusterNodes.isEmpty()) {
            TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
            Iterator<DiscoveryNode> iterator = Collections.synchronizedList(seedNodes).iterator();
            handshakeAndConnect(iterator, transportService, connectTimeout, connectListener, true);
        } else {
            connectListener.onResponse(null);
        }
    }

    public synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes) {
        if (this.seedNodes.containsAll(seedNodes) == false || this.seedNodes.size() != seedNodes.size()) {
            this.seedNodes = new ArrayList<>(seedNodes);
            ActionListener<Void> listener = ActionListener.wrap(x -> {},
                e -> logger.error("failed to establish connection to remote cluster", e));
            connectWithSeeds(listener);
        }
    }

    private void handshakeAndConnect(Iterator<DiscoveryNode> seedNodes,
                                     final TransportService transportService, TimeValue connectTimeout, ActionListener<Void> listener,
                                     boolean connect) {
        try {
            if (seedNodes.hasNext()) {
                final DiscoveryNode seedNode = seedNodes.next();
                final DiscoveryNode handshakeNode;
                if (connect) {
                    try (Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.LIGHT_PROFILE)) {
                        handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true);
                        transportService.connectToNode(handshakeNode, remoteProfile);
                        clusterNodes.add(handshakeNode);
                    }
                } else {
                    handshakeNode = seedNode;
                }
                ClusterStateRequest request = new ClusterStateRequest();
                request.clear();
                request.nodes(true);
                transportService.sendRequest(transportService.getConnection(handshakeNode),
                    ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
                    new TransportResponseHandler<ClusterStateResponse>() {

                        @Override
                        public ClusterStateResponse newInstance() {
                            return new ClusterStateResponse();
                        }

                        @Override
                        public void handleResponse(ClusterStateResponse response) {
                            DiscoveryNodes nodes = response.getState().nodes();
                            Iterable<DiscoveryNode> nodesIter = nodes.getDataNodes()::valuesIt;
                            for (DiscoveryNode node : nodesIter) {
                                transportService.connectToNode(node); // noop if node is connected
                                clusterNodes.add(node);
                            }
                            listener.onResponse(null);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            logger.warn((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
                                clusterName), exp);
                            handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.MANAGEMENT;
                        }
                    });
            } else {
                listener.onFailure(new IllegalStateException("no seed node left"));
            }
        } catch (IOException ex) {
            if (seedNodes.hasNext()) {
                logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
                    clusterName), ex);
                handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
            } else {
                listener.onFailure(ex);
            }
        }
    }

    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        boolean remove = clusterNodes.remove(node);
        if (remove && clusterNodes.isEmpty()) {
            // try to reconnect
            ActionListener<Void> listener = ActionListener.wrap(x -> {},
                e -> logger.error("failed to establish connection to remote cluster", e));
            connectWithSeeds(listener);
        }
    }

    private void ensureConnected(DiscoveryNode[] nodes) {
        boolean seenNotConnectedNode = false;
        for (DiscoveryNode node : nodes) {
            if (transportService.nodeConnected(node) == false) {
                seenNotConnectedNode = true;
                transportService.connectToNode(node, remoteProfile);
            }
        }
        if (seenNotConnectedNode) {
            final TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
            handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout,
                ActionListener.wrap((x) -> {
                }, x -> {
                }), false); // nocommit handle exceptions here what should we do
        }
    }
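
    // Resolves the shards of the given indices on the remote cluster. The request is sent to one of the known
    // remote nodes (round-robin via nodeSupplier) with local=true so the remote master is not consulted, and any
    // node in the response that is not yet connected is connected before the listener is notified.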
    public void fetchSearchShards(SearchRequest searchRequest, final List<String> indices,
                                  ActionListener<ClusterSearchShardsResponse> listener) {
        final DiscoveryNode node = nodeSupplier.get();
        ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
            .indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
            .routing(searchRequest.routing());
        transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
            new TransportResponseHandler<ClusterSearchShardsResponse>() {

                @Override
                public ClusterSearchShardsResponse newInstance() {
                    return new ClusterSearchShardsResponse();
                }

                @Override
                public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    ensureConnected(clusterSearchShardsResponse.getNodes());
                    listener.onResponse(clusterSearchShardsResponse);
                }

                @Override
                public void handleException(TransportException e) {
                    listener.onFailure(e);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SEARCH;
                }
            });
    }

    public String getClusterName() {
        return clusterName;
    }
}

@@ -24,12 +24,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

@@ -53,13 +50,11 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

@@ -67,10 +62,12 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/**

@@ -99,16 +96,35 @@ public class SearchTransportService extends AbstractComponent {
        Setting.Property.Dynamic);

    private final TransportService transportService;
    private volatile Map<String, List<DiscoveryNode>> remoteClustersSeeds;
    private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();

    public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
        super(settings);
        this.transportService = transportService;
        setRemoteClustersSeeds(REMOTE_CLUSTERS_SEEDS.get(settings));
        clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTERS_SEEDS, this::setRemoteClustersSeeds,
        clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTERS_SEEDS, this::setRemoteClusters,
            SearchTransportService::validateRemoteClustersSeeds);
    }

    public void setupRemoteClusters() {
        // nocommit we have to figure out a good way to set-up these connections
        setRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings));
    }
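
    // Connects to every configured remote cluster and blocks until all of them have connected; a failure to
    // connect surfaces as an Error thrown from the per-connection listener.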
    private void connect() {
        int size = remoteClusters.size();
        CountDownLatch latch = new CountDownLatch(size);
        for (RemoteClusterConnection connection : remoteClusters.values()) {
            connection.connectWithSeeds(ActionListener.wrap(x -> latch.countDown(), ex -> {
                throw new Error("failed to connect to remote cluster " + connection.getClusterName(), ex);
            }));
        }
        try {
            latch.await(); // NOCOMMIT timeout?
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void validateRemoteClustersSeeds(Settings settings) {
        //TODO do we need a static whitelist like in reindex from remote?
        for (String clusterName : settings.names()) {

@@ -171,34 +187,31 @@ public class SearchTransportService extends AbstractComponent {
        return remoteClustersNodes;
    }

    private void setRemoteClustersSeeds(Settings settings) {
        remoteClustersSeeds = buildRemoteClustersSeeds(settings);
    private void setRemoteClusters(Settings settings) {
        Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(settings);
        Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
        for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
            RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
            if (remote == null) {
                remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService);
                remoteClusters.put(entry.getKey(), remote);
            } else {
                remote.updateSeedNodes(entry.getValue());
            }
        }
        if (remoteClusters.isEmpty() == false) {
            remoteClusters.putAll(this.remoteClusters);
            this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
            connect(); //nocommit this sucks as it's executed on the state update thread
        }
    }

    boolean isCrossClusterSearchEnabled() {
        return remoteClustersSeeds.isEmpty() == false;
        return remoteClusters.isEmpty() == false;
    }

    boolean isRemoteClusterRegistered(String clusterName) {
        return remoteClustersSeeds.containsKey(clusterName);
    }

    private DiscoveryNode connectToRemoteCluster(String clusterName) {
        List<DiscoveryNode> nodes = remoteClustersSeeds.get(clusterName);
        if (nodes == null) {
            throw new IllegalArgumentException("no remote cluster configured with name [" + clusterName + "]");
        }
        DiscoveryNode remoteNode = nodes.get(Randomness.get().nextInt(nodes.size()));
        //TODO we just take a random host for now, implement fallback in case of connect failure
        try {
            //TODO at the moment the configured cluster names are really just labels. We should validate that all the nodes
            //belong to the same cluster, also validate the cluster name against the configured label and make sure they match
            // now go and do a real connection with the updated version of the node
            connectToRemoteNode(remoteNode);
            return remoteNode;
        } catch (ConnectTransportException e) {
            throw new ConnectTransportException(remoteNode, "unable to connect to remote cluster [" + clusterName + "]", e);
        }
        return remoteClusters.containsKey(clusterName);
    }

    void connectToRemoteNode(DiscoveryNode remoteNode) {

@@ -212,24 +225,15 @@ public class SearchTransportService extends AbstractComponent {
        final AtomicReference<TransportException> transportException = new AtomicReference<>();
        for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
            final String clusterName = entry.getKey();
            //TODO we should rather eagerly connect to every configured remote node of all remote clusters
            final DiscoveryNode node = connectToRemoteCluster(clusterName);
            RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
            if (remoteClusterConnection == null) {
                throw new IllegalArgumentException("no such remote cluster: " + clusterName);
            }
            final List<String> indices = entry.getValue();
            //local true so we don't go to the master for each single remote search
            ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
                .indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
                .routing(searchRequest.routing());

            transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
                new TransportResponseHandler<ClusterSearchShardsResponse>() {

            remoteClusterConnection.fetchSearchShards(searchRequest, indices,
                new ActionListener<ClusterSearchShardsResponse>() {
                    @Override
                    public ClusterSearchShardsResponse newInstance() {
                        return new ClusterSearchShardsResponse();
                    }

                    @Override
                    public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                        searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
                        if (responsesCountDown.countDown()) {
                            TransportException exception = transportException.get();

@@ -242,7 +246,7 @@ public class SearchTransportService extends AbstractComponent {
                        }

                    @Override
                    public void handleException(TransportException e) {
                    public void onFailure(Exception e) {
                        TransportException exception = new TransportException("unable to communicate with remote cluster [" +
                            clusterName + "]", e);
                        if (transportException.compareAndSet(null, exception) == false) {

@@ -255,11 +259,6 @@ public class SearchTransportService extends AbstractComponent {
                            listener.onFailure(exception);
                        }
                    }

                    @Override
                    public String executor() {
                        return ThreadPool.Names.SEARCH;
                    }
                });
        }
    }

@@ -585,6 +585,8 @@ public class Node implements Closeable {
        // start after cluster service so the local disco is known
        discovery.start();
        transportService.acceptIncomingRequests();
        SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
        searchTransportService.setupRemoteClusters();
        discovery.startInitialJoin();
        // tribe nodes don't have a master so we shouldn't register an observer s
        final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);

@@ -619,6 +621,7 @@
            }
        }

        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            injector.getInstance(HttpServer.class).start();
        }

@@ -188,7 +188,7 @@ public final class ConnectionProfile {
     */
    <T> T getChannel(T[] channels) {
        if (length == 0) {
            throw new IllegalStateException("can't select channel size is 0");
            throw new IllegalStateException("can't select channel size is 0 for types: " + types);
        }
        assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
        return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];

@@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;

@@ -369,6 +370,25 @@ public class TransportService extends AbstractLifecycleComponent {
    public DiscoveryNode handshake(
            final Transport.Connection connection,
            final long handshakeTimeout) throws ConnectTransportException {
        return handshake(connection, handshakeTimeout, clusterName::equals);
    }

    /**
     * Executes a high-level handshake using the given connection
     * and returns the discovery node of the node the connection
     * was established with. The handshake will fail if the cluster
     * name of the target node is rejected by the given cluster name predicate.
     *
     * @param connection the connection to a specific node
     * @param handshakeTimeout handshake timeout
     * @param clusterNamePredicate cluster name validation predicate
     * @return the connected node
     * @throws ConnectTransportException if the connection failed
     * @throws IllegalStateException if the handshake failed
     */
    public DiscoveryNode handshake(
            final Transport.Connection connection,
            final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
        final HandshakeResponse response;
        final DiscoveryNode node = connection.getNode();
        try {

@@ -386,7 +406,7 @@ public class TransportService extends AbstractLifecycleComponent {
            throw new IllegalStateException("handshake failed with " + node, e);
        }

        if (!Objects.equals(clusterName, response.clusterName)) {
        if (!clusterNamePredicate.test(response.clusterName)) {
            throw new IllegalStateException("handshake failed, mismatched cluster name [" + response.clusterName + "] - " + node);
        } else if (response.version.isCompatible((localNode != null ? localNode.getVersion() : Version.CURRENT)) == false) {
            throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);

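The predicate overload added above is what lets RemoteClusterConnection handshake with a node that belongs to a different cluster: instead of clusterName::equals it passes a predicate that accepts any remote cluster name, as in handshakeAndConnect in the first hunk (connection and connectTimeout come from that code):

    // Accept whatever cluster name the remote node reports instead of requiring the local cluster name.
    DiscoveryNode handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true);
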
@@ -0,0 +1,125 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.action.search;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RemoteClusterConnectionTests extends ESIntegTestCase {

    private final ThreadPool threadPool = new TestThreadPool(getClass().getName());

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
    }

    public void testConnect() throws InterruptedException {
        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
        ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterStateResponse.getState().getNodes().getDataNodes();
        DiscoveryNode node = dataNodes.valuesIt().next();
        try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
            service.start();
            service.acceptIncomingRequests();
            RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service);
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
            ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
                exceptionAtomicReference.set(x);
                latch.countDown();
            });
            connection.connectWithSeeds(listener);
            latch.await();
            assertTrue(service.nodeConnected(node));
            Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
            for (DiscoveryNode dataNode : nodesIterable) {
                assertTrue(service.nodeConnected(dataNode));
            }
            assertNull(exceptionAtomicReference.get());
        }
    }
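
    // Starts a new data node after the connection has bootstrapped: the new node must stay unconnected until it
    // shows up in a cluster-search-shards response, at which point fetchSearchShards connects to it as well.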
    public void testFetchShards() throws InterruptedException {
        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
        ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterStateResponse.getState().getNodes().getDataNodes();
        DiscoveryNode node = dataNodes.valuesIt().next();
        try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
            service.start();
            service.acceptIncomingRequests();
            RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service);
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
            ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
                exceptionAtomicReference.set(x);
                latch.countDown();
            });
            connection.connectWithSeeds(listener);
            latch.await();

            String newNode = internalCluster().startDataOnlyNode();
            createIndex("test-index");
            assertTrue(service.nodeConnected(node));
            Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
            for (DiscoveryNode dataNode : nodesIterable) {
                if (dataNode.getName().equals(newNode)) {
                    assertFalse(service.nodeConnected(dataNode));
                } else {
                    assertTrue(service.nodeConnected(dataNode));
                }
            }
            assertNull(exceptionAtomicReference.get());

            SearchRequest request = new SearchRequest("test-index");
            CountDownLatch responseLatch = new CountDownLatch(1);
            AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
            AtomicReference<Exception> failReference = new AtomicReference<>();

            ActionListener<ClusterSearchShardsResponse> shardsListener = ActionListener.wrap(
                x -> { reference.set(x); responseLatch.countDown(); },
                x -> { failReference.set(x); responseLatch.countDown(); });
            connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener);
            responseLatch.await();
            assertNull(failReference.get());
            assertNotNull(reference.get());
            ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
            DiscoveryNode[] nodes = clusterSearchShardsResponse.getNodes();
            assertTrue(nodes.length != 0);
            for (DiscoveryNode dataNode : nodes) {
                assertTrue(service.nodeConnected(dataNode));
            }
        }
    }
}