make connection to nodes async and ensure that if we are not fully connected a search will fork or a reconnect

This commit is contained in:
Simon Willnauer 2016-12-21 10:00:04 +01:00
parent 3625d64b7f
commit dce24b5a10
5 changed files with 212 additions and 136 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
@ -43,26 +44,29 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener {
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener {
private final TransportService transportService;
private final ConnectionProfile remoteProfile;
private final CopyOnWriteArrayList<DiscoveryNode> clusterNodes = new CopyOnWriteArrayList();
private final Supplier<DiscoveryNode> nodeSupplier;
private final String clusterName;
private final CountDownLatch connected;
private volatile List<DiscoveryNode> seedNodes;
private final ConnectHandler connectHandler;
RemoteClusterConnection(Settings settings, String clusterName, List<DiscoveryNode> seedNodes,
TransportService transportService) {
super(settings);
this.connected = new CountDownLatch(1);
this.transportService = transportService;
this.clusterName = clusterName;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
@ -86,91 +90,12 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn
}
};
this.seedNodes = seedNodes;
this.connectHandler = new ConnectHandler();
}
public synchronized void connectWithSeeds(ActionListener<Void> connectListener) {
if (clusterNodes.isEmpty()) {
TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
Iterator<DiscoveryNode> iterator = Collections.synchronizedList(seedNodes).iterator();
handshakeAndConnect(iterator, transportService, connectTimeout, connectListener, true);
} else {
connectListener.onResponse(null);
}
}
public synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes) {
if (this.seedNodes.containsAll(seedNodes) == false || this.seedNodes.size() != seedNodes.size()) {
this.seedNodes = new ArrayList<>(seedNodes);
ActionListener<Void> listener = ActionListener.wrap(x -> {},
e -> logger.error("failed to establish connection to remote cluster", e));
connectWithSeeds(listener);
}
}
private void handshakeAndConnect(Iterator<DiscoveryNode> seedNodes,
final TransportService transportService, TimeValue connectTimeout, ActionListener<Void> listener,
boolean connect) {
try {
if (seedNodes.hasNext()) {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode handshakeNode;
if (connect) {
try (Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.LIGHT_PROFILE)) {
handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true);
transportService.connectToNode(handshakeNode, remoteProfile);
clusterNodes.add(handshakeNode);
}
} else {
handshakeNode = seedNode;
}
ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
transportService.sendRequest(transportService.getConnection(handshakeNode),
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public void handleResponse(ClusterStateResponse response) {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getDataNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
transportService.connectToNode(node); // noop if node is connected
clusterNodes.add(node);
}
listener.onResponse(null);
}
@Override
public void handleException(TransportException exp) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterName), exp);
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
}
@Override
public String executor() {
return ThreadPool.Names.MANAGEMENT;
}
});
} else {
listener.onFailure(new IllegalStateException("no seed node left"));
}
} catch (IOException ex) {
if (seedNodes.hasNext()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterName), ex);
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
} else {
listener.onFailure(ex);
}
}
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = new ArrayList<>(seedNodes);
connectHandler.handshake(connectListener);
}
@Override
@ -180,7 +105,7 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn
// try to reconnect
ActionListener<Void> listener = ActionListener.wrap(x -> {},
e -> logger.error("failed to establish connection to remote cluster", e));
connectWithSeeds(listener);
connectHandler.handshake(listener);
}
}
@ -193,8 +118,8 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn
}
}
if (seenNotConnectedNode) {
final TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout,
TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
connectHandler.handshakeAndConnect(clusterNodes.iterator(), transportService, connectTimeout,
ActionListener.wrap((x) -> {
}, x -> {
}), false); // nocommit handle exceptions here what should we do
@ -203,6 +128,14 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn
public void fetchSearchShards(SearchRequest searchRequest, final List<String> indices,
ActionListener<ClusterSearchShardsResponse> listener) {
if (clusterNodes.isEmpty()) {
connectHandler.handshake(ActionListener.wrap((x) -> fetchSearchShards(searchRequest, indices, listener), listener::onFailure));
} else {
fetchShardsInternal(searchRequest, indices, listener);
}
}
private void fetchShardsInternal(SearchRequest searchRequest, List<String> indices, final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = nodeSupplier.get();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
@ -236,4 +169,145 @@ class RemoteClusterConnection extends AbstractComponent implements TransportConn
public String getClusterName() {
return clusterName;
}
private class ConnectHandler {
private Semaphore running = new Semaphore(1);
private BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<>(100);
public void handshake(ActionListener<Void> connectListener) {
final boolean runConnect;
final Collection<ActionListener<Void>> toNotify;
synchronized (queue) {
if (connectListener != null && queue.offer(connectListener) == false) {
throw new IllegalStateException("connect queue is full");
}
if (queue.isEmpty()) {
return;
}
runConnect = running.tryAcquire();
if (runConnect) {
toNotify = new ArrayList<>();
queue.drainTo(toNotify);
} else {
toNotify = Collections.emptyList();
}
}
if (runConnect) {
forkConnect(toNotify);
}
}
private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
ThreadPool threadPool = transportService.getThreadPool();
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
executor.submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
synchronized (queue) {
running.release();
}
for (ActionListener<Void> queuedListener : toNotify) {
queuedListener.onFailure(e);
}
}
@Override
protected void doRun() throws Exception {
ActionListener<Void> listener = ActionListener.wrap((x) -> {
synchronized (queue) {
running.release();
}
for (ActionListener<Void> queuedListener : toNotify) {
queuedListener.onResponse(x);
}
handshake(null);
},
(e) -> {
synchronized (queue) {
running.release();
}
for (ActionListener<Void> queuedListener : toNotify) {
queuedListener.onFailure(e);
}
handshake(null);
});
TimeValue connectTimeout = TimeValue.timeValueSeconds(10); // TODO make configurable
Iterator<DiscoveryNode> iterator = Collections.synchronizedList(seedNodes).iterator();
handshakeAndConnect(iterator, transportService, connectTimeout, listener, true);
}
});
}
void handshakeAndConnect(Iterator<DiscoveryNode> seedNodes,
final TransportService transportService, TimeValue connectTimeout, ActionListener<Void> listener,
boolean connect) {
try {
if (seedNodes.hasNext()) {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode handshakeNode;
if (connect) {
try (Transport.Connection connection = transportService.openConnection(seedNode, ConnectionProfile.LIGHT_PROFILE)) {
handshakeNode = transportService.handshake(connection, connectTimeout.millis(), (c) -> true);
transportService.connectToNode(handshakeNode, remoteProfile);
clusterNodes.add(handshakeNode);
}
} else {
handshakeNode = seedNode;
}
ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
transportService.sendRequest(transportService.getConnection(handshakeNode),
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public void handleResponse(ClusterStateResponse response) {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getDataNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
transportService.connectToNode(node); // noop if node is connected
clusterNodes.add(node);
}
listener.onResponse(null);
}
@Override
public void handleException(TransportException exp) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterName), exp);
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
}
@Override
public String executor() {
return ThreadPool.Names.MANAGEMENT;
}
});
} else {
listener.onFailure(new IllegalStateException("no seed node left"));
}
} catch (IOException ex) {
if (seedNodes.hasNext()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterName), ex);
handshakeAndConnect(seedNodes, transportService, connectTimeout, listener, connect);
} else {
listener.onFailure(ex);
}
}
}
}
}

View File

@ -106,25 +106,9 @@ public class SearchTransportService extends AbstractComponent {
}
public void setupRemoteClusters() {
// nocommit we have to figure out a good way to set-up these connections
setRemoteClusters(REMOTE_CLUSTERS_SEEDS.get(settings));
}
private void connect() {
int size = remoteClusters.size();
CountDownLatch latch = new CountDownLatch(size);
for (RemoteClusterConnection connection : remoteClusters.values()) {
connection.connectWithSeeds(ActionListener.wrap(x -> latch.countDown(), ex -> {
throw new Error("failed to connect to to remote cluster " + connection.getClusterName(), ex);
}));
}
try {
latch.await(); // NOCOMMIT timeout?
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void validateRemoteClustersSeeds(Settings settings) {
//TODO do we need a static whitelist like in reindex from remote?
for (String clusterName : settings.names()) {
@ -195,14 +179,13 @@ public class SearchTransportService extends AbstractComponent {
if (remote == null) {
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService);
remoteClusters.put(entry.getKey(), remote);
} else {
remote.updateSeedNodes(entry.getValue());
}
remote.updateSeedNodes(entry.getValue(), ActionListener.wrap((x) -> {},
e -> logger.error("failed to update seed list for cluster: " + entry.getKey(), e) ));
}
if (remoteClusters.isEmpty() == false) {
remoteClusters.putAll(this.remoteClusters);
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
connect(); //nocommit this sucks as it's executed on the state update thread
}
}
@ -214,10 +197,6 @@ public class SearchTransportService extends AbstractComponent {
return remoteClusters.containsKey(clusterName);
}
void connectToRemoteNode(DiscoveryNode remoteNode) {
transportService.connectToNode(remoteNode);
}
void sendSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());

View File

@ -279,13 +279,21 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (remoteNodes.isEmpty()) {
return nodes::get;
}
ImmutableOpenMap.Builder<String, DiscoveryNode> builder = ImmutableOpenMap.builder(nodes.getNodes());
ImmutableOpenMap.Builder<String, DiscoveryNode> builder = ImmutableOpenMap.builder();
for (DiscoveryNode remoteNode : remoteNodes) {
//TODO shall we catch connect exceptions here? Otherwise we will return an error but we could rather return partial results?
searchTransportService.connectToRemoteNode(remoteNode);
builder.put(remoteNode.getId(), remoteNode);
}
return builder.build()::get;
ImmutableOpenMap<String, DiscoveryNode> remoteNodesMap = builder.build();
return (nodeId) -> {
DiscoveryNode discoveryNode = nodes.get(nodeId);
if (discoveryNode == null) {
discoveryNode = remoteNodesMap.get(nodeId);
}
if (discoveryNode == null) {
throw new IllegalArgumentException("no node found for id: " + nodeId);
}
return discoveryNode;
};
}
@Override

View File

@ -1182,4 +1182,11 @@ public class TransportService extends AbstractLifecycleComponent {
return "direct";
}
}
/**
* Returns the internal thread pool
*/
public ThreadPool getThreadPool() {
return threadPool;
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -59,7 +60,7 @@ public class RemoteClusterConnectionTests extends ESIntegTestCase {
exceptionAtomicReference.set(x);
latch.countDown();
});
connection.connectWithSeeds(listener);
connection.updateSeedNodes(Arrays.asList(node),listener);
latch.await();
assertTrue(service.nodeConnected(node));
Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
@ -79,28 +80,35 @@ public class RemoteClusterConnectionTests extends ESIntegTestCase {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(node), service);
final boolean hasInitialNodes = randomBoolean();
RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
hasInitialNodes ? Arrays.asList(node) : Collections.emptyList(), service);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
exceptionAtomicReference.set(x);
latch.countDown();
});
connection.connectWithSeeds(listener);
latch.await();
String newNode = internalCluster().startDataOnlyNode();
createIndex("test-index");
assertTrue(service.nodeConnected(node));
Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
for (DiscoveryNode dataNode : nodesIterable) {
if (dataNode.getName().equals(newNode)) {
assertFalse(service.nodeConnected(dataNode));
} else {
assertTrue(service.nodeConnected(dataNode));
if (hasInitialNodes == false) {
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
ActionListener<Void> listener = ActionListener.wrap(x -> latch.countDown(), x -> {
exceptionAtomicReference.set(x);
latch.countDown();
});
connection.updateSeedNodes(Arrays.asList(node), listener);
latch.await();
String newNode = internalCluster().startDataOnlyNode();
createIndex("test-index");
assertTrue(service.nodeConnected(node));
Iterable<DiscoveryNode> nodesIterable = dataNodes::valuesIt;
for (DiscoveryNode dataNode : nodesIterable) {
if (dataNode.getName().equals(newNode)) {
assertFalse(service.nodeConnected(dataNode));
} else {
assertTrue(service.nodeConnected(dataNode));
}
}
assertNull(exceptionAtomicReference.get());
} else {
createIndex("test-index");
}
assertNull(exceptionAtomicReference.get());
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);