diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 90ac25ab3f3..3d119c80cae 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -103,6 +103,13 @@ public class NettyTransport extends AbstractLifecycleComponent implem NettyStaticSetup.setup(); } + public static final String WORKER_COUNT = "transport.netty.worker_count"; + public static final String CONNECTIONS_PER_NODE_RECOVERY = "transport.connections_per_node.recovery"; + public static final String CONNECTIONS_PER_NODE_BULK = "transport.connections_per_node.bulk"; + public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg"; + public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state"; + public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping"; + private final NetworkService networkService; final Version version; @@ -182,7 +189,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem System.setProperty("org.jboss.netty.epollBugWorkaround", "true"); } - this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2); + this.workerCount = settings.getAsInt(WORKER_COUNT, EsExecutors.boundedNumberOfProcessors(settings) * 2); this.bossCount = componentSettings.getAsInt("boss_count", 1); this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false))); @@ -197,11 +204,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); - this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2)); - this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3)); - this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6)); - this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1)); - this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1)); + this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt(CONNECTIONS_PER_NODE_RECOVERY, 2)); + this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt(CONNECTIONS_PER_NODE_BULK, 3)); + this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt(CONNECTIONS_PER_NODE_REG, 6)); + this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt(CONNECTIONS_PER_NODE_STATE, 1)); + this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt(CONNECTIONS_PER_NODE_PING, 1)); // we want to have at least 1 for reg/state/ping if (this.connectionsPerNodeReg == 0) { diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 6bd14bddb8e..2738c5bac4d 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -77,6 +77,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty.NettyTransport; import org.junit.Assert; import java.io.Closeable; @@ -330,6 +331,15 @@ public final class InternalTestCluster extends TestCluster { builder.put("indices.fielddata.cache.expire", TimeValue.timeValueMillis(1 + random.nextInt(10000))); } } + + // randomize netty settings + if (random.nextBoolean()) { + builder.put(NettyTransport.WORKER_COUNT, random.nextInt(3) + 1); + builder.put(NettyTransport.CONNECTIONS_PER_NODE_RECOVERY, random.nextInt(2) + 1); + builder.put(NettyTransport.CONNECTIONS_PER_NODE_BULK, random.nextInt(3) + 1); + builder.put(NettyTransport.CONNECTIONS_PER_NODE_REG, random.nextInt(6) + 1); + } + return builder.build(); }