diff --git a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index 97f4c3349c0..b49fc72c48a 100644 --- a/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -37,12 +37,12 @@ public final class ConnectionProfile { * types. */ public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile( - Collections.singletonList(new ConnectionTypeHandle(0, 1, + Collections.singletonList(new ConnectionTypeHandle(0, 1, EnumSet.of( TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.PING, TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.REG, - TransportRequestOptions.Type.STATE)), 1); + TransportRequestOptions.Type.STATE))), 1); private final List handles; private final int numConnections; @@ -75,7 +75,7 @@ public final class ConnectionProfile { } } addedTypes.addAll(Arrays.asList(types)); - handles.add(new ConnectionTypeHandle(offset, numConnections, types)); + handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types)))); offset += numConnections; } @@ -100,6 +100,22 @@ public final class ConnectionProfile { return numConnections; } + /** + * Returns the number of connections per type for this profile. This might return a count that is shared with other types such + * that the sum of all connections per type might be higher than {@link #getNumConnections()}. For instance if + * {@link org.elasticsearch.transport.TransportRequestOptions.Type#BULK} shares connections with + * {@link org.elasticsearch.transport.TransportRequestOptions.Type#REG} they will return both the same number of connections from + * this method but the connections are not distinct. + */ + public int getNumConnectionsPerType(TransportRequestOptions.Type type) { + for (ConnectionTypeHandle handle : handles) { + if (handle.getTypes().contains(type)) { + return handle.length; + } + } + throw new AssertionError("no handle found for type: " + type); + } + /** * Returns the type handles for this connection profile */ @@ -113,10 +129,10 @@ public final class ConnectionProfile { static final class ConnectionTypeHandle { public final int length; public final int offset; - private final TransportRequestOptions.Type[] types; + private final Set types; private final AtomicInteger counter = new AtomicInteger(); - private ConnectionTypeHandle(int offset, int length, TransportRequestOptions.Type... types) { + private ConnectionTypeHandle(int offset, int length, Set types) { this.length = length; this.offset = offset; this.types = types; @@ -127,6 +143,9 @@ public final class ConnectionProfile { * fashion. */ T getChannel(T[] channels) { + if (length == 0) { + throw new IllegalStateException("can't select channel size is 0"); + } assert channels.length >= offset + length : "illegal size: " + channels.length + " expected >= " + (offset + length); return channels[offset + Math.floorMod(counter.incrementAndGet(), length)]; } @@ -134,7 +153,7 @@ public final class ConnectionProfile { /** * Returns all types for this handle */ - TransportRequestOptions.Type[] getTypes() { + Set getTypes() { return types; } } diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 11e8de3c3cc..3aa31f3c213 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -81,6 +81,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -149,11 +150,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final int PING_DATA_SIZE = -1; - protected final int connectionsPerNodeRecovery; - protected final int connectionsPerNodeBulk; - protected final int connectionsPerNodeReg; - protected final int connectionsPerNodeState; - protected final int connectionsPerNodePing; protected final TimeValue connectTimeout; protected final boolean blockingClient; private final CircuitBreakerService circuitBreakerService; @@ -179,7 +175,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i protected final boolean compress; protected volatile BoundTransportAddress boundAddress; private final String transportName; - private final ConnectionProfile defaultConnectionProfile; + protected final ConnectionProfile defaultConnectionProfile; public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, @@ -195,20 +191,27 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i this.networkService = networkService; this.transportName = transportName; - this.connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings); - this.connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings); - this.connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings); - this.connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings); - this.connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings); + this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings); this.blockingClient = TCP_BLOCKING_CLIENT.get(settings); + defaultConnectionProfile = buildDefaultConnectionProfile(settings); + } + + static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { + int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings); + int connectionsPerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings); + int connectionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings); + int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings); + int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); - builder.addConnections(connectionsPerNodeRecovery, TransportRequestOptions.Type.RECOVERY); + // if we are not master eligible we don't need a dedicated channel to publish the state + builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); + // if we are not a data-node we don't need any dedicated channels for recovery + builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); - builder.addConnections(connectionsPerNodeState, TransportRequestOptions.Type.STATE); - defaultConnectionProfile = builder.build(); + return builder.build(); } @Override diff --git a/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java b/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java index 04973e70cb3..c63cc135a6f 100644 --- a/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java +++ b/core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java @@ -21,6 +21,8 @@ package org.elasticsearch.transport; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.util.EnumSet; + public class ConnectionProfileTests extends ESTestCase { public void testBuildConnectionProfile() { @@ -45,7 +47,7 @@ public class ConnectionProfileTests extends ESTestCase { assertEquals(4, build.getHandles().size()); assertEquals(0, build.getHandles().get(0).offset); assertEquals(1, build.getHandles().get(0).length); - assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.BULK}, build.getHandles().get(0).getTypes()); + assertEquals(EnumSet.of(TransportRequestOptions.Type.BULK), build.getHandles().get(0).getTypes()); Integer channel = build.getHandles().get(0).getChannel(array); for (int i = 0; i < numIters; i++) { assertEquals(0, channel.intValue()); @@ -53,7 +55,7 @@ public class ConnectionProfileTests extends ESTestCase { assertEquals(1, build.getHandles().get(1).offset); assertEquals(2, build.getHandles().get(1).length); - assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY}, + assertEquals(EnumSet.of(TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY), build.getHandles().get(1).getTypes()); channel = build.getHandles().get(1).getChannel(array); for (int i = 0; i < numIters; i++) { @@ -62,7 +64,7 @@ public class ConnectionProfileTests extends ESTestCase { assertEquals(3, build.getHandles().get(2).offset); assertEquals(3, build.getHandles().get(2).length); - assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.PING}, build.getHandles().get(2).getTypes()); + assertEquals(EnumSet.of(TransportRequestOptions.Type.PING), build.getHandles().get(2).getTypes()); channel = build.getHandles().get(2).getChannel(array); for (int i = 0; i < numIters; i++) { assertThat(channel, Matchers.anyOf(Matchers.is(3), Matchers.is(4), Matchers.is(5))); @@ -70,10 +72,29 @@ public class ConnectionProfileTests extends ESTestCase { assertEquals(6, build.getHandles().get(3).offset); assertEquals(4, build.getHandles().get(3).length); - assertArrayEquals(new TransportRequestOptions.Type[] {TransportRequestOptions.Type.REG}, build.getHandles().get(3).getTypes()); + assertEquals(EnumSet.of(TransportRequestOptions.Type.REG), build.getHandles().get(3).getTypes()); channel = build.getHandles().get(3).getChannel(array); for (int i = 0; i < numIters; i++) { assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9))); } + + assertEquals(3, build.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(4, build.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + } + + public void testNoChannels() { + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG); + builder.addConnections(0, TransportRequestOptions.Type.PING); + ConnectionProfile build = builder.build(); + Integer[] array = new Integer[]{Integer.valueOf(0)}; + assertEquals(Integer.valueOf(0), build.getHandles().get(0).getChannel(array)); + expectThrows(IllegalStateException.class, () -> build.getHandles().get(1).getChannel(array)); } } diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index fafdd529ac7..3df135df236 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -238,4 +238,38 @@ public class TCPTransportTests extends ESTestCase { } } + public void testDefaultConnectionProfile() { + ConnectionProfile profile = TcpTransport.buildDefaultConnectionProfile(Settings.EMPTY); + assertEquals(13, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); + assertEquals(12, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); + assertEquals(11, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = TcpTransport.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).put("node.master", false).build()); + assertEquals(10, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + } + } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index f70d9ab6d0f..c3bbb2d4e1c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -64,6 +64,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.TransportSettings; @@ -269,8 +270,12 @@ public class Netty4Transport extends TcpTransport { logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress, - connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, - connectionsPerNodePing, receivePredictorMin, receivePredictorMax); + connectTimeout, defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY), + defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK), + defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG), + defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE), + defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.PING), + receivePredictorMin, receivePredictorMax); } final ThreadFactory workerFactory = daemonThreadFactory(this.settings, TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, name);