From 1f3eb068d54f6752e0dac155f8137b6eabc725bb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 17 Dec 2016 11:49:57 +0100 Subject: [PATCH] Add infrastructure to manage network connections outside of Transport/TransportService (#22194) Some expert users like UnicastZenPing today establishes real connections to nodes during it's ping phase that can be used by other parts of the system. Yet, this is potentially dangerous and undesirable unless the nodes have been fully verified and should be connected to in the case of a cluster state update or if we join a newly elected master. For use-cases like this, this change adds the infrastructure to manually handle connections that are not publicly available on the node ie. should not be managed by `Transport`/`TransportSerivce` --- .../transport/TransportService.java | 35 +++++++++++++-- .../discovery/zen/UnicastZenPingTests.java | 3 +- .../TransportServiceHandshakeTests.java | 28 +++++++++--- .../test/transport/MockTransportService.java | 44 +++++++++++++++++-- .../AbstractSimpleTransportTestCase.java | 8 ++-- 5 files changed, 102 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a02b763f2d9..8884177ba63 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -290,6 +290,9 @@ public class TransportService extends AbstractLifecycleComponent { return transport.getLocalAddresses(); } + /** + * Returns true iff the given node is already connected. + */ public boolean nodeConnected(DiscoveryNode node) { return node.equals(localNode) || transport.nodeConnected(node); } @@ -311,6 +314,20 @@ public class TransportService extends AbstractLifecycleComponent { transport.connectToNode(node, connectionProfile); } + /** + * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers + * responsibility to close the connection once it goes out of scope. + * @param node the node to connect to + * @param profile the connection profile to use + */ + public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException { + if (node.equals(localNode)) { + return localNodeConnection; + } else { + return transport.openConnection(node, profile); + } + } + /** * Lightly connect to the specified node, returning updated node * information. The handshake will fail if the cluster name on the @@ -337,7 +354,19 @@ public class TransportService extends AbstractLifecycleComponent { return handshakeNode; } - private DiscoveryNode handshake( + /** + * Executes a high-level handshake using the given connection + * and returns the discovery node of the node the connection + * was established with. The handshake will fail if the cluster + * name on the target node mismatches the local cluster name. + * + * @param connection the connection to a specific node + * @param handshakeTimeout handshake timeout + * @return the connected node + * @throws ConnectTransportException if the connection failed + * @throws IllegalStateException if the handshake failed + */ + public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { final HandshakeResponse response; @@ -465,7 +494,7 @@ public class TransportService extends AbstractLifecycleComponent { } } - final void sendRequest(final Transport.Connection connection, final String action, + public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { @@ -477,7 +506,7 @@ public class TransportService extends AbstractLifecycleComponent { * Returns either a real transport connection or a local node connection if we are using the local node optimization. * @throws NodeNotConnectedException if the given node is not connected */ - private Transport.Connection getConnection(DiscoveryNode node) { + public Transport.Connection getConnection(DiscoveryNode node) { if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) { return localNodeConnection; } else { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 9886abb900a..de8d1a562e8 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; @@ -571,7 +572,7 @@ public class UnicastZenPingTests extends ESTestCase { final BiFunction supplier) { final Transport transport = supplier.apply(settings, version); final TransportService transportService = - new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 16735f34efe..fd756f6790e 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -113,14 +113,24 @@ public class TransportServiceHandshakeTests extends ESTestCase { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){ + DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout); + assertNotNull(connectedNode); + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + DiscoveryNode connectedNode = - handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); + handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); assertNotNull(connectedNode); // the name and version should be updated assertEquals(connectedNode.getName(), "TS_B"); assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + } public void testMismatchedClusterName() { @@ -133,8 +143,12 @@ public class TransportServiceHandshakeTests extends ESTestCase { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } @@ -150,8 +164,12 @@ public class TransportServiceHandshakeTests extends ESTestCase { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, incompatible version")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dd05457cec1..a35a1919cb7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -57,11 +57,13 @@ import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; @@ -80,6 +82,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public final class MockTransportService extends TransportService { + private final Map> openConnections = new HashMap<>(); public static class TestPlugin extends Plugin { @Override @@ -553,9 +556,7 @@ public final class MockTransportService extends TransportService { } @Override - public void close() { - transport.close(); - } + public void close() { transport.close(); } @Override public Map profileBoundAddresses() { @@ -701,4 +702,41 @@ public final class MockTransportService extends TransportService { } return transport; } + + @Override + public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) { + final AtomicBoolean closed = new AtomicBoolean(false); + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + if (closed.compareAndSet(false, true)) { + synchronized (openConnections) { + List connections = openConnections.get(node); + boolean remove = connections.remove(this); + assert remove; + if (connections.isEmpty()) { + openConnections.remove(node); + } + } + } + } + + } + }; + synchronized (openConnections) { + List connections = openConnections.computeIfAbsent(node, + (n) -> new CopyOnWriteArrayList<>()); + connections.add(filteredConnection); + } + return filteredConnection; + } + + @Override + protected void doClose() { + super.doClose(); + assert openConnections.size() == 0 : "still open connections: " + openConnections; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 542dcfb8b8b..283ae288d29 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1351,8 +1351,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well @@ -1409,8 +1409,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well