Move connection profile into connection manager (#32858)

This is related to #31835. It moves the default connection profile into
the ConnectionManager class. The will allow us to have different
connection managers with different profiles.
This commit is contained in:
Tim Brooks 2018-08-15 09:08:33 -06:00 committed by GitHub
parent 51cece1900
commit 2464b68613
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 184 additions and 179 deletions

View File

@ -54,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -147,7 +146,6 @@ public class Netty4Transport extends TcpTransport {
bootstrap.handler(getClientChannelInitializer());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
@ -175,14 +173,8 @@ public class Netty4Transport extends TcpTransport {
String name = profileSettings.profileName;
if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
+ "receive_predictor[{}->{}]",
name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress,
defaultConnectionProfile.getConnectTimeout(),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
receivePredictorMin, receivePredictorMax);
}

View File

@ -297,13 +297,13 @@ public final class ClusterSettings extends AbstractScopedSettings {
TcpTransport.TCP_REUSE_ADDRESS_PROFILE,
TcpTransport.TCP_SEND_BUFFER_SIZE_PROFILE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE_PROFILE,
TcpTransport.CONNECTIONS_PER_NODE_RECOVERY,
TcpTransport.CONNECTIONS_PER_NODE_BULK,
TcpTransport.CONNECTIONS_PER_NODE_REG,
TcpTransport.CONNECTIONS_PER_NODE_STATE,
TcpTransport.CONNECTIONS_PER_NODE_PING,
TransportService.CONNECTIONS_PER_NODE_RECOVERY,
TransportService.CONNECTIONS_PER_NODE_BULK,
TransportService.CONNECTIONS_PER_NODE_REG,
TransportService.CONNECTIONS_PER_NODE_STATE,
TransportService.CONNECTIONS_PER_NODE_PING,
TransportService.TCP_CONNECT_TIMEOUT,
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_CONNECT_TIMEOUT,
NetworkService.NETWORK_SERVER,
TcpTransport.TCP_NO_DELAY,
TcpTransport.TCP_KEEP_ALIVE,

View File

@ -60,15 +60,21 @@ public class ConnectionManager implements Closeable {
private final Transport transport;
private final ThreadPool threadPool;
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
}
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
this.logger = Loggers.getLogger(getClass(), settings);
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.defaultProfile = defaultProfile;
this.lifecycle.moveToStarted();
if (pingSchedule.millis() > 0) {
@ -84,6 +90,10 @@ public class ConnectionManager implements Closeable {
this.connectionListener.listeners.remove(listener);
}
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
}
/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
@ -91,6 +101,7 @@ public class ConnectionManager implements Closeable {
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
@ -104,8 +115,8 @@ public class ConnectionManager implements Closeable {
}
boolean success = false;
try {
connection = transport.openConnection(node, connectionProfile);
connectionValidator.accept(connection, connectionProfile);
connection = transport.openConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
if (logger.isDebugEnabled()) {
@ -279,4 +290,23 @@ public class ConnectionManager implements Closeable {
}
}
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
}

View File

@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -61,14 +62,35 @@ public final class ConnectionProfile {
private final TimeValue connectTimeout;
private final TimeValue handshakeTimeout;
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout, TimeValue handshakeTimeout)
{
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
TimeValue handshakeTimeout) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
this.handshakeTimeout = handshakeTimeout;
}
/**
* takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile
*/
public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile profile, ConnectionProfile fallbackProfile) {
Objects.requireNonNull(fallbackProfile);
if (profile == null) {
return fallbackProfile;
} else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) {
return profile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
if (profile.getConnectTimeout() == null) {
builder.setConnectTimeout(fallbackProfile.getConnectTimeout());
}
if (profile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout());
}
return builder.build();
}
}
/**
* A builder to build a new {@link ConnectionProfile}
*/

View File

@ -108,8 +108,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.nodePredicate = nodePredicate;
this.clusterAlias = clusterAlias;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TcpTransport.TCP_CONNECT_TIMEOUT.get(settings));
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING); // TODO make this configurable?
builder.addConnections(0, // we don't want this to be used for anything else but search
TransportRequestOptions.Type.BULK,

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
@ -135,18 +134,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE =
timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_NO_DELAY =
boolSetting("transport.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_KEEP_ALIVE =
@ -154,11 +141,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TCP_SEND_BUFFER_SIZE, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE,
Setting.Property.NodeScope);
Setting.byteSizeSetting("transport.tcp.receive_buffer_size", NetworkService.TCP_RECEIVE_BUFFER_SIZE, Setting.Property.NodeScope);
public static final Setting.AffixSetting<Boolean> TCP_NO_DELAY_PROFILE = affixKeySetting("transport.profiles.", "tcp_no_delay",
@ -213,7 +198,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final boolean compress;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
private final CounterMetric numHandshakes = new CounterMetric();
@ -237,7 +221,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
@ -261,25 +244,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.setHandshakeTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
return builder.build();
}
@Override
protected void doStart() {
}
@ -456,41 +420,21 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
/**
* takes a {@link ConnectionProfile} that have been passed as a parameter to the public methods
* and resolves it to a fully specified (i.e., no nulls) profile
*/
protected static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionProfile connectionProfile,
ConnectionProfile defaultConnectionProfile) {
Objects.requireNonNull(defaultConnectionProfile);
if (connectionProfile == null) {
return defaultConnectionProfile;
} else if (connectionProfile.getConnectTimeout() != null && connectionProfile.getHandshakeTimeout() != null) {
return connectionProfile;
} else {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(connectionProfile);
if (connectionProfile.getConnectTimeout() == null) {
builder.setConnectTimeout(defaultConnectionProfile.getConnectTimeout());
}
if (connectionProfile.getHandshakeTimeout() == null) {
builder.setHandshakeTimeout(defaultConnectionProfile.getHandshakeTimeout());
}
return builder.build();
}
}
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
return resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
// This allows transport implementations to potentially override specific connection profiles. This
// primarily exists for the test implementations.
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
return connectionProfile;
}
@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
boolean success = false;
NodeChannels nodeChannels = null;
connectionProfile = resolveConnectionProfile(connectionProfile);
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
@ -71,10 +73,24 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.settings.Setting.listSetting;
import static org.elasticsearch.common.settings.Setting.timeSetting;
public class TransportService extends AbstractLifecycleComponent implements TransportConnectionListener {
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
timeSetting("transport.tcp.connect_timeout", NetworkService.TCP_CONNECT_TIMEOUT, Setting.Property.NodeScope);
public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";
@ -111,7 +127,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
Function.identity(), Property.Dynamic, Property.NodeScope);
private final Logger tracerLog;
private final ConnectionProfile defaultConnectionProfile;
volatile String[] tracerLogInclude;
volatile String[] tracerLogExclude;
@ -182,7 +197,6 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
remoteClusterService = new RemoteClusterService(settings, this);
responseHandlers = transport.getResponseHandlers();
defaultConnectionProfile = TcpTransport.buildDefaultConnectionProfile(settings);
if (clusterSettings != null) {
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
@ -350,8 +364,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
return;
}
ConnectionProfile resolvedProfile = TcpTransport.resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
connectionManager.connectToNode(node, resolvedProfile, (newConnection, actualProfile) -> {
connectionManager.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
if (validateConnections && node.equals(remote) == false) {
@ -364,13 +377,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
* responsibility to close the connection once it goes out of scope.
* @param node the node to connect to
* @param profile the connection profile to use
* @param connectionProfile the connection profile to use
*/
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return transport.openConnection(node, profile);
return connectionManager.openConnection(node, connectionProfile);
}
}

View File

@ -44,7 +44,7 @@ import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.Arrays;
@ -139,7 +139,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
.put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
.put(TransportService.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
// value and the time of disruption and does not recover immediately
// when disruption is stop. We should make sure we recover faster
// then the default of 30s, causing ensureGreen and friends to time out

View File

@ -35,6 +35,7 @@ import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -62,6 +63,73 @@ public class ConnectionManagerTests extends ESTestCase {
threadPool.shutdown();
}
public void testConnectionProfileResolve() {
final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
final boolean connectionTimeoutSet = randomBoolean();
if (connectionTimeoutSet) {
builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
final boolean connectionHandshakeSet = randomBoolean();
if (connectionHandshakeSet) {
builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
final ConnectionProfile profile = builder.build();
final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile);
assertNotEquals(resolved, defaultProfile);
assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections()));
assertThat(resolved.getHandles(), equalTo(profile.getHandles()));
assertThat(resolved.getConnectTimeout(),
equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout()));
assertThat(resolved.getHandshakeTimeout(),
equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout()));
}
public void testDefaultConnectionProfile() {
ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(13, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
assertEquals(12, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
assertEquals(11, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
.put("node.master", false).build());
assertEquals(10, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
}
public void testConnectAndDisconnect() {
AtomicInteger nodeConnectedCount = new AtomicInteger();
AtomicInteger nodeDisconnectedCount = new AtomicInteger();

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.test.ESTestCase;
@ -305,72 +304,6 @@ public class TcpTransportTests extends ESTestCase {
}
}
public void testConnectionProfileResolve() {
final ConnectionProfile defaultProfile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(defaultProfile, TcpTransport.resolveConnectionProfile(null, defaultProfile));
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
final boolean connectionTimeoutSet = randomBoolean();
if (connectionTimeoutSet) {
builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
final boolean connectionHandshakeSet = randomBoolean();
if (connectionHandshakeSet) {
builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
final ConnectionProfile profile = builder.build();
final ConnectionProfile resolved = TcpTransport.resolveConnectionProfile(profile, defaultProfile);
assertNotEquals(resolved, defaultProfile);
assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections()));
assertThat(resolved.getHandles(), equalTo(profile.getHandles()));
assertThat(resolved.getConnectTimeout(),
equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout()));
assertThat(resolved.getHandshakeTimeout(),
equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout()));
}
public void testDefaultConnectionProfile() {
ConnectionProfile profile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(13, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
assertEquals(12, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
assertEquals(11, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).put("node.master", false).build());
assertEquals(10, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
}
public void testDecodeWithIncompleteHeader() throws IOException {
BytesStreamOutput streamOutput = new BytesStreamOutput(1 << 14);
streamOutput.write('E');

View File

@ -385,9 +385,9 @@ public final class InternalTestCluster extends TestCluster {
// randomize tcp settings
if (random.nextBoolean()) {
builder.put(TcpTransport.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
builder.put(TcpTransport.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
builder.put(TcpTransport.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
builder.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), random.nextInt(2) + 1);
builder.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), random.nextInt(3) + 1);
builder.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), random.nextInt(6) + 1);
}
if (random.nextBoolean()) {

View File

@ -309,7 +309,7 @@ public final class MockTransportService extends TransportService {
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
TimeValue connectingTimeout = TransportService.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());

View File

@ -78,6 +78,11 @@ public class StubbableConnectionManager extends ConnectionManager {
nodeConnectedBehaviors.remove(transportAddress);
}
@Override
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return delegate.openConnection(node, connectionProfile);
}
@Override
public Transport.Connection getConnection(DiscoveryNode node) {
TransportAddress address = node.getAddress();

View File

@ -1997,11 +1997,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertEquals("handshake failed", exception.getCause().getMessage());
}
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
null
)) {
connectionProfile)) {
Version version = originalTransport.executeHandshake(connection.getNode(),
connection.channel(TransportRequestOptions.Type.PING), TimeValue.timeValueSeconds(10));
assertEquals(version, Version.CURRENT);

View File

@ -191,12 +191,11 @@ public class MockTcpTransport extends TcpTransport {
}
@Override
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
Set<TransportRequestOptions.Type> types = handle.getTypes();
if (handle.length > 0) {
allTypesWithConnection.addAll(types);
@ -209,8 +208,8 @@ public class MockTcpTransport extends TcpTransport {
if (allTypesWithoutConnection.isEmpty() == false) {
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
}
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile.getConnectTimeout());
return builder.build();
}

View File

@ -133,16 +133,15 @@ public class MockNioTransport extends TcpTransport {
}
@Override
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile resolvedProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
if (resolvedProfile.getNumConnections() <= 3) {
return resolvedProfile;
protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
if (connectionProfile.getNumConnections() <= 3) {
return connectionProfile;
}
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
for (TransportRequestOptions.Type type : TransportRequestOptions.Type.values()) {
int numConnections = resolvedProfile.getNumConnectionsPerType(type);
int numConnections = connectionProfile.getNumConnectionsPerType(type);
if (numConnections > 0) {
allTypesWithConnection.add(type);
} else {
@ -155,8 +154,8 @@ public class MockNioTransport extends TcpTransport {
if (allTypesWithoutConnection.isEmpty() == false) {
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
}
builder.setHandshakeTimeout(resolvedProfile.getHandshakeTimeout());
builder.setConnectTimeout(resolvedProfile.getConnectTimeout());
builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile.getConnectTimeout());
return builder.build();
}