From cf9aff954ef916aaf1bddea362bcbb91466e60a1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 25 Oct 2018 13:37:49 -0600 Subject: [PATCH] Reduce channels in AbstractSimpleTransportTestCase (#34863) This is related to #30876. The AbstractSimpleTransportTestCase initiates many tcp connections. There are normally over 1,000 connections in TIME_WAIT at the end of the test. This is because every test opens at least two different transports that connect to each other with 13 channel connection profiles. This commit modifies the default connection profile used by this test to 6. One connection for each type, except for REG which gets 2 connections. --- .../netty4/SimpleNetty4TransportTests.java | 2 +- .../nio/SimpleNioTransportTests.java | 2 +- .../transport/ConnectionManager.java | 21 +-------------- .../transport/ConnectionProfile.java | 27 +++++++++++++++++++ .../transport/ConnectionManagerTests.java | 10 +++---- .../AbstractSimpleTransportTestCase.java | 24 +++++++++++++---- .../transport/MockTcpTransportTests.java | 2 +- .../nio/SimpleMockNioTransportTests.java | 2 +- ...stractSimpleSecurityTransportTestCase.java | 3 +-- ...pleSecurityNetty4ServerTransportTests.java | 2 +- .../nio/SimpleSecurityNioTransportTests.java | 2 +- 11 files changed, 59 insertions(+), 38 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 4e63727024f..e7faac8ae01 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -73,7 +73,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 8f6d78b481d..33d40b9f735 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -77,7 +77,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 4e4d369330c..5f2635fac88 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, buildDefaultConnectionProfile(settings)); + this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings)); } public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { @@ -323,23 +323,4 @@ public class ConnectionManager implements Closeable { } } } - - public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { - int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); - int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); - int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); - int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings); - int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); - builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); - // if we are not master eligible we don't need a dedicated channel to publish the state - builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); - // if we are not a data-node we don't need any dedicated channels for recovery - builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); - builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); - return builder.build(); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index b9ed42ca00a..d6183655fa2 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -18,7 +18,9 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import java.util.ArrayList; @@ -91,6 +93,31 @@ public final class ConnectionProfile { } } + /** + * Builds a default connection profile based on the provided settings. + * + * @param settings to build the connection profile from + * @return the connection profile + */ + public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { + int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); + int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); + int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); + int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings); + int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings); + Builder builder = new Builder(); + builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); + builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); + // if we are not master eligible we don't need a dedicated channel to publish the state + builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); + // if we are not a data-node we don't need any dedicated channels for recovery + builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); + builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); + return builder.build(); + } + /** * A builder to build a new {@link ConnectionProfile} */ diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index bff5a2b122d..3dc9e0aece7 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -64,7 +64,7 @@ public class ConnectionManagerTests extends ESTestCase { } public void testConnectionProfileResolve() { - final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile)); final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); @@ -96,7 +96,7 @@ public class ConnectionManagerTests extends ESTestCase { } public void testDefaultConnectionProfile() { - ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); assertEquals(13, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -104,7 +104,7 @@ public class ConnectionManagerTests extends ESTestCase { assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); assertEquals(12, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -112,7 +112,7 @@ public class ConnectionManagerTests extends ESTestCase { assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); assertEquals(11, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -120,7 +120,7 @@ public class ConnectionManagerTests extends ESTestCase { assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) .put("node.master", false).build()); assertEquals(10, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3b64f00084e..85a654c4cac 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -116,7 +116,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); protected int channelsPerNodeConnection() { - return 13; + // This is a customized profile for this test case. + return 6; } @Override @@ -125,9 +126,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates + Settings connectionSettings = Settings.builder() + .put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2) + .put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1) + .build(); + + serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates nodeA = serviceA.getLocalNode(); - serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates + serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates nodeB = serviceB.getLocalNode(); // wait till all nodes are properly connected and the event has been sent, so tests in this class // will not get this callback called on the connections done in this setup @@ -174,7 +183,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { - return buildService(name, version, clusterSettings, Settings.EMPTY, true, true); + return buildService(name, version, clusterSettings, Settings.EMPTY); + } + + protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, + Settings settings) { + return buildService(name, version, clusterSettings, settings, true, true); } @Override @@ -1999,7 +2013,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertEquals("handshake failed", exception.getCause().getMessage()); } - ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 4084d08b2e8..e8b5f38b88d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -50,7 +50,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index bebe50752f4..10f089e855a 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -78,7 +78,7 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java index 2e1a423d5fd..077edf22c91 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java @@ -21,7 +21,6 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; @@ -111,7 +110,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); - ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index 88895034df9..291b39f4b05 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -93,7 +93,7 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 5208d58d743..7fd4d8b5e03 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -55,7 +55,7 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService;