Extract proxy connection logic to specialized class (#47138)

Currently the logic to check if a connection to a remote discovery node
exists and otherwise create a proxy connection is mixed with the
collect nodes, cluster connection lifecycle, and other
RemoteClusterConnection logic. This commit introduces a specialized
RemoteConnectionManager class which handles the open connections.
Additionally, it reworks the "round-robin" proxy logic to create the list
of potential connections at connection open/close time, opposed to each
time a connection is requested.
This commit is contained in:
Tim Brooks 2019-09-25 15:58:18 -06:00 committed by GitHub
parent 7c5a088aa5
commit 4f47e1f169
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 351 additions and 159 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
@ -120,7 +121,7 @@ public class FollowersChecker {
channel.sendResponse(new NodesFaultDetection.PingResponse()));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
handleDisconnectedNode(node);
}
});

View File

@ -37,6 +37,7 @@ import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -118,7 +119,7 @@ public class LeaderChecker {
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
handleDisconnectedNode(node);
}
});

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
@ -97,7 +98,7 @@ public abstract class FaultDetection implements Closeable {
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {

View File

@ -31,10 +31,8 @@ import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -150,13 +148,13 @@ public class ConnectionManager implements Closeable {
} else {
logger.debug("connected to node [{}]", node);
try {
connectionListener.onNodeConnected(node);
connectionListener.onNodeConnected(node, conn);
} finally {
final Transport.Connection finalConnection = conn;
conn.addCloseListener(ActionListener.wrap(() -> {
logger.trace("unregistering {} after connection close and marking as disconnected", node);
connectedNodes.remove(node, finalConnection);
connectionListener.onNodeDisconnected(node);
connectionListener.onNodeDisconnected(node, conn);
}));
}
}
@ -218,13 +216,6 @@ public class ConnectionManager implements Closeable {
return connectedNodes.size();
}
/**
* Returns the set of nodes this manager is connected to.
*/
public Set<DiscoveryNode> connectedNodes() {
return Collections.unmodifiableSet(connectedNodes.keySet());
}
@Override
public void close() {
internalClose(true);
@ -283,16 +274,16 @@ public class ConnectionManager implements Closeable {
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onNodeDisconnected(DiscoveryNode key) {
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
listener.onNodeDisconnected(key, connection);
}
}
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
listener.onNodeConnected(node, connection);
}
}

View File

@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@ -53,7 +52,6 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@ -78,7 +76,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class);
private final TransportService transportService;
private final ConnectionManager connectionManager;
private final RemoteConnectionManager remoteConnectionManager;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
@ -116,7 +114,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
this.connectionManager = connectionManager;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
@ -168,8 +166,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (connectionManager.size() < maxNumRemoteConnections) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (remoteConnectionManager.size() < maxNumRemoteConnections) {
// try to reconnect and fill up the slot of the disconnected node
connectHandler.connect(ActionListener.wrap(
ignore -> logger.trace("successfully connected after disconnect of {}", node),
@ -182,7 +180,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
* will invoke the listener immediately.
*/
void ensureConnected(ActionListener<Void> voidActionListener) {
if (connectionManager.size() == 0) {
if (remoteConnectionManager.size() == 0) {
connectHandler.connect(voidActionListener);
} else {
voidActionListener.onResponse(null);
@ -211,8 +209,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
Transport.Connection connection = remoteConnectionManager.getAnyRemoteConnection();
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@ -256,12 +253,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
if (connectionManager.nodeConnected(remoteClusterNode)) {
return connectionManager.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(discoveryNode);
return new ProxyConnection(connection, remoteClusterNode);
return remoteConnectionManager.getRemoteConnection(remoteClusterNode);
}
private Predicate<ClusterName> getRemoteClusterNamePredicate() {
@ -280,67 +272,19 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
};
}
static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;
private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
this.proxyConnection = proxyConnection;
this.targetNode = targetNode;
}
@Override
public DiscoveryNode getNode() {
return targetNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}
@Override
public void close() {
assert false: "proxy connections must not be closed";
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
proxyConnection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return proxyConnection.isClosed();
}
@Override
public Version getVersion() {
return proxyConnection.getVersion();
}
}
Transport.Connection getConnection() {
return connectionManager.getConnection(getAnyConnectedNode());
return remoteConnectionManager.getAnyRemoteConnection();
}
@Override
public void close() throws IOException {
IOUtils.close(connectHandler);
connectionManager.closeNoBlock();
IOUtils.close(connectHandler, remoteConnectionManager);
}
public boolean isClosed() {
return connectHandler.isClosed();
}
public String getProxyAddress() {
return proxyAddress;
}
public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}
@ -456,14 +400,14 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
final StepListener<Transport.Connection> openConnectionStep = new StepListener<>();
try {
connectionManager.openConnection(seedNode, profile, openConnectionStep);
remoteConnectionManager.openConnection(seedNode, profile, openConnectionStep);
} catch (Exception e) {
onFailure.accept(e);
}
final StepListener<TransportService.HandshakeResponse> handShakeStep = new StepListener<>();
openConnectionStep.whenComplete(connection -> {
ConnectionProfile connectionProfile = connectionManager.getConnectionProfile();
ConnectionProfile connectionProfile = remoteConnectionManager.getConnectionManager().getConnectionProfile();
transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(),
getRemoteClusterNamePredicate(), handShakeStep);
}, onFailure);
@ -472,8 +416,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
handShakeStep.whenComplete(handshakeResponse -> {
final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode());
if (nodePredicate.test(handshakeNode) && connectionManager.size() < maxNumRemoteConnections) {
connectionManager.connectToNode(handshakeNode, null,
if (nodePredicate.test(handshakeNode) && remoteConnectionManager.size() < maxNumRemoteConnections) {
remoteConnectionManager.connectToNode(handshakeNode, null,
transportService.connectionValidator(handshakeNode), fullConnectionStep);
} else {
fullConnectionStep.onResponse(null);
@ -565,8 +509,8 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
private void handleNodes(Iterator<DiscoveryNode> nodesIter) {
while (nodesIter.hasNext()) {
final DiscoveryNode node = maybeAddProxyAddress(proxyAddress, nodesIter.next());
if (nodePredicate.test(node) && connectionManager.size() < maxNumRemoteConnections) {
connectionManager.connectToNode(node, null,
if (nodePredicate.test(node) && remoteConnectionManager.size() < maxNumRemoteConnections) {
remoteConnectionManager.connectToNode(node, null,
transportService.connectionValidator(node), new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
@ -625,20 +569,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
boolean isNodeConnected(final DiscoveryNode node) {
return connectionManager.nodeConnected(node);
}
private final AtomicLong nextNodeId = new AtomicLong();
DiscoveryNode getAnyConnectedNode() {
List<DiscoveryNode> nodes = new ArrayList<>(connectionManager.connectedNodes());
if (nodes.isEmpty()) {
throw new NoSuchRemoteClusterException(clusterAlias);
} else {
long curr;
while ((curr = nextNodeId.incrementAndGet()) == Long.MIN_VALUE);
return nodes.get(Math.toIntExact(Math.floorMod(curr, nodes.size())));
}
return remoteConnectionManager.getConnectionManager().nodeConnected(node);
}
/**
@ -655,7 +586,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
int getNumNodesConnected() {
return connectionManager.size();
return remoteConnectionManager.size();
}
private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
@ -663,6 +594,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
ConnectionManager getConnectionManager() {
return connectionManager;
return remoteConnectionManager.getConnectionManager();
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class RemoteConnectionManager implements Closeable {
private final String clusterAlias;
private final ConnectionManager connectionManager;
private final AtomicLong counter = new AtomicLong();
private volatile List<Transport.Connection> connections = Collections.emptyList();
RemoteConnectionManager(String clusterAlias, ConnectionManager connectionManager) {
this.clusterAlias = clusterAlias;
this.connectionManager = connectionManager;
this.connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
addConnection(connection);
}
@Override
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
removeConnection(connection);
}
});
}
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
ConnectionManager.ConnectionValidator connectionValidator,
ActionListener<Void> listener) throws ConnectTransportException {
connectionManager.connectToNode(node, connectionProfile, connectionValidator, listener);
}
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
connectionManager.openConnection(node, profile, listener);
}
public Transport.Connection getRemoteConnection(DiscoveryNode node) {
try {
return connectionManager.getConnection(node);
} catch (NodeNotConnectedException e) {
return new ProxyConnection(getAnyRemoteConnection(), node);
}
}
public Transport.Connection getAnyRemoteConnection() {
List<Transport.Connection> localConnections = this.connections;
if (localConnections.isEmpty()) {
throw new NoSuchRemoteClusterException(clusterAlias);
} else {
long curr;
while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE);
return localConnections.get(Math.toIntExact(Math.floorMod(curr, (long) localConnections.size())));
}
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public int size() {
return connectionManager.size();
}
public void close() {
connectionManager.closeNoBlock();
}
private synchronized void addConnection(Transport.Connection addedConnection) {
ArrayList<Transport.Connection> newConnections = new ArrayList<>(this.connections);
newConnections.add(addedConnection);
this.connections = Collections.unmodifiableList(newConnections);
}
private synchronized void removeConnection(Transport.Connection removedConnection) {
int newSize = this.connections.size() - 1;
ArrayList<Transport.Connection> newConnections = new ArrayList<>(newSize);
for (Transport.Connection connection : this.connections) {
if (connection.equals(removedConnection) == false) {
newConnections.add(connection);
}
}
assert newConnections.size() == newSize : "Expected connection count: " + newSize + ", Found: " + newConnections.size();
this.connections = Collections.unmodifiableList(newConnections);
}
static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection connection;
private final DiscoveryNode targetNode;
private ProxyConnection(Transport.Connection connection, DiscoveryNode targetNode) {
this.connection = connection;
this.targetNode = targetNode;
}
@Override
public DiscoveryNode getNode() {
return targetNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}
@Override
public void close() {
assert false: "proxy connections must not be closed";
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public Version getVersion() {
return connection.getVersion();
}
}
}

View File

@ -43,10 +43,10 @@ public interface TransportConnectionListener {
/**
* Called once a node connection is opened and registered.
*/
default void onNodeConnected(DiscoveryNode node) {}
default void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {}
/**
* Called once a node connection is closed and unregistered.
*/
default void onNodeDisconnected(DiscoveryNode node) {}
default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {}
}

View File

@ -492,7 +492,7 @@ public class TransportSearchActionTests extends ESTestCase {
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
@ -662,7 +662,7 @@ public class TransportSearchActionTests extends ESTestCase {
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}

View File

@ -166,7 +166,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
assert addr == null : "boundAddress: " + addr;
return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID());
}, null, Collections.emptySet());
transportService.addNodeConnectedBehavior(cm -> Collections.emptySet());
transportService.addNodeConnectedBehavior((cm, dn) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
@ -215,9 +214,11 @@ public class PeerFinderTests extends ESTestCase {
= new ConnectionManager(settings, capturingTransport);
StubbableConnectionManager connectionManager
= new StubbableConnectionManager(innerConnectionManager, settings, capturingTransport);
connectionManager.setDefaultNodeConnectedBehavior(cm -> {
assertTrue(Sets.haveEmptyIntersection(connectedNodes, disconnectedNodes));
return connectedNodes;
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
final boolean isConnected = connectedNodes.contains(discoveryNode);
final boolean isDisconnected = disconnectedNodes.contains(discoveryNode);
assert isConnected != isDisconnected : discoveryNode + ": isConnected=" + isConnected + ", isDisconnected=" + isDisconnected;
return isConnected;
});
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> capturingTransport.createConnection(discoveryNode));
transportService = new TransportService(settings, capturingTransport, deterministicTaskQueue.getThreadPool(),

View File

@ -102,12 +102,12 @@ public class ZenFaultDetectionTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(2);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
latch.countDown();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
fail("disconnect should not be called " + node);
}
};

View File

@ -52,6 +52,7 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportResponse;
@ -193,12 +194,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
latch.countDown();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
fail("disconnect should not be called " + node);
}
};

View File

@ -77,12 +77,12 @@ public class ConnectionManagerTests extends ESTestCase {
AtomicInteger nodeDisconnectedCount = new AtomicInteger();
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
nodeConnectedCount.incrementAndGet();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
nodeDisconnectedCount.incrementAndGet();
}
});
@ -204,12 +204,12 @@ public class ConnectionManagerTests extends ESTestCase {
AtomicInteger nodeDisconnectedCount = new AtomicInteger();
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
nodeConnectedCount.incrementAndGet();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
nodeDisconnectedCount.incrementAndGet();
}
});
@ -244,12 +244,12 @@ public class ConnectionManagerTests extends ESTestCase {
AtomicInteger nodeDisconnectedCount = new AtomicInteger();
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
nodeConnectedCount.incrementAndGet();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
nodeDisconnectedCount.incrementAndGet();
}
});
@ -293,7 +293,6 @@ public class ConnectionManagerTests extends ESTestCase {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
}
}
}

View File

@ -87,7 +87,7 @@ public class RemoteClusterClientTests extends ESTestCase {
service.getRemoteClusterService().getConnections().forEach(con -> {
con.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (remoteNode.equals(node)) {
semaphore.release();
}

View File

@ -101,6 +101,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
public class RemoteClusterConnectionTests extends ESTestCase {
@ -362,14 +363,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
CountDownLatch latchConnected = new CountDownLatch(1);
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
if (node.equals(discoverableNode)) {
latchDisconnect.countDown();
}
}
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
if (node.equals(spareNode)) {
latchConnected.countDown();
}
@ -498,7 +499,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);
connectionManager.addConnectBehavior(seedNode.getAddress(), (cm, discoveryNode) -> {
connectionManager.addGetConnectionBehavior(seedNode.getAddress(), (cm, discoveryNode) -> {
if (discoveryNode == seedNode) {
return seedConnection;
}
@ -1003,8 +1004,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
barrier.await();
for (int j = 0; j < numGetCalls; j++) {
try {
DiscoveryNode node = connection.getAnyConnectedNode();
assertNotNull(node);
Transport.Connection lowLevelConnection = connection.getConnection();
assertNotNull(lowLevelConnection);
} catch (NoSuchRemoteClusterException e) {
// ignore, this is an expected exception
}
@ -1034,7 +1035,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
ActionListener.map(fut, x -> null)));
} else {
DiscoveryNode node = randomFrom(discoverableNodes).v2().get();
connection.onNodeDisconnected(node);
connection.onNodeDisconnected(node, mock(Transport.Connection.class));
}
}
} catch (Exception ex) {
@ -1142,14 +1143,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
ConnectionManager delegate = new ConnectionManager(Settings.EMPTY, service.transport);
StubbableConnectionManager connectionManager = new StubbableConnectionManager(delegate, Settings.EMPTY, service.transport);
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.singleton(connectedNode));
connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> connectedNode.equals(node));
connectionManager.addConnectBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> {
if (discoveryNode == connectedNode) {
return seedConnection;
}
return cm.getConnection(discoveryNode);
connectionManager.addGetConnectionBehavior(connectedNode.getAddress(), (cm, discoveryNode) -> seedConnection);
connectionManager.addGetConnectionBehavior(disconnectedNode.getAddress(), (cm, discoveryNode) -> {
throw new NodeNotConnectedException(discoveryNode, "");
});
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
@ -1163,13 +1164,13 @@ public class RemoteClusterConnectionTests extends ESTestCase {
for (int i = 0; i < 10; i++) {
// we don't use the transport service connection manager so we will get a proxy connection for the local node
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
assertThat(remoteConnection, instanceOf(RemoteConnectionManager.ProxyConnection.class));
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
}
for (int i = 0; i < 10; i++) {
//always a proxy connection as the target node is not connected
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
assertThat(remoteConnection, instanceOf(RemoteConnectionManager.ProxyConnection.class));
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
public class RemoteConnectionManagerTests extends ESTestCase {
private Transport transport;
private RemoteConnectionManager remoteConnectionManager;
private ConnectionManager.ConnectionValidator validator = (connection, profile, listener) -> listener.onResponse(null);
@Override
public void setUp() throws Exception {
super.setUp();
transport = mock(Transport.class);
remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ConnectionManager(Settings.EMPTY, transport));
}
@SuppressWarnings("unchecked")
public void testGetConnection() {
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 1000);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(new TestRemoteConnection((DiscoveryNode) invocationOnMock.getArguments()[0]));
return null;
}).when(transport).openConnection(any(DiscoveryNode.class), any(ConnectionProfile.class), any(ActionListener.class));
DiscoveryNode node1 = new DiscoveryNode("node-1", address, Version.CURRENT);
PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
remoteConnectionManager.connectToNode(node1, null, validator, future1);
assertTrue(future1.isDone());
// Add duplicate connect attempt to ensure that we do not get duplicate connections in the round robin
remoteConnectionManager.connectToNode(node1, null, validator, PlainActionFuture.newFuture());
DiscoveryNode node2 = new DiscoveryNode("node-2", address, Version.CURRENT.minimumCompatibilityVersion());
PlainActionFuture<Void> future2 = PlainActionFuture.newFuture();
remoteConnectionManager.connectToNode(node2, null, validator, future2);
assertTrue(future2.isDone());
assertEquals(node1, remoteConnectionManager.getRemoteConnection(node1).getNode());
assertEquals(node2, remoteConnectionManager.getRemoteConnection(node2).getNode());
DiscoveryNode node4 = new DiscoveryNode("node-4", address, Version.CURRENT);
assertThat(remoteConnectionManager.getRemoteConnection(node4), instanceOf(RemoteConnectionManager.ProxyConnection.class));
// Test round robin
Set<Version> versions = new HashSet<>();
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
assertThat(versions, hasItems(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
// Test that the connection is cleared from the round robin list when it is closed
remoteConnectionManager.getRemoteConnection(node1).close();
versions.clear();
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
versions.add(remoteConnectionManager.getRemoteConnection(node4).getVersion());
assertThat(versions, hasItems(Version.CURRENT.minimumCompatibilityVersion()));
assertEquals(1, versions.size());
}
private static class TestRemoteConnection extends CloseableConnection {
private final DiscoveryNode node;
private TestRemoteConnection(DiscoveryNode node) {
this.node = node;
}
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
}
}
}

View File

@ -81,7 +81,7 @@ public class MockTransport implements Transport, LifecycleComponent {
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this),
settings, this);
connectionManager.setDefaultNodeConnectedBehavior(cm -> Collections.emptySet());
connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false);
connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);

View File

@ -463,7 +463,7 @@ public final class MockTransportService extends TransportService {
* @return {@code true} if no other get connection behavior was registered for this address before.
*/
public boolean addGetConnectionBehavior(TransportAddress transportAddress, StubbableConnectionManager.GetConnectionBehavior behavior) {
return connectionManager().addConnectBehavior(transportAddress, behavior);
return connectionManager().addGetConnectionBehavior(transportAddress, behavior);
}
/**

View File

@ -28,7 +28,6 @@ import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -37,7 +36,7 @@ public class StubbableConnectionManager extends ConnectionManager {
private final ConnectionManager delegate;
private final ConcurrentMap<TransportAddress, GetConnectionBehavior> getConnectionBehaviors;
private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection;
private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::connectedNodes;
private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport) {
super(settings, transport);
@ -45,7 +44,7 @@ public class StubbableConnectionManager extends ConnectionManager {
this.getConnectionBehaviors = new ConcurrentHashMap<>();
}
public boolean addConnectBehavior(TransportAddress transportAddress, GetConnectionBehavior connectBehavior) {
public boolean addGetConnectionBehavior(TransportAddress transportAddress, GetConnectionBehavior connectBehavior) {
return getConnectionBehaviors.put(transportAddress, connectBehavior) == null;
}
@ -64,7 +63,6 @@ public class StubbableConnectionManager extends ConnectionManager {
public void clearBehaviors() {
defaultGetConnectionBehavior = ConnectionManager::getConnection;
getConnectionBehaviors.clear();
defaultNodeConnectedBehavior = ConnectionManager::connectedNodes;
}
public void clearBehavior(TransportAddress transportAddress) {
@ -85,12 +83,7 @@ public class StubbableConnectionManager extends ConnectionManager {
@Override
public boolean nodeConnected(DiscoveryNode node) {
return defaultNodeConnectedBehavior.connectedNodes(delegate).contains(node);
}
@Override
public Set<DiscoveryNode> connectedNodes() {
return defaultNodeConnectedBehavior.connectedNodes(delegate);
return defaultNodeConnectedBehavior.connectedNodes(delegate, node);
}
@Override
@ -132,6 +125,6 @@ public class StubbableConnectionManager extends ConnectionManager {
@FunctionalInterface
public interface NodeConnectedBehavior {
Set<DiscoveryNode> connectedNodes(ConnectionManager connectionManager);
boolean connectedNodes(ConnectionManager connectionManager, DiscoveryNode node);
}
}

View File

@ -163,12 +163,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(2);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
latch.countDown();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
fail("disconnect should not be called " + node);
}
};
@ -689,12 +689,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1);
TransportConnectionListener disconnectListener = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
fail("node connected should not be called, all connection have been done previously, node: " + node);
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
latch.countDown();
}
};
@ -1731,12 +1731,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(4);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
latch.countDown();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
fail("disconnect should not be called " + node);
}
};