Cleanup lots of code, add javadocs and tests
This commit is contained in:
parent
dd0331144a
commit
e642965804
|
@ -20,7 +20,8 @@ package org.elasticsearch.action.search;
|
|||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.Version;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
|
||||
|
@ -32,11 +33,11 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
|
@ -46,6 +47,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
|
|||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -57,28 +59,53 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener {
|
||||
/**
|
||||
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
|
||||
* current node is part of the cluster and it won't receive cluster state updated from the remote cluster. Remote clusters are also not
|
||||
* fully connected with the current node. From a connectin perspective a local cluster forms a bi-directional star network while in the
|
||||
* remote case we only connect to a subset of the nodes in the cluster in an uni-directional fashion.
|
||||
*
|
||||
* This class also handles the discovery of nodes from the remote cluster. The initial list of seed nodes is only used to discover all nodes
|
||||
* in the remote cluster and connects to all eligible nodes, for details see {@link RemoteClusterService#REMOTE_NODE_ATTRIBUTE}.
|
||||
*
|
||||
* In the case of a disconnection, this class will issue a re-connect task to establish at most
|
||||
* {@link RemoteClusterService#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of connections
|
||||
* per cluster has been reached.
|
||||
*/
|
||||
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {
|
||||
|
||||
private final TransportService transportService;
|
||||
private final ConnectionProfile remoteProfile;
|
||||
private final CopyOnWriteArrayList<DiscoveryNode> clusterNodes = new CopyOnWriteArrayList();
|
||||
private final CopyOnWriteArrayList<DiscoveryNode> connectedNodes = new CopyOnWriteArrayList();
|
||||
private final Supplier<DiscoveryNode> nodeSupplier;
|
||||
private final String clusterName;
|
||||
private final int numSeedNodesToDiscover;
|
||||
private final int maxNumRemoteConnections;
|
||||
private final Predicate<DiscoveryNode> nodePredicate;
|
||||
private volatile List<DiscoveryNode> seedNodes;
|
||||
private final ConnectHandler connectHandler;
|
||||
|
||||
/**
|
||||
* Creates a new {@link RemoteClusterConnection}
|
||||
* @param settings the nodes settings object
|
||||
* @param clusterName the configured name of the cluster to connect to
|
||||
* @param seedNodes a list of seed nodes to discover eligible nodes from
|
||||
* @param transportService the local nodes transport service
|
||||
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
|
||||
* @param nodePredicate a predicate to filter eligable remote nodes to connect to
|
||||
*/
|
||||
RemoteClusterConnection(Settings settings, String clusterName, List<DiscoveryNode> seedNodes,
|
||||
TransportService transportService, int numSeedNodesToDiscover, Predicate<DiscoveryNode> nodePredicate) {
|
||||
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.numSeedNodesToDiscover = numSeedNodesToDiscover;
|
||||
this.maxNumRemoteConnections = maxNumRemoteConnections;
|
||||
this.nodePredicate = nodePredicate;
|
||||
this.clusterName = clusterName;
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
|
||||
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
|
||||
builder.addConnections(0, // we don't want this to be used for anything else but search
|
||||
TransportRequestOptions.Type.BULK,
|
||||
|
@ -90,9 +117,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
@Override
|
||||
public DiscoveryNode get() {
|
||||
if (current == null || current.hasNext() == false) {
|
||||
current = clusterNodes.iterator();
|
||||
current = connectedNodes.iterator();
|
||||
if (current.hasNext() == false) {
|
||||
throw new IllegalStateException("No node available for cluster: " + clusterName + " nodes: " + clusterNodes );
|
||||
throw new IllegalStateException("No node available for cluster: " + clusterName + " nodes: " + connectedNodes);
|
||||
}
|
||||
}
|
||||
return current.next();
|
||||
|
@ -100,8 +127,12 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
};
|
||||
this.seedNodes = Collections.unmodifiableList(seedNodes);
|
||||
this.connectHandler = new ConnectHandler();
|
||||
transportService.addConnectionListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the list of seed nodes for this cluster connection
|
||||
*/
|
||||
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
|
||||
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
|
||||
connectHandler.connect(connectListener);
|
||||
|
@ -109,64 +140,32 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
|
||||
@Override
|
||||
public void onNodeDisconnected(DiscoveryNode node) {
|
||||
boolean remove = clusterNodes.remove(node);
|
||||
if (remove == true && clusterNodes.isEmpty()) {
|
||||
// try to reconnect
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> {},
|
||||
e -> logger.error("failed to establish connection to remote cluster", e));
|
||||
connectHandler.connect(listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureConnected(DiscoveryNode[] nodes) {
|
||||
boolean seenNotConnectedNode = false;
|
||||
for (DiscoveryNode node : nodes) {
|
||||
if (transportService.nodeConnected(node) == false) {
|
||||
seenNotConnectedNode = true;
|
||||
transportService.connectToNode(node, remoteProfile);
|
||||
}
|
||||
}
|
||||
if (seenNotConnectedNode) {
|
||||
TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
|
||||
connectHandler.handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout,
|
||||
ActionListener.wrap((x) -> {
|
||||
}, x -> {
|
||||
}), false); // nocommit handle exceptions here what should we do
|
||||
boolean remove = connectedNodes.remove(node);
|
||||
if (remove == true && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
// try to reconnect and fill up the slot of the disconnected node
|
||||
connectHandler.maybeConnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
|
||||
*/
|
||||
public void fetchSearchShards(SearchRequest searchRequest, final List<String> indices,
|
||||
ActionListener<ClusterSearchShardsResponse> listener) {
|
||||
if (clusterNodes.isEmpty()) {
|
||||
connectHandler.connect(ActionListener.wrap((x) -> fetchSearchShards(searchRequest, indices, listener), listener::onFailure));
|
||||
if (connectedNodes.isEmpty()) {
|
||||
// just in case if we are not connected for some reason we try to connect and if we fail we to and notify the listener
|
||||
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
|
||||
// we can't proceed with a search on a cluster level.
|
||||
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller
|
||||
// end since they provide the listener.
|
||||
connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, indices, listener), listener::onFailure));
|
||||
} else {
|
||||
fetchShardsInternal(searchRequest, indices, listener);
|
||||
}
|
||||
}
|
||||
|
||||
public Transport.Connection getProxyConnection(DiscoveryNode nodeToProxyTo) {
|
||||
DiscoveryNode discoveryNode = nodeSupplier.get();
|
||||
Transport.Connection connection = transportService.getConnection(discoveryNode);
|
||||
return new Transport.Connection() {
|
||||
@Override
|
||||
public DiscoveryNode getNode() {
|
||||
return nodeToProxyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
|
||||
TransportActionProxy.wrapRequest(nodeToProxyTo, request), options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
throw new IllegalStateException("never close a proxy connection");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void fetchShardsInternal(SearchRequest searchRequest, List<String> indices, final ActionListener<ClusterSearchShardsResponse> listener) {
|
||||
private void fetchShardsInternal(SearchRequest searchRequest, List<String> indices,
|
||||
final ActionListener<ClusterSearchShardsResponse> listener) {
|
||||
final DiscoveryNode node = nodeSupplier.get();
|
||||
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
|
||||
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
|
||||
|
@ -181,7 +180,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
|
||||
@Override
|
||||
public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
|
||||
ensureConnected(clusterSearchShardsResponse.getNodes());
|
||||
listener.onResponse(clusterSearchShardsResponse);
|
||||
}
|
||||
|
||||
|
@ -197,14 +195,54 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
|
||||
* given node.
|
||||
*/
|
||||
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
|
||||
DiscoveryNode discoveryNode = nodeSupplier.get();
|
||||
Transport.Connection connection = transportService.getConnection(discoveryNode);
|
||||
return new Transport.Connection() {
|
||||
@Override
|
||||
public DiscoveryNode getNode() {
|
||||
return remoteClusterNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws IOException, TransportException {
|
||||
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
|
||||
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert false: "proxy connections must not be closed";
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the remote cluster
|
||||
*/
|
||||
public String getClusterName() {
|
||||
return clusterName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
connectHandler.close();
|
||||
}
|
||||
|
||||
private class ConnectHandler {
|
||||
private Semaphore running = new Semaphore(1);
|
||||
private BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
|
||||
private class ConnectHandler implements Closeable {
|
||||
private final Semaphore running = new Semaphore(1);
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
|
||||
|
||||
public void maybeConnect() {
|
||||
connect(null);
|
||||
}
|
||||
|
||||
public void connect(ActionListener<Void> connectListener) {
|
||||
final boolean runConnect;
|
||||
|
@ -213,13 +251,19 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
if (connectListener != null && queue.offer(connectListener) == false) {
|
||||
throw new IllegalStateException("connect queue is full");
|
||||
}
|
||||
if (queue.isEmpty()) {
|
||||
if (connectListener != null && queue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
runConnect = running.tryAcquire();
|
||||
|
||||
if (runConnect) {
|
||||
toNotify = new ArrayList<>();
|
||||
queue.drainTo(toNotify);
|
||||
if (closed.get()) {
|
||||
for (ActionListener<Void> listener : toNotify) {
|
||||
listener.onFailure(new AlreadyClosedException("connecte handler is already closed"));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
toNotify = Collections.emptyList();
|
||||
}
|
||||
|
@ -227,7 +271,6 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
if (runConnect) {
|
||||
forkConnect(toNotify);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
|
||||
|
@ -247,14 +290,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
ActionListener<Void> listener = ActionListener.wrap((x) -> {
|
||||
|
||||
synchronized (queue) {
|
||||
running.release();
|
||||
}
|
||||
for (ActionListener<Void> queuedListener : toNotify) {
|
||||
queuedListener.onResponse(x);
|
||||
}
|
||||
connect(null);
|
||||
maybeConnect();
|
||||
},
|
||||
(e) -> {
|
||||
synchronized (queue) {
|
||||
|
@ -263,72 +305,92 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
for (ActionListener<Void> queuedListener : toNotify) {
|
||||
queuedListener.onFailure(e);
|
||||
}
|
||||
connect(null);
|
||||
maybeConnect();
|
||||
});
|
||||
TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
|
||||
Iterator<DiscoveryNode> iterator = Collections.synchronizedList(seedNodes).iterator();
|
||||
handshakeAndConnect(iterator, transportService, connectTimeout, listener, true);
|
||||
collectRemoteNodes(iterator, transportService, listener);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
void handshakeAndConnect(Iterator<DiscoveryNode> seedNodes,
|
||||
final TransportService transportService, TimeValue connectTimeout, ActionListener<Void> listener,
|
||||
boolean connect) {
|
||||
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
|
||||
final TransportService transportService, ActionListener<Void> listener) {
|
||||
try {
|
||||
if (seedNodes.hasNext()) {
|
||||
final DiscoveryNode seedNode = seedNodes.next();
|
||||
final DiscoveryNode handshakeNode;
|
||||
if (connect) {
|
||||
try (Transport.Connection connection = transportService.openConnection(seedNode,
|
||||
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null,null))) {
|
||||
handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true);
|
||||
Transport.Connection connection = transportService.openConnection(seedNode,
|
||||
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null,null));
|
||||
boolean success = false;
|
||||
try {
|
||||
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
|
||||
(c) -> true);
|
||||
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
transportService.connectToNode(handshakeNode, remoteProfile);
|
||||
clusterNodes.add(handshakeNode);
|
||||
connectedNodes.add(handshakeNode);
|
||||
}
|
||||
} else {
|
||||
handshakeNode = seedNode;
|
||||
}
|
||||
ClusterStateRequest request = new ClusterStateRequest();
|
||||
request.clear();
|
||||
request.nodes(true);
|
||||
transportService.sendRequest(transportService.getConnection(handshakeNode),
|
||||
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||
new TransportResponseHandler<ClusterStateResponse>() {
|
||||
ClusterStateRequest request = new ClusterStateRequest();
|
||||
request.clear();
|
||||
request.nodes(true);
|
||||
transportService.sendRequest(connection,
|
||||
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
||||
new TransportResponseHandler<ClusterStateResponse>() {
|
||||
|
||||
@Override
|
||||
public ClusterStateResponse newInstance() {
|
||||
return new ClusterStateResponse();
|
||||
}
|
||||
@Override
|
||||
public ClusterStateResponse newInstance() {
|
||||
return new ClusterStateResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(ClusterStateResponse response) {
|
||||
DiscoveryNodes nodes = response.getState().nodes();
|
||||
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
|
||||
for (DiscoveryNode node : nodesIter) {
|
||||
if (nodePredicate.test(node) && clusterNodes.size() < numSeedNodesToDiscover) {
|
||||
transportService.connectToNode(node); // noop if node is connected
|
||||
clusterNodes.add(node);
|
||||
@Override
|
||||
public void handleResponse(ClusterStateResponse response) {
|
||||
try {
|
||||
DiscoveryNodes nodes = response.getState().nodes();
|
||||
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
|
||||
for (DiscoveryNode node : nodesIter) {
|
||||
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
|
||||
try {
|
||||
transportService.connectToNode(node, remoteProfile); // noop if node is connected
|
||||
connectedNodes.add(node);
|
||||
} catch (ConnectTransportException ex) {
|
||||
// fair enough we can't connect just move on
|
||||
logger.debug((Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
listener.onResponse(null);
|
||||
|
||||
} catch (Exception ex) {
|
||||
logger.warn((Supplier<?>)
|
||||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterName), ex);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
} finally {
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
}
|
||||
}
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
|
||||
clusterName), exp);
|
||||
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.warn((Supplier<?>)
|
||||
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterName), exp);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
IOUtils.closeWhileHandlingException(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
});
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("no seed node left"));
|
||||
}
|
||||
|
@ -336,12 +398,27 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
if (seedNodes.hasNext()) {
|
||||
logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
|
||||
clusterName), ex);
|
||||
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
|
||||
collectRemoteNodes(seedNodes, transportService, listener);
|
||||
} else {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
closed.compareAndSet(false, true);
|
||||
running.acquire();
|
||||
running.release();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
synchronized (queue) {
|
||||
|
||||
running.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,284 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
||||
public final class RemoteClusterService extends AbstractComponent {
|
||||
|
||||
/**
|
||||
* A list of initial seed nodes to discover eligibale nodes from the remote cluster
|
||||
*/
|
||||
//TODO this should be an affix settings?
|
||||
public static final Setting<Settings> REMOTE_CLUSTERS_SEEDS = Setting.groupSetting("search.remote.seeds.",
|
||||
RemoteClusterService::validateRemoteClustersSeeds,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic);
|
||||
/**
|
||||
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
|
||||
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
|
||||
*/
|
||||
public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster",
|
||||
3, 1, Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* The initial connect timeout for remote cluster connections
|
||||
*/
|
||||
public static final Setting<TimeValue> REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING =
|
||||
Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30), Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* The name of a node attribute to filter out nodes that should not be connected to in the remote cluster.
|
||||
* For instance a node can be configured with <tt>node.node_attr.gateway: true</tt> in order to be eligable as a gateway node between
|
||||
* clusters. In that case <tt>search.remote.node_attribute: gateway</tt> can be used to filter out other nodes in the remote cluster
|
||||
*/
|
||||
public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node_attribute",
|
||||
Setting.Property.NodeScope);
|
||||
private final TransportService transportService;
|
||||
private final int numRemoteConnections;
|
||||
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
|
||||
|
||||
RemoteClusterService(Settings settings, TransportService transportService) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
|
||||
}
|
||||
|
||||
void updateRemoteClusters(Settings seedSettings, ActionListener<Void> connectionListener) {
|
||||
Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
|
||||
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(seedSettings);
|
||||
if (seeds.isEmpty()) {
|
||||
connectionListener.onResponse(null);
|
||||
} else {
|
||||
CountDown countDown = new CountDown(seeds.size());
|
||||
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
|
||||
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
|
||||
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
|
||||
// cross cluster search
|
||||
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
|
||||
nodePredicate = nodePredicate.and((node) -> Boolean.getBoolean(node.getAttributes().getOrDefault(attribute, "false")));
|
||||
}
|
||||
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
|
||||
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
|
||||
if (remote == null) {
|
||||
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
|
||||
nodePredicate);
|
||||
remoteClusters.put(entry.getKey(), remote);
|
||||
}
|
||||
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap(
|
||||
x -> {
|
||||
if (countDown.countDown()) {
|
||||
connectionListener.onResponse(x);
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
if (countDown.fastForward()) {
|
||||
connectionListener.onFailure(e);
|
||||
}
|
||||
logger.error("failed to update seed list for cluster: " + entry.getKey(), e);
|
||||
}));
|
||||
}
|
||||
}
|
||||
if (remoteClusters.isEmpty() == false) {
|
||||
remoteClusters.putAll(this.remoteClusters);
|
||||
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if at least one remote cluster is configured
|
||||
*/
|
||||
boolean isCrossClusterSearchEnabled() {
|
||||
return remoteClusters.isEmpty() == false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
|
||||
*/
|
||||
boolean isRemoteClusterRegistered(String clusterName) {
|
||||
return remoteClusters.containsKey(clusterName);
|
||||
}
|
||||
|
||||
void sendSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
|
||||
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
|
||||
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
|
||||
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
|
||||
final AtomicReference<TransportException> transportException = new AtomicReference<>();
|
||||
for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
|
||||
final String clusterName = entry.getKey();
|
||||
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
|
||||
if (remoteClusterConnection == null) {
|
||||
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
|
||||
}
|
||||
final List<String> indices = entry.getValue();
|
||||
remoteClusterConnection.fetchSearchShards(searchRequest, indices,
|
||||
new ActionListener<ClusterSearchShardsResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
|
||||
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
|
||||
if (responsesCountDown.countDown()) {
|
||||
TransportException exception = transportException.get();
|
||||
if (exception == null) {
|
||||
listener.onResponse(searchShardsResponses);
|
||||
} else {
|
||||
listener.onFailure(transportException.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
|
||||
clusterName + "]", e);
|
||||
if (transportException.compareAndSet(null, exception) == false) {
|
||||
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
|
||||
current.addSuppressed(previous);
|
||||
return current;
|
||||
});
|
||||
}
|
||||
if (responsesCountDown.countDown()) {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a connection to the given node on the given remote cluster
|
||||
* @throws IllegalArgumentException if the remote cluster is unknown
|
||||
*/
|
||||
Transport.Connection getConnection(DiscoveryNode node, String cluster) {
|
||||
RemoteClusterConnection connection = remoteClusters.get(cluster);
|
||||
if (connection == null) {
|
||||
throw new IllegalArgumentException("no such remote cluster: " + cluster);
|
||||
}
|
||||
return connection.getConnection(node);
|
||||
}
|
||||
|
||||
|
||||
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
|
||||
Map<String, List<DiscoveryNode>> remoteClustersNodes = new HashMap<>();
|
||||
for (String clusterName : settings.names()) {
|
||||
String[] remoteHosts = settings.getAsArray(clusterName);
|
||||
for (String remoteHost : remoteHosts) {
|
||||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
|
||||
String host = remoteHost.substring(0, portSeparator);
|
||||
InetAddress hostAddress;
|
||||
try {
|
||||
hostAddress = InetAddress.getByName(host);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException("unknown host [" + host + "]", e);
|
||||
}
|
||||
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
|
||||
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost,
|
||||
new TransportAddress(new InetSocketAddress(hostAddress, port)),
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
//don't connect yet as that would require the remote node to be up and would fail the local node startup otherwise
|
||||
List<DiscoveryNode> nodes = remoteClustersNodes.get(clusterName);
|
||||
if (nodes == null) {
|
||||
nodes = new ArrayList<>();
|
||||
remoteClustersNodes.put(clusterName, nodes);
|
||||
}
|
||||
nodes.add(node);
|
||||
}
|
||||
}
|
||||
return remoteClustersNodes;
|
||||
}
|
||||
|
||||
static void validateRemoteClustersSeeds(Settings settings) {
|
||||
//TODO do we need a static whitelist like in reindex from remote?
|
||||
for (String clusterName : settings.names()) {
|
||||
String[] remoteHosts = settings.getAsArray(clusterName);
|
||||
if (remoteHosts.length == 0) {
|
||||
throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required");
|
||||
}
|
||||
for (String remoteHost : remoteHosts) {
|
||||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
|
||||
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
|
||||
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " +
|
||||
"instead for remote cluster [" + clusterName + "]");
|
||||
}
|
||||
String host = remoteHost.substring(0, portSeparator);
|
||||
try {
|
||||
InetAddress.getByName(host);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException("unknown host [" + host + "]", e);
|
||||
}
|
||||
String port = remoteHost.substring(portSeparator + 1);
|
||||
try {
|
||||
Integer portValue = Integer.valueOf(port);
|
||||
if (portValue <= 0) {
|
||||
throw new IllegalArgumentException("port number must be > 0 but was: [" + portValue + "]");
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("port must be a number, found [" + port + "] instead for remote cluster [" +
|
||||
clusterName + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
|
||||
* to all configured seed nodes.
|
||||
*/
|
||||
void initializeRemoteClusters() {
|
||||
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
|
||||
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||
updateRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings), future);
|
||||
try {
|
||||
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (TimeoutException ex) {
|
||||
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("failed to connect to remote clusters", e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,23 +19,17 @@
|
|||
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.dfs.DfsSearchResult;
|
||||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
|
@ -55,29 +49,19 @@ import org.elasticsearch.transport.Transport;
|
|||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
|
||||
* transport.
|
||||
*/
|
||||
public class SearchTransportService extends AbstractComponent {
|
||||
public class SearchTransportService extends AbstractLifecycleComponent {
|
||||
|
||||
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
|
||||
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
|
||||
|
@ -92,166 +76,17 @@ public class SearchTransportService extends AbstractComponent {
|
|||
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
|
||||
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
|
||||
|
||||
//TODO what should the setting name be?
|
||||
// TODO this should be an affix settings?
|
||||
public static final Setting<Settings> REMOTE_CLUSTERS_SEEDS = Setting.groupSetting("action.search.remote.",
|
||||
SearchTransportService::validateRemoteClustersSeeds,
|
||||
Setting.Property.NodeScope,
|
||||
Setting.Property.Dynamic);
|
||||
|
||||
/**
|
||||
* The maximum number of connections that will be established to a remote cluster. For instance if there is only a single
|
||||
* seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3.
|
||||
*/
|
||||
public static final Setting<Integer> NUM_REMOTE_CONNECTIONS = Setting.intSetting("action.search.num_remote_connections",
|
||||
3, 1, Setting.Property.NodeScope);
|
||||
|
||||
private final TransportService transportService;
|
||||
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
|
||||
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTERS_SEEDS, this::setRemoteClusters,
|
||||
SearchTransportService::validateRemoteClustersSeeds);
|
||||
}
|
||||
|
||||
public void setupRemoteClusters() {
|
||||
setRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings));
|
||||
}
|
||||
|
||||
private static void validateRemoteClustersSeeds(Settings settings) {
|
||||
//TODO do we need a static whitelist like in reindex from remote?
|
||||
for (String clusterName : settings.names()) {
|
||||
String[] remoteHosts = settings.getAsArray(clusterName);
|
||||
if (remoteHosts.length == 0) {
|
||||
throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required");
|
||||
}
|
||||
for (String remoteHost : remoteHosts) {
|
||||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
|
||||
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
|
||||
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " +
|
||||
"instead for remote cluster [" + clusterName + "]");
|
||||
}
|
||||
String host = remoteHost.substring(0, portSeparator);
|
||||
try {
|
||||
InetAddress.getByName(host);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException("unknown host [" + host + "]", e);
|
||||
}
|
||||
String port = remoteHost.substring(portSeparator + 1);
|
||||
try {
|
||||
Integer portValue = Integer.valueOf(port);
|
||||
if (portValue <= 0) {
|
||||
throw new IllegalArgumentException("port number must be > 0 but was: [" + portValue + "]");
|
||||
}
|
||||
} catch(NumberFormatException e) {
|
||||
throw new IllegalArgumentException("port must be a number, found [" + port + "] instead for remote cluster [" +
|
||||
clusterName + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
|
||||
Map<String, List<DiscoveryNode>> remoteClustersNodes = new HashMap<>();
|
||||
for (String clusterName : settings.names()) {
|
||||
String[] remoteHosts = settings.getAsArray(clusterName);
|
||||
for (String remoteHost : remoteHosts) {
|
||||
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
|
||||
String host = remoteHost.substring(0, portSeparator);
|
||||
InetAddress hostAddress;
|
||||
try {
|
||||
hostAddress = InetAddress.getByName(host);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException("unknown host [" + host + "]", e);
|
||||
}
|
||||
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
|
||||
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost,
|
||||
new TransportAddress(new InetSocketAddress(hostAddress, port)),
|
||||
Version.CURRENT.minimumCompatibilityVersion());
|
||||
//don't connect yet as that would require the remote node to be up and would fail the local node startup otherwise
|
||||
List<DiscoveryNode> nodes = remoteClustersNodes.get(clusterName);
|
||||
if (nodes == null) {
|
||||
nodes = new ArrayList<>();
|
||||
remoteClustersNodes.put(clusterName, nodes);
|
||||
}
|
||||
nodes.add(node);
|
||||
}
|
||||
}
|
||||
return remoteClustersNodes;
|
||||
}
|
||||
|
||||
private void setRemoteClusters(Settings settings) {
|
||||
Map<String, List<DiscoveryNode>> seeds = buildRemoteClustersSeeds(settings);
|
||||
Map<String, RemoteClusterConnection> remoteClusters = new HashMap<>();
|
||||
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
|
||||
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
|
||||
if (remote == null) {
|
||||
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, 3,
|
||||
(node) -> Version.CURRENT.isCompatible(node.getVersion()));
|
||||
remoteClusters.put(entry.getKey(), remote);
|
||||
}
|
||||
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap((x) -> {},
|
||||
e -> logger.error("failed to update seed list for cluster: " + entry.getKey(), e) ));
|
||||
}
|
||||
if (remoteClusters.isEmpty() == false) {
|
||||
remoteClusters.putAll(this.remoteClusters);
|
||||
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
|
||||
}
|
||||
}
|
||||
|
||||
boolean isCrossClusterSearchEnabled() {
|
||||
return remoteClusters.isEmpty() == false;
|
||||
}
|
||||
|
||||
boolean isRemoteClusterRegistered(String clusterName) {
|
||||
return remoteClusters.containsKey(clusterName);
|
||||
}
|
||||
|
||||
void sendSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
|
||||
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
|
||||
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
|
||||
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
|
||||
final AtomicReference<TransportException> transportException = new AtomicReference<>();
|
||||
for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
|
||||
final String clusterName = entry.getKey();
|
||||
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
|
||||
if (remoteClusterConnection == null) {
|
||||
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
|
||||
}
|
||||
final List<String> indices = entry.getValue();
|
||||
remoteClusterConnection.fetchSearchShards(searchRequest, indices,
|
||||
new ActionListener<ClusterSearchShardsResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
|
||||
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
|
||||
if (responsesCountDown.countDown()) {
|
||||
TransportException exception = transportException.get();
|
||||
if (exception == null) {
|
||||
listener.onResponse(searchShardsResponses);
|
||||
} else {
|
||||
listener.onFailure(transportException.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
|
||||
clusterName + "]", e);
|
||||
if (transportException.compareAndSet(null, exception) == false) {
|
||||
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
|
||||
current.addSuppressed(previous);
|
||||
return current;
|
||||
});
|
||||
}
|
||||
if (responsesCountDown.countDown()) {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
this.remoteClusterService = new RemoteClusterService(settings, transportService);
|
||||
final Consumer<Settings> clusterUpdateConsumer = (s) -> remoteClusterService.updateRemoteClusters(s,
|
||||
ActionListener.wrap((x) -> {}, (x) -> {}));
|
||||
clusterSettings.addSettingsUpdateConsumer(RemoteClusterService.REMOTE_CLUSTERS_SEEDS, clusterUpdateConsumer,
|
||||
RemoteClusterService::validateRemoteClustersSeeds);
|
||||
}
|
||||
|
||||
public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) {
|
||||
|
@ -337,6 +172,10 @@ public class SearchTransportService extends AbstractComponent {
|
|||
new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
|
||||
}
|
||||
|
||||
public RemoteClusterService getRemoteClusterService() {
|
||||
return remoteClusterService;
|
||||
}
|
||||
|
||||
static class ScrollFreeContextRequest extends TransportRequest {
|
||||
private long id;
|
||||
|
||||
|
@ -558,11 +397,17 @@ public class SearchTransportService extends AbstractComponent {
|
|||
return transportService.getConnection(node);
|
||||
}
|
||||
|
||||
Transport.Connection getRemoteConnection(DiscoveryNode node, String cluster) {
|
||||
RemoteClusterConnection connection = remoteClusters.get(cluster);
|
||||
if (connection == null) {
|
||||
throw new IllegalArgumentException("no such remote cluster: " + cluster);
|
||||
}
|
||||
return connection.getProxyConnection(node);
|
||||
@Override
|
||||
protected void doStart() {
|
||||
// here we start to connect to the remote clusters
|
||||
remoteClusterService.initializeRemoteClusters();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws IOException {}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
|
||||
private final ClusterService clusterService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
private final SearchService searchService;
|
||||
|
||||
|
@ -85,6 +86,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.remoteClusterService = searchTransportService.getRemoteClusterService();
|
||||
SearchTransportService.registerRequestHandler(transportService, searchService);
|
||||
this.clusterService = clusterService;
|
||||
this.searchService = searchService;
|
||||
|
@ -133,13 +135,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
|
||||
final String[] localIndices;
|
||||
final Map<String, List<String>> remoteIndicesByCluster = new HashMap<>();
|
||||
if (searchTransportService.isCrossClusterSearchEnabled()) {
|
||||
if (remoteClusterService.isCrossClusterSearchEnabled()) {
|
||||
List<String> localIndicesList = new ArrayList<>();
|
||||
for (String index : searchRequest.indices()) {
|
||||
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
|
||||
if (i >= 0) {
|
||||
String remoteCluster = index.substring(0, i);
|
||||
if (searchTransportService.isRemoteClusterRegistered(remoteCluster)) {
|
||||
if (remoteClusterService.isRemoteClusterRegistered(remoteCluster)) {
|
||||
String remoteIndex = index.substring(i + 1);
|
||||
List<String> indices = remoteIndicesByCluster.get(remoteCluster);
|
||||
if (indices == null) {
|
||||
|
@ -163,11 +165,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
|
||||
(nodeId) -> null, Collections.emptyMap(), listener);
|
||||
} else {
|
||||
// nocommit we have to extract this logic to add unittests ideally with manually prepared searchShardsResponses etc.
|
||||
searchTransportService.sendSearchShards(searchRequest, remoteIndicesByCluster,
|
||||
remoteClusterService.sendSearchShards(searchRequest, remoteIndicesByCluster,
|
||||
ActionListener.wrap((searchShardsResponses) -> {
|
||||
List<ShardIterator> remoteShardIterators = new ArrayList<>();
|
||||
Set<DiscoveryNode> remoteNodes = new HashSet<>();
|
||||
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
|
||||
Function<String, Transport.Connection> connectionFunction = processRemoteShards(searchShardsResponses,
|
||||
remoteShardIterators, remoteAliasFilters);
|
||||
|
@ -185,7 +185,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
String clusterName = entry.getKey();
|
||||
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
|
||||
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
|
||||
nodeToCluster.put(remoteNode.getId(), () -> searchTransportService.getRemoteConnection(remoteNode, clusterName));
|
||||
nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterName));
|
||||
}
|
||||
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
|
||||
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
package org.elasticsearch.common.settings;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.search.SearchTransportService;
|
||||
import org.elasticsearch.action.search.RemoteClusterService;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
|
@ -254,8 +254,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
|
||||
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
|
||||
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
|
||||
SearchTransportService.REMOTE_CLUSTERS_SEEDS,
|
||||
SearchTransportService.NUM_REMOTE_CONNECTIONS,
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS,
|
||||
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
|
||||
TransportService.TRACE_LOG_EXCLUDE_SETTING,
|
||||
TransportService.TRACE_LOG_INCLUDE_SETTING,
|
||||
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
|
||||
|
|
|
@ -596,8 +596,6 @@ public class Node implements Closeable {
|
|||
// start after cluster service so the local disco is known
|
||||
discovery.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
|
||||
searchTransportService.setupRemoteClusters();
|
||||
discovery.startInitialJoin();
|
||||
// tribe nodes don't have a master so we shouldn't register an observer s
|
||||
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
|
||||
|
@ -640,6 +638,9 @@ public class Node implements Closeable {
|
|||
|
||||
// start nodes now, after the http server, because it may take some time
|
||||
tribeService.startNodes();
|
||||
// starts connecting to remote clusters if any cluster is configured
|
||||
SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
|
||||
searchTransportService.start();
|
||||
|
||||
if (WRITE_PORTS_FIELD_SETTING.get(settings)) {
|
||||
if (NetworkModule.HTTP_ENABLED.get(settings)) {
|
||||
|
@ -683,6 +684,7 @@ public class Node implements Closeable {
|
|||
injector.getInstance(GatewayService.class).stop();
|
||||
injector.getInstance(SearchService.class).stop();
|
||||
injector.getInstance(TransportService.class).stop();
|
||||
injector.getInstance(SearchTransportService.class).stop();
|
||||
|
||||
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
|
||||
// we should stop this last since it waits for resources to get released
|
||||
|
@ -744,6 +746,8 @@ public class Node implements Closeable {
|
|||
toClose.add(injector.getInstance(SearchService.class));
|
||||
toClose.add(() -> stopWatch.stop().start("transport"));
|
||||
toClose.add(injector.getInstance(TransportService.class));
|
||||
toClose.add(() -> stopWatch.stop().start("search_transport_service"));
|
||||
toClose.add(injector.getInstance(SearchTransportService.class));
|
||||
|
||||
for (LifecycleComponent plugin : pluginLifecycleComponents) {
|
||||
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
|
||||
|
|
|
@ -171,7 +171,7 @@ public class TransportService extends AbstractLifecycleComponent {
|
|||
}
|
||||
|
||||
// for testing
|
||||
DiscoveryNode getLocalNode() {
|
||||
protected DiscoveryNode getLocalNode() {
|
||||
return localNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,330 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportConnectionListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class RemoteClusterConnectionIT extends ESIntegTestCase {
|
||||
|
||||
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes) {
|
||||
boolean success = false;
|
||||
MockTransportService newService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
|
||||
try {
|
||||
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME,
|
||||
(request, channel) -> {
|
||||
channel.sendResponse(new ClusterSearchShardsResponse());
|
||||
});
|
||||
newService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME,
|
||||
(request, channel) -> {
|
||||
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
|
||||
for (DiscoveryNode node : knownNodes) {
|
||||
builder.add(node);
|
||||
}
|
||||
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(builder.build()).build();
|
||||
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build));
|
||||
});
|
||||
newService.start();
|
||||
newService.setLocalNode(new DiscoveryNode(id, newService.boundAddress().publishAddress(), Version.CURRENT));
|
||||
newService.acceptIncomingRequests();
|
||||
success = true;
|
||||
return newService;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
newService.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testDiscoverSingleNode() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedNode));
|
||||
assertTrue(service.nodeConnected(seedNode));
|
||||
assertTrue(service.nodeConnected(discoverableNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testNodeDisconnected() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes);
|
||||
MockTransportService spareTransport = startTransport("spare_node", knownNodes)) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
DiscoveryNode spareNode = spareTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedNode));
|
||||
assertTrue(service.nodeConnected(seedNode));
|
||||
assertTrue(service.nodeConnected(discoverableNode));
|
||||
assertFalse(service.nodeConnected(spareNode));
|
||||
knownNodes.add(spareNode);
|
||||
CountDownLatch latchDisconnect = new CountDownLatch(1);
|
||||
CountDownLatch latchConnected = new CountDownLatch(1);
|
||||
service.addConnectionListener(new TransportConnectionListener() {
|
||||
@Override
|
||||
public void onNodeDisconnected(DiscoveryNode node) {
|
||||
if (node.equals(discoverableNode)) {
|
||||
latchDisconnect.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodeConnected(DiscoveryNode node) {
|
||||
if (node.equals(spareNode)) {
|
||||
latchConnected.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
discoverableTransport.close();
|
||||
// now make sure we try to connect again to other nodes once we got disconnected
|
||||
assertTrue(latchDisconnect.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(latchConnected.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(service.nodeConnected(spareNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testFilterDiscoveredNodes() throws Exception {
|
||||
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
|
||||
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes);
|
||||
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) {
|
||||
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
|
||||
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
|
||||
knownNodes.add(seedTransport.getLocalDiscoNode());
|
||||
knownNodes.add(discoverableTransport.getLocalDiscoNode());
|
||||
DiscoveryNode rejectedNode = randomBoolean() ? seedNode : discoverableNode;
|
||||
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false)) {
|
||||
updateSeedNodes(connection, Arrays.asList(seedNode));
|
||||
if (rejectedNode.equals(seedNode)) {
|
||||
assertFalse(service.nodeConnected(seedNode));
|
||||
assertTrue(service.nodeConnected(discoverableNode));
|
||||
} else {
|
||||
assertTrue(service.nodeConnected(seedNode));
|
||||
assertFalse(service.nodeConnected(discoverableNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateSeedNodes(RemoteClusterConnection connection, List<DiscoveryNode> seedNodes) throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(seedNodes, listener);
|
||||
latch.await();
|
||||
if (exceptionAtomicReference.get() != null) {
|
||||
throw exceptionAtomicReference.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testConnect() throws InterruptedException, IOException {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterStateResponse.getState().getNodes().getDataNodes();
|
||||
DiscoveryNode node = nodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node),
|
||||
service, Integer.MAX_VALUE, n -> true)) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node), listener);
|
||||
latch.await();
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = nodes::valuesIt;
|
||||
for (DiscoveryNode dataNode : nodesIterable) {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testConnectToSingleSeed() throws InterruptedException, IOException {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterStateResponse.getState().getNodes().getNodes();
|
||||
DiscoveryNode node = nodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node),
|
||||
service, 1, n -> true)) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node), listener);
|
||||
latch.await();
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = nodes::valuesIt;
|
||||
for (DiscoveryNode aNode : nodesIterable) {
|
||||
if (aNode.equals(node)) {
|
||||
assertTrue(service.nodeConnected(aNode));
|
||||
} else {
|
||||
assertFalse(service.nodeConnected(aNode));
|
||||
}
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testFetchShards() throws Exception {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> clusterNodes = clusterStateResponse.getState().getNodes().getNodes();
|
||||
DiscoveryNode node = clusterNodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
final boolean hasInitialNodes = randomBoolean();
|
||||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
hasInitialNodes ? Arrays.asList(node) : Collections.emptyList(), service, Integer.MAX_VALUE, n -> true)) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
String newNode = null;
|
||||
if (hasInitialNodes == false) {
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node), listener);
|
||||
latch.await();
|
||||
|
||||
newNode = internalCluster().startDataOnlyNode();
|
||||
createIndex("test-index");
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = clusterNodes::valuesIt;
|
||||
for (DiscoveryNode dataNode : nodesIterable) {
|
||||
if (dataNode.getName().equals(newNode)) {
|
||||
assertFalse(service.nodeConnected(dataNode));
|
||||
} else {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
} else {
|
||||
createIndex("test-index");
|
||||
}
|
||||
|
||||
SearchRequest request = new SearchRequest("test-index");
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
|
||||
AtomicReference<Exception> failReference = new AtomicReference<>();
|
||||
|
||||
ActionListener<ClusterSearchShardsResponse> shardsListener = ActionListener.wrap(
|
||||
x -> {
|
||||
reference.set(x);
|
||||
responseLatch.countDown();
|
||||
},
|
||||
x -> {
|
||||
failReference.set(x);
|
||||
responseLatch.countDown();
|
||||
});
|
||||
connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener);
|
||||
responseLatch.await();
|
||||
assertNull(failReference.get());
|
||||
assertNotNull(reference.get());
|
||||
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
|
||||
DiscoveryNode[] nodes = clusterSearchShardsResponse.getNodes();
|
||||
assertTrue(nodes.length != 0);
|
||||
for (DiscoveryNode dataNode : nodes) {
|
||||
assertNotNull(connection.getConnection(dataNode));
|
||||
if (dataNode.getName().equals(newNode)) {
|
||||
assertFalse(service.nodeConnected(dataNode));
|
||||
} else {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,163 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class RemoteClusterConnectionTests extends ESIntegTestCase {
|
||||
|
||||
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void testConnect() throws InterruptedException {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterStateResponse.getState().getNodes().getDataNodes();
|
||||
DiscoveryNode node = nodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service,
|
||||
Integer.MAX_VALUE, n -> true);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node),listener);
|
||||
latch.await();
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = nodes::valuesIt;
|
||||
for (DiscoveryNode dataNode : nodesIterable) {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
}
|
||||
}
|
||||
|
||||
public void testConnectToSingleSeed() throws InterruptedException {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> nodes = clusterStateResponse.getState().getNodes().getNodes();
|
||||
DiscoveryNode node = nodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service,
|
||||
1, n -> true);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node),listener);
|
||||
latch.await();
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = nodes::valuesIt;
|
||||
for (DiscoveryNode aNode : nodesIterable) {
|
||||
if (aNode.equals(node)) {
|
||||
assertTrue(service.nodeConnected(aNode));
|
||||
} else {
|
||||
assertFalse(service.nodeConnected(aNode));
|
||||
}
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
}
|
||||
}
|
||||
|
||||
public void testFetchShards() throws InterruptedException {
|
||||
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = clusterStateResponse.getState().getNodes().getDataNodes();
|
||||
DiscoveryNode node = dataNodes.valuesIt().next();
|
||||
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
|
||||
service.start();
|
||||
service.acceptIncomingRequests();
|
||||
final boolean hasInitialNodes = randomBoolean();
|
||||
RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
hasInitialNodes ? Arrays.asList(node) : Collections.emptyList(), service, Integer.MAX_VALUE, n -> true);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
if (hasInitialNodes == false) {
|
||||
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
|
||||
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
|
||||
exceptionAtomicReference.set(x);
|
||||
latch.countDown();
|
||||
});
|
||||
connection.updateSeedNodes(Arrays.asList(node), listener);
|
||||
latch.await();
|
||||
|
||||
String newNode = internalCluster().startDataOnlyNode();
|
||||
createIndex("test-index");
|
||||
assertTrue(service.nodeConnected(node));
|
||||
Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
|
||||
for (DiscoveryNode dataNode : nodesIterable) {
|
||||
if (dataNode.getName().equals(newNode)) {
|
||||
assertFalse(service.nodeConnected(dataNode));
|
||||
} else {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
}
|
||||
assertNull(exceptionAtomicReference.get());
|
||||
} else {
|
||||
createIndex("test-index");
|
||||
}
|
||||
|
||||
SearchRequest request = new SearchRequest("test-index");
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
|
||||
AtomicReference<Exception> failReference = new AtomicReference<>();
|
||||
|
||||
ActionListener<ClusterSearchShardsResponse> shardsListener = ActionListener.wrap(
|
||||
x -> {reference.set(x); responseLatch.countDown();},
|
||||
x -> {failReference.set(x); responseLatch.countDown();});
|
||||
connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener);
|
||||
responseLatch.await();
|
||||
assertNull(failReference.get());
|
||||
assertNotNull(reference.get());
|
||||
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
|
||||
DiscoveryNode[] nodes = clusterSearchShardsResponse.getNodes();
|
||||
assertTrue(nodes.length != 0);
|
||||
for (DiscoveryNode dataNode : nodes) {
|
||||
assertTrue(service.nodeConnected(dataNode));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -29,24 +29,24 @@ import java.net.InetSocketAddress;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SearchTransportServiceTests extends ESTestCase {
|
||||
public class RemoteClusterServiceTests extends ESTestCase {
|
||||
|
||||
public void testRemoteClusterSeedSetting() {
|
||||
// simple validation
|
||||
SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("action.search.remote.foo", "192.168.0.1:8080")
|
||||
.put("action.search.remote.bar", "[::1]:9090").build());
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("search.remote.seeds.foo", "192.168.0.1:8080")
|
||||
.put("search.remote.seeds.bar", "[::1]:9090").build());
|
||||
|
||||
expectThrows(IllegalArgumentException.class, () ->
|
||||
SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("action.search.remote.foo", "192.168.0.1").build()));
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("search.remote.seeds.foo", "192.168.0.1").build()));
|
||||
}
|
||||
|
||||
public void testBuiltRemoteClustersSeeds() throws Exception {
|
||||
Map<String, List<DiscoveryNode>> map = SearchTransportService.buildRemoteClustersSeeds(
|
||||
SearchTransportService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("action.search.remote.foo", "192.168.0.1:8080")
|
||||
.put("action.search.remote.bar", "[::1]:9090").build()));
|
||||
Map<String, List<DiscoveryNode>> map = RemoteClusterService.buildRemoteClustersSeeds(
|
||||
RemoteClusterService.REMOTE_CLUSTERS_SEEDS.get(Settings.builder()
|
||||
.put("search.remote.seeds.foo", "192.168.0.1:8080")
|
||||
.put("search.remote.seeds.bar", "[::1]:9090").build()));
|
||||
assertEquals(2, map.size());
|
||||
assertTrue(map.containsKey("foo"));
|
||||
assertTrue(map.containsKey("bar"));
|
|
@ -82,7 +82,7 @@ public class SearchAsyncActionTests extends ESTestCase {
|
|||
GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
|
||||
AtomicInteger numFreedContext = new AtomicInteger();
|
||||
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
Collections.singleton(SearchTransportService.REMOTE_CLUSTERS_SEEDS)), null) {
|
||||
Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) {
|
||||
@Override
|
||||
public void sendFreeContext(Transport.Connection connection, long contextId, SearchRequest request) {
|
||||
numFreedContext.incrementAndGet();
|
||||
|
|
|
@ -35,8 +35,8 @@ task mixedClusterTest(type: RestIntegTestTask) {
|
|||
dependsOn(remoteClusterTest)
|
||||
cluster {
|
||||
distribution = 'zip'
|
||||
setting 'action.search.remote.my_remote_cluster', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
|
||||
setting 'action.search.num_remote_connections', 1
|
||||
setting 'search.remote.seeds.my_remote_cluster', "\"${-> remoteClusterTest.nodes.get(0).transportUri()}\""
|
||||
setting 'search.remote.connections_per_cluster', 1
|
||||
|
||||
}
|
||||
systemProperty 'tests.rest.suite', 'multi_cluster'
|
||||
|
|
|
@ -99,16 +99,16 @@
|
|||
cluster.get_settings:
|
||||
include_defaults: true
|
||||
|
||||
- set: { defaults.action.search.remote.my_remote_cluster: remote_ip }
|
||||
- set: { defaults.search.remote.seeds.my_remote_cluster: remote_ip }
|
||||
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
flat_settings: true
|
||||
body:
|
||||
transient:
|
||||
action.search.remote.test_remote_cluster: $remote_ip
|
||||
search.remote.seeds.test_remote_cluster: $remote_ip
|
||||
|
||||
- match: {transient: {action.search.remote.test_remote_cluster: $remote_ip}}
|
||||
- match: {transient: {search.remote.seeds.test_remote_cluster: $remote_ip}}
|
||||
|
||||
- do:
|
||||
search:
|
||||
|
|
|
@ -2285,7 +2285,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
|
||||
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
|
||||
.put("action.search.remote.test_remote_cluster", seedNode.getAddress().toString())
|
||||
.put("search.remote.seeds.test_remote_cluster", seedNode.getAddress().toString())
|
||||
.put("node.name", "node_prx_0")
|
||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
|
||||
|
|
|
@ -740,4 +740,8 @@ public final class MockTransportService extends TransportService {
|
|||
super.doClose();
|
||||
assert openConnections.size() == 0 : "still open connections: " + openConnections;
|
||||
}
|
||||
|
||||
public DiscoveryNode getLocalDiscoNode() {
|
||||
return this.getLocalNode();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue