From 2464b686135eebab5b9a3fdcd2784a9cd16fad2e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 15 Aug 2018 09:08:33 -0600 Subject: [PATCH] Move connection profile into connection manager (#32858) This is related to #31835. It moves the default connection profile into the ConnectionManager class. The will allow us to have different connection managers with different profiles. --- .../transport/netty4/Netty4Transport.java | 10 +-- .../common/settings/ClusterSettings.java | 12 ++-- .../transport/ConnectionManager.java | 34 ++++++++- .../transport/ConnectionProfile.java | 26 ++++++- .../transport/RemoteClusterConnection.java | 4 +- .../elasticsearch/transport/TcpTransport.java | 72 +++---------------- .../transport/TransportService.java | 27 +++++-- .../discovery/AbstractDisruptionTestCase.java | 4 +- .../transport/ConnectionManagerTests.java | 68 ++++++++++++++++++ .../transport/TcpTransportTests.java | 67 ----------------- .../test/InternalTestCluster.java | 6 +- .../test/transport/MockTransportService.java | 2 +- .../transport/StubbableConnectionManager.java | 5 ++ .../AbstractSimpleTransportTestCase.java | 4 +- .../transport/MockTcpTransport.java | 9 ++- .../transport/nio/MockNioTransport.java | 13 ++-- 16 files changed, 184 insertions(+), 179 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 90138bc5990..e310f3012a9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -54,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportRequestOptions; import java.io.IOException; import java.net.InetSocketAddress; @@ -147,7 +146,6 @@ public class Netty4Transport extends TcpTransport { bootstrap.handler(getClientChannelInitializer()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis())); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); @@ -175,14 +173,8 @@ public class Netty4Transport extends TcpTransport { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " - + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", + + "receive_predictor[{}->{}]", name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress, - defaultConnectionProfile.getConnectTimeout(), - defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY), - defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK), - defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG), - defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE), - defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING), receivePredictorMin, receivePredictorMax); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 03a0f3b42a1..de8691b3b68 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -297,13 +297,13 @@ public final class ClusterSettings extends AbstractScopedSettings { TcpTransport.TCP_REUSE_ADDRESS_PROFILE, TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE, TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE, - TcpTransport.CONNECTIONS_PER_NODE_RECOVERY, - TcpTransport.CONNECTIONS_PER_NODE_BULK, - TcpTransport.CONNECTIONS_PER_NODE_REG, - TcpTransport.CONNECTIONS_PER_NODE_STATE, - TcpTransport.CONNECTIONS_PER_NODE_PING, + TransportService.CONNECTIONS_PER_NODE_RECOVERY, + TransportService.CONNECTIONS_PER_NODE_BULK, + TransportService.CONNECTIONS_PER_NODE_REG, + TransportService.CONNECTIONS_PER_NODE_STATE, + TransportService.CONNECTIONS_PER_NODE_PING, + TransportService.TCP_CONNECT_TIMEOUT, TcpTransport.PING_SCHEDULE, - TcpTransport.TCP_CONNECT_TIMEOUT, NetworkService.NETWORK_SERVER, TcpTransport.TCP_NO_DELAY, TcpTransport.TCP_KEEP_ALIVE, diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index f2b89ff5977..1ff8b701a83 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -60,15 +60,21 @@ public class ConnectionManager implements Closeable { private final Transport transport; private final ThreadPool threadPool; private final TimeValue pingSchedule; + private final ConnectionProfile defaultProfile; private final Lifecycle lifecycle = new Lifecycle(); private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { + this(settings, transport, threadPool, buildDefaultConnectionProfile(settings)); + } + + public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { this.logger = Loggers.getLogger(getClass(), settings); this.transport = transport; this.threadPool = threadPool; this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); + this.defaultProfile = defaultProfile; this.lifecycle.moveToStarted(); if (pingSchedule.millis() > 0) { @@ -84,6 +90,10 @@ public class ConnectionManager implements Closeable { this.connectionListener.listeners.remove(listener); } + public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { + return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile)); + } + /** * Connects to a node with the given connection profile. If the node is already connected this method has no effect. * Once a successful is established, it can be validated before being exposed. @@ -91,6 +101,7 @@ public class ConnectionManager implements Closeable { public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, CheckedBiConsumer connectionValidator) throws ConnectTransportException { + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); if (node == null) { throw new ConnectTransportException(null, "can't connect to a null node"); } @@ -104,8 +115,8 @@ public class ConnectionManager implements Closeable { } boolean success = false; try { - connection = transport.openConnection(node, connectionProfile); - connectionValidator.accept(connection, connectionProfile); + connection = transport.openConnection(node, resolvedProfile); + connectionValidator.accept(connection, resolvedProfile); // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, connection); if (logger.isDebugEnabled()) { @@ -279,4 +290,23 @@ public class ConnectionManager implements Closeable { } } } + + static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { + int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); + int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); + int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); + int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings); + int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); + builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); + // if we are not master eligible we don't need a dedicated channel to publish the state + builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); + // if we are not a data-node we don't need any dedicated channels for recovery + builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); + builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); + return builder.build(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index e14f684bf72..b9ed42ca00a 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -61,14 +62,35 @@ public final class ConnectionProfile { private final TimeValue connectTimeout; private final TimeValue handshakeTimeout; - private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout) - { + private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, + TimeValue handshakeTimeout) { this.handles = handles; this.numConnections = numConnections; this.connectTimeout = connectTimeout; this.handshakeTimeout = handshakeTimeout; } + /** + * takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile + */ + public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) { + Objects.requireNonNull(fallbackProfile); + if (profile == null) { + return fallbackProfile; + } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) { + return profile; + } else { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile); + if (profile.getConnectTimeout() == null) { + builder.setConnectTimeout(fallbackProfile.getConnectTimeout()); + } + if (profile.getHandshakeTimeout() == null) { + builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout()); + } + return builder.build(); + } + } + /** * A builder to build a new {@link ConnectionProfile} */ diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 355a9c655c9..67c0e1a5aa6 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -108,8 +108,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); - builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable? builder.addConnections(0, // we don't want this to be used for anything else but search TransportRequestOptions.Type.BULK, diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 064c76cc03f..0b82417cfaa 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; @@ -135,18 +134,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // the scheduled internal ping interval setting, defaults to disabled (-1) public static final Setting PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope); - public static final Setting CONNECTIONS_PER_NODE_RECOVERY = - intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope); - public static final Setting CONNECTIONS_PER_NODE_BULK = - intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope); - public static final Setting CONNECTIONS_PER_NODE_REG = - intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope); - public static final Setting CONNECTIONS_PER_NODE_STATE = - intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope); - public static final Setting CONNECTIONS_PER_NODE_PING = - intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope); - public static final Setting TCP_CONNECT_TIMEOUT = - timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope); public static final Setting TCP_NO_DELAY = boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope); public static final Setting TCP_KEEP_ALIVE = @@ -154,11 +141,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements public static final Setting TCP_REUSE_ADDRESS = boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope); public static final Setting TCP_SEND_BUFFER_SIZE = - Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, - Setting.Property.NodeScope); + Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope); public static final Setting TCP_RECEIVE_BUFFER_SIZE = - Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, - Setting.Property.NodeScope); + Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope); public static final Setting.AffixSetting TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay", @@ -213,7 +198,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final boolean compress; private volatile BoundTransportAddress boundAddress; private final String transportName; - protected final ConnectionProfile defaultConnectionProfile; private final ConcurrentMap pendingHandshakes = new ConcurrentHashMap<>(); private final CounterMetric numHandshakes = new CounterMetric(); @@ -237,7 +221,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; - defaultConnectionProfile = buildDefaultConnectionProfile(settings); final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); if (defaultFeatures == null) { this.features = new String[0]; @@ -261,25 +244,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { - int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings); - int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings); - int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings); - int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings); - int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings)); - builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings)); - builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); - builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); - // if we are not master eligible we don't need a dedicated channel to publish the state - builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); - // if we are not a data-node we don't need any dedicated channels for recovery - builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); - builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); - return builder.build(); - } - @Override protected void doStart() { } @@ -456,41 +420,21 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - /** - * takes a {@link ConnectionProfile} that have been passed as a parameter to the public methods - * and resolves it to a fully specified (i.e., no nulls) profile - */ - protected static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile connectionProfile, - ConnectionProfile defaultConnectionProfile) { - Objects.requireNonNull(defaultConnectionProfile); - if (connectionProfile == null) { - return defaultConnectionProfile; - } else if (connectionProfile.getConnectTimeout() != null && connectionProfile.getHandshakeTimeout() != null) { - return connectionProfile; - } else { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(connectionProfile); - if (connectionProfile.getConnectTimeout() == null) { - builder.setConnectTimeout(defaultConnectionProfile.getConnectTimeout()); - } - if (connectionProfile.getHandshakeTimeout() == null) { - builder.setHandshakeTimeout(defaultConnectionProfile.getHandshakeTimeout()); - } - return builder.build(); - } - } - - protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) { - return resolveConnectionProfile(connectionProfile, defaultConnectionProfile); + // This allows transport implementations to potentially override specific connection profiles. This + // primarily exists for the test implementations. + protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) { + return connectionProfile; } @Override public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { + Objects.requireNonNull(connectionProfile, "connection profile cannot be null"); if (node == null) { throw new ConnectTransportException(null, "can't open connection to a null node"); } boolean success = false; NodeChannels nodeChannels = null; - connectionProfile = resolveConnectionProfile(connectionProfile); + connectionProfile = maybeOverrideConnectionProfile(connectionProfile); closeLock.readLock().lock(); // ensure we don't open connections while we are closing try { ensureOpen(); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index d7ece36d7fd..8eca6504b70 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -71,10 +73,24 @@ import java.util.function.Predicate; import java.util.function.Supplier; import static java.util.Collections.emptyList; +import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.settings.Setting.listSetting; +import static org.elasticsearch.common.settings.Setting.timeSetting; public class TransportService extends AbstractLifecycleComponent implements TransportConnectionListener { + public static final Setting CONNECTIONS_PER_NODE_RECOVERY = + intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope); + public static final Setting CONNECTIONS_PER_NODE_BULK = + intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope); + public static final Setting CONNECTIONS_PER_NODE_REG = + intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope); + public static final Setting CONNECTIONS_PER_NODE_STATE = + intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope); + public static final Setting CONNECTIONS_PER_NODE_PING = + intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope); + public static final Setting TCP_CONNECT_TIMEOUT = + timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope); public static final String DIRECT_RESPONSE_PROFILE = ".direct"; public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake"; @@ -111,7 +127,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran Function.identity(), Property.Dynamic, Property.NodeScope); private final Logger tracerLog; - private final ConnectionProfile defaultConnectionProfile; volatile String[] tracerLogInclude; volatile String[] tracerLogExclude; @@ -182,7 +197,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings); remoteClusterService = new RemoteClusterService(settings, this); responseHandlers = transport.getResponseHandlers(); - defaultConnectionProfile = TcpTransport.buildDefaultConnectionProfile(settings); if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); @@ -350,8 +364,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran return; } - ConnectionProfile resolvedProfile = TcpTransport.resolveConnectionProfile(connectionProfile, defaultConnectionProfile); - connectionManager.connectToNode(node, resolvedProfile, (newConnection, actualProfile) -> { + connectionManager.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode; if (validateConnections && node.equals(remote) == false) { @@ -364,13 +377,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers * responsibility to close the connection once it goes out of scope. * @param node the node to connect to - * @param profile the connection profile to use + * @param connectionProfile the connection profile to use */ - public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException { + public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException { if (isLocalNode(node)) { return localNodeConnection; } else { - return transport.openConnection(node, profile); + return connectionManager.openConnection(node, connectionProfile); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java index 50dfd92d82e..0f3288b1973 100644 --- a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java +++ b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java @@ -44,7 +44,7 @@ import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.SlowClusterStateProcessing; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.TransportService; import org.junit.Before; import java.util.Arrays; @@ -139,7 +139,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase { .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this + .put(TransportService.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this // value and the time of disruption and does not recover immediately // when disruption is stop. We should make sure we recover faster // then the default of 30s, causing ensureGreen and friends to time out diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 64e8a426004..3c099c32bde 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -35,6 +35,7 @@ import java.net.InetAddress; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,6 +63,73 @@ public class ConnectionManagerTests extends ESTestCase { threadPool.shutdown(); } + public void testConnectionProfileResolve() { + final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile)); + + final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING); + + final boolean connectionTimeoutSet = randomBoolean(); + if (connectionTimeoutSet) { + builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } + final boolean connectionHandshakeSet = randomBoolean(); + if (connectionHandshakeSet) { + builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } + + final ConnectionProfile profile = builder.build(); + final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile); + assertNotEquals(resolved, defaultProfile); + assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections())); + assertThat(resolved.getHandles(), equalTo(profile.getHandles())); + + assertThat(resolved.getConnectTimeout(), + equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout())); + assertThat(resolved.getHandshakeTimeout(), + equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout())); + } + + public void testDefaultConnectionProfile() { + ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + assertEquals(13, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); + assertEquals(12, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); + assertEquals(11, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) + .put("node.master", false).build()); + assertEquals(10, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + } + public void testConnectAndDisconnect() { AtomicInteger nodeConnectedCount = new AtomicInteger(); AtomicInteger nodeDisconnectedCount = new AtomicInteger(); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 8474b994712..a3d2e1bbc57 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.test.ESTestCase; @@ -305,72 +304,6 @@ public class TcpTransportTests extends ESTestCase { } } - public void testConnectionProfileResolve() { - final ConnectionProfile defaultProfile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY); - assertEquals(defaultProfile, TcpTransport.resolveConnectionProfile(null, defaultProfile)); - - final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING); - - final boolean connectionTimeoutSet = randomBoolean(); - if (connectionTimeoutSet) { - builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); - } - final boolean connectionHandshakeSet = randomBoolean(); - if (connectionHandshakeSet) { - builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); - } - - final ConnectionProfile profile = builder.build(); - final ConnectionProfile resolved = TcpTransport.resolveConnectionProfile(profile, defaultProfile); - assertNotEquals(resolved, defaultProfile); - assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections())); - assertThat(resolved.getHandles(), equalTo(profile.getHandles())); - - assertThat(resolved.getConnectTimeout(), - equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout())); - assertThat(resolved.getHandshakeTimeout(), - equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout())); - } - - public void testDefaultConnectionProfile() { - ConnectionProfile profile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY); - assertEquals(13, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); - assertEquals(12, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); - assertEquals(11, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).put("node.master", false).build()); - assertEquals(10, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - } - public void testDecodeWithIncompleteHeader() throws IOException { BytesStreamOutput streamOutput = new BytesStreamOutput(1 << 14); streamOutput.write('E'); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 5b4f528913f..306f79e5e16 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -385,9 +385,9 @@ public final class InternalTestCluster extends TestCluster { // randomize tcp settings if (random.nextBoolean()) { - builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1); - builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1); - builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1); + builder.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1); + builder.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1); + builder.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1); } if (random.nextBoolean()) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 40a6ad6476d..15ab06d651e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -309,7 +309,7 @@ public final class MockTransportService extends TransportService { } // TODO: Replace with proper setting - TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY); + TimeValue connectingTimeout = TransportService.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY); try { if (delay.millis() < connectingTimeout.millis()) { Thread.sleep(delay.millis()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index a74cb2752c2..486ccc805d0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -78,6 +78,11 @@ public class StubbableConnectionManager extends ConnectionManager { nodeConnectedBehaviors.remove(transportAddress); } + @Override + public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { + return delegate.openConnection(node, connectionProfile); + } + @Override public Transport.Connection getConnection(DiscoveryNode node) { TransportAddress address = node.getAddress(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 20cce99477c..29997b16ba0 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1997,11 +1997,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertEquals("handshake failed", exception.getCause().getMessage()); } + ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), - null - )) { + connectionProfile)) { Version version = originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING), TimeValue.timeValueSeconds(10)); assertEquals(version, Version.CURRENT); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 438151b51b9..e6d80ac24d8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -191,12 +191,11 @@ public class MockTcpTransport extends TcpTransport { } @Override - protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) { - ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile); + protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); Set allTypesWithConnection = new HashSet<>(); Set allTypesWithoutConnection = new HashSet<>(); - for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) { + for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) { Set types = handle.getTypes(); if (handle.length > 0) { allTypesWithConnection.addAll(types); @@ -209,8 +208,8 @@ public class MockTcpTransport extends TcpTransport { if (allTypesWithoutConnection.isEmpty() == false) { builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0])); } - builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout()); - builder.setConnectTimeout(connectionProfile1.getConnectTimeout()); + builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); + builder.setConnectTimeout(connectionProfile.getConnectTimeout()); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index dc5305d951b..fbe61db6ee7 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -133,16 +133,15 @@ public class MockNioTransport extends TcpTransport { } @Override - protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) { - ConnectionProfile resolvedProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile); - if (resolvedProfile.getNumConnections() <= 3) { - return resolvedProfile; + protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) { + if (connectionProfile.getNumConnections() <= 3) { + return connectionProfile; } ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); Set allTypesWithConnection = new HashSet<>(); Set allTypesWithoutConnection = new HashSet<>(); for (TransportRequestOptions.Type type : TransportRequestOptions.Type.values()) { - int numConnections = resolvedProfile.getNumConnectionsPerType(type); + int numConnections = connectionProfile.getNumConnectionsPerType(type); if (numConnections > 0) { allTypesWithConnection.add(type); } else { @@ -155,8 +154,8 @@ public class MockNioTransport extends TcpTransport { if (allTypesWithoutConnection.isEmpty() == false) { builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0])); } - builder.setHandshakeTimeout(resolvedProfile.getHandshakeTimeout()); - builder.setConnectTimeout(resolvedProfile.getConnectTimeout()); + builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); + builder.setConnectTimeout(connectionProfile.getConnectTimeout()); return builder.build(); }