Add infrastructure to manage network connections outside of Transport/TransportService (#22194)

Some expert users like UnicastZenPing today establishes real connections to nodes during it's ping
phase that can be used by other parts of the system. Yet, this is potentially dangerous
and undesirable unless the nodes have been fully verified and should be connected to in the
case of a cluster state update or if we join a newly elected master. For use-cases like this, this change adds the infrastructure to manually handle connections that are not publicly available on the node ie. should not be managed by `Transport`/`TransportSerivce`
This commit is contained in:
Simon Willnauer 2016-12-17 11:49:57 +01:00 committed by GitHub
parent 0b338bf523
commit 1f3eb068d5
5 changed files with 102 additions and 16 deletions

View File

@ -290,6 +290,9 @@ public class TransportService extends AbstractLifecycleComponent {
return transport.getLocalAddresses();
}
/**
* Returns <code>true</code> iff the given node is already connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(localNode) || transport.nodeConnected(node);
}
@ -311,6 +314,20 @@ public class TransportService extends AbstractLifecycleComponent {
transport.connectToNode(node, connectionProfile);
}
/**
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
* responsibility to close the connection once it goes out of scope.
* @param node the node to connect to
* @param profile the connection profile to use
*/
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
if (node.equals(localNode)) {
return localNodeConnection;
} else {
return transport.openConnection(node, profile);
}
}
/**
* Lightly connect to the specified node, returning updated node
* information. The handshake will fail if the cluster name on the
@ -337,7 +354,19 @@ public class TransportService extends AbstractLifecycleComponent {
return handshakeNode;
}
private DiscoveryNode handshake(
/**
* Executes a high-level handshake using the given connection
* and returns the discovery node of the node the connection
* was established with. The handshake will fail if the cluster
* name on the target node mismatches the local cluster name.
*
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @return the connected node
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
final HandshakeResponse response;
@ -465,7 +494,7 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
@ -477,7 +506,7 @@ public class TransportService extends AbstractLifecycleComponent {
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
private Transport.Connection getConnection(DiscoveryNode node) {
public Transport.Connection getConnection(DiscoveryNode node) {
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
return localNodeConnection;
} else {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
@ -571,7 +572,7 @@ public class UnicastZenPingTests extends ESTestCase {
final BiFunction<Settings, Version, Transport> supplier) {
final Transport transport = supplier.apply(settings, version);
final TransportService transportService =
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();

View File

@ -113,14 +113,24 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
assertNotNull(connectedNode);
// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
DiscoveryNode connectedNode =
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
assertNotNull(connectedNode);
// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}
public void testMismatchedClusterName() {
@ -133,8 +143,12 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
@ -150,8 +164,12 @@ public class TransportServiceHandshakeTests extends ESTestCase {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

View File

@ -57,11 +57,13 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
@ -80,6 +82,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public final class MockTransportService extends TransportService {
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
public static class TestPlugin extends Plugin {
@Override
@ -553,9 +556,7 @@ public final class MockTransportService extends TransportService {
}
@Override
public void close() {
transport.close();
}
public void close() { transport.close(); }
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
@ -701,4 +702,41 @@ public final class MockTransportService extends TransportService {
}
return transport;
}
@Override
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (closed.compareAndSet(false, true)) {
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.get(node);
boolean remove = connections.remove(this);
assert remove;
if (connections.isEmpty()) {
openConnections.remove(node);
}
}
}
}
}
};
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
(n) -> new CopyOnWriteArrayList<>());
connections.add(filteredConnection);
}
return filteredConnection;
}
@Override
protected void doClose() {
super.doClose();
assert openConnections.size() == 0 : "still open connections: " + openConnections;
}
}

View File

@ -1351,8 +1351,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// all is well
}
try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well
@ -1409,8 +1409,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// all is well
}
try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well