Reduce number of connections per node depending on the nodes role (#21849)

We currently treat every node equally when we establish connections to a node.
Yet, if we are not master eligible or can't hold any data there is no point in creating
a dedicated connection for sending the cluster state or running remote recoveries respectively.
The usage of STATE and RECOVERY connections on non-master and/or non-data nodes will result in an IllegalStateException.
This commit is contained in:
Simon Willnauer 2016-12-01 08:00:48 +01:00 committed by GitHub
parent fc9b63877e
commit dd5256c324
5 changed files with 108 additions and 26 deletions

View File

@ -37,12 +37,12 @@ public final class ConnectionProfile {
* types.
*/
public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile(
Collections.singletonList(new ConnectionTypeHandle(0, 1,
Collections.singletonList(new ConnectionTypeHandle(0, 1, EnumSet.of(
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE)), 1);
TransportRequestOptions.Type.STATE))), 1);
private final List<ConnectionTypeHandle> handles;
private final int numConnections;
@ -75,7 +75,7 @@ public final class ConnectionProfile {
}
}
addedTypes.addAll(Arrays.asList(types));
handles.add(new ConnectionTypeHandle(offset, numConnections, types));
handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types))));
offset += numConnections;
}
@ -100,6 +100,22 @@ public final class ConnectionProfile {
return numConnections;
}
/**
* Returns the number of connections per type for this profile. This might return a count that is shared with other types such
* that the sum of all connections per type might be higher than {@link #getNumConnections()}. For instance if
* {@link org.elasticsearch.transport.TransportRequestOptions.Type#BULK} shares connections with
* {@link org.elasticsearch.transport.TransportRequestOptions.Type#REG} they will return both the same number of connections from
* this method but the connections are not distinct.
*/
public int getNumConnectionsPerType(TransportRequestOptions.Type type) {
for (ConnectionTypeHandle handle : handles) {
if (handle.getTypes().contains(type)) {
return handle.length;
}
}
throw new AssertionError("no handle found for type: " + type);
}
/**
* Returns the type handles for this connection profile
*/
@ -113,10 +129,10 @@ public final class ConnectionProfile {
static final class ConnectionTypeHandle {
public final int length;
public final int offset;
private final TransportRequestOptions.Type[] types;
private final Set<TransportRequestOptions.Type> types;
private final AtomicInteger counter = new AtomicInteger();
private ConnectionTypeHandle(int offset, int length, TransportRequestOptions.Type... types) {
private ConnectionTypeHandle(int offset, int length, Set<TransportRequestOptions.Type> types) {
this.length = length;
this.offset = offset;
this.types = types;
@ -127,6 +143,9 @@ public final class ConnectionProfile {
* fashion.
*/
<T> T getChannel(T[] channels) {
if (length == 0) {
throw new IllegalStateException("can't select channel size is 0");
}
assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length);
return channels[offset + Math.floorMod(counter.incrementAndGet(), length)];
}
@ -134,7 +153,7 @@ public final class ConnectionProfile {
/**
* Returns all types for this handle
*/
TransportRequestOptions.Type[] getTypes() {
Set<TransportRequestOptions.Type> getTypes() {
return types;
}
}

View File

@ -81,6 +81,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -149,11 +150,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1;
protected final int connectionsPerNodeRecovery;
protected final int connectionsPerNodeBulk;
protected final int connectionsPerNodeReg;
protected final int connectionsPerNodeState;
protected final int connectionsPerNodePing;
protected final TimeValue connectTimeout;
protected final boolean blockingClient;
private final CircuitBreakerService circuitBreakerService;
@ -179,7 +175,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected final boolean compress;
protected volatile BoundTransportAddress boundAddress;
private final String transportName;
private final ConnectionProfile defaultConnectionProfile;
protected final ConnectionProfile defaultConnectionProfile;
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
@ -195,20 +191,27 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.networkService = networkService;
this.transportName = transportName;
this.connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
this.connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
this.connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
this.connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
this.connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings);
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings);
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY);
// if we are not master eligible we don't need a dedicated channel to publish the state
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
// if we are not a data-node we don't need any dedicated channels for recovery
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
builder.addConnections(connectionsPerNodeState, TransportRequestOptions.Type.STATE);
defaultConnectionProfile = builder.build();
return builder.build();
}
@Override

View File

@ -21,6 +21,8 @@ package org.elasticsearch.transport;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.EnumSet;
public class ConnectionProfileTests extends ESTestCase {
public void testBuildConnectionProfile() {
@ -45,7 +47,7 @@ public class ConnectionProfileTests extends ESTestCase {
assertEquals(4, build.getHandles().size());
assertEquals(0, build.getHandles().get(0).offset);
assertEquals(1, build.getHandles().get(0).length);
assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.BULK}, build.getHandles().get(0).getTypes());
assertEquals(EnumSet.of(TransportRequestOptions.Type.BULK), build.getHandles().get(0).getTypes());
Integer channel = build.getHandles().get(0).getChannel(array);
for (int i = 0; i < numIters; i++) {
assertEquals(0, channel.intValue());
@ -53,7 +55,7 @@ public class ConnectionProfileTests extends ESTestCase {
assertEquals(1, build.getHandles().get(1).offset);
assertEquals(2, build.getHandles().get(1).length);
assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY},
assertEquals(EnumSet.of(TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY),
build.getHandles().get(1).getTypes());
channel = build.getHandles().get(1).getChannel(array);
for (int i = 0; i < numIters; i++) {
@ -62,7 +64,7 @@ public class ConnectionProfileTests extends ESTestCase {
assertEquals(3, build.getHandles().get(2).offset);
assertEquals(3, build.getHandles().get(2).length);
assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.PING}, build.getHandles().get(2).getTypes());
assertEquals(EnumSet.of(TransportRequestOptions.Type.PING), build.getHandles().get(2).getTypes());
channel = build.getHandles().get(2).getChannel(array);
for (int i = 0; i < numIters; i++) {
assertThat(channel, Matchers.anyOf(Matchers.is(3), Matchers.is(4), Matchers.is(5)));
@ -70,10 +72,29 @@ public class ConnectionProfileTests extends ESTestCase {
assertEquals(6, build.getHandles().get(3).offset);
assertEquals(4, build.getHandles().get(3).length);
assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.REG}, build.getHandles().get(3).getTypes());
assertEquals(EnumSet.of(TransportRequestOptions.Type.REG), build.getHandles().get(3).getTypes());
channel = build.getHandles().get(3).getChannel(array);
for (int i = 0; i < numIters; i++) {
assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9)));
}
assertEquals(3, build.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(4, build.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
}
public void testNoChannels() {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG);
builder.addConnections(0, TransportRequestOptions.Type.PING);
ConnectionProfile build = builder.build();
Integer[] array = new Integer[]{Integer.valueOf(0)};
assertEquals(Integer.valueOf(0), build.getHandles().get(0).getChannel(array));
expectThrows(IllegalStateException.class, () -> build.getHandles().get(1).getChannel(array));
}
}

View File

@ -238,4 +238,38 @@ public class TCPTransportTests extends ESTestCase {
}
}
public void testDefaultConnectionProfile() {
ConnectionProfile profile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY);
assertEquals(13, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
assertEquals(12, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
assertEquals(11, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).put("node.master", false).build());
assertEquals(10, profile.getNumConnections());
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
}
}

View File

@ -64,6 +64,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportSettings;
@ -269,8 +270,12 @@ public class Netty4Transport extends TcpTransport<Channel> {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress,
connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState,
connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
connectTimeout, defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING),
receivePredictorMin, receivePredictorMax);
}
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);