From 4eb85bbbd6204a2b58d8370e046db7903f058996 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 30 Jul 2012 21:37:38 +0200 Subject: [PATCH] Transport/Http: Remove explicit setting of send/receive buffer, and improve netty receive buffer predictor, closes #2124. --- .../common/network/NetworkService.java | 5 ++--- .../http/netty/NettyHttpServerTransport.java | 12 ++++++++++++ .../transport/netty/NettyTransport.java | 13 +++++++++++++ .../benchmark/transport/TransportBenchmark.java | 2 +- 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/network/NetworkService.java b/src/main/java/org/elasticsearch/common/network/NetworkService.java index 1a9c9005369..f07a5256a88 100644 --- a/src/main/java/org/elasticsearch/common/network/NetworkService.java +++ b/src/main/java/org/elasticsearch/common/network/NetworkService.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -58,8 +57,8 @@ public class NetworkService extends AbstractComponent { public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client"; public static final String TCP_CONNECT_TIMEOUT = "network.tcp.connect_timeout"; - public static final ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB); - public static final ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB); + public static final ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = null; + public static final ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = null; public static final TimeValue TCP_DEFAULT_CONNECT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); } diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index de9fbfa1482..95f78374eeb 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -92,6 +92,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent 0) { serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes()); } + serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); + serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); if (reuseAddress != null) { serverBootstrap.setOption("reuseAddress", reuseAddress); serverBootstrap.setOption("child.reuseAddress", reuseAddress); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 75c1632f054..5e28e09b231 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -108,6 +108,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ByteSizeValue tcpSendBufferSize; final ByteSizeValue tcpReceiveBufferSize; + final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; final int connectionsPerNodeLow; final int connectionsPerNodeMed; @@ -175,6 +176,15 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); + // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one + ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k"))); + ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k"))); + if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) { + receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes()); + } else { + receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); + } + logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}]", workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh); } @@ -240,6 +250,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) { clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes()); } + clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); if (reuseAddress != null) { clientBootstrap.setOption("reuseAddress", reuseAddress); } @@ -294,6 +305,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) { serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes()); } + serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); + serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); if (reuseAddress != null) { serverBootstrap.setOption("reuseAddress", reuseAddress); serverBootstrap.setOption("child.reuseAddress", reuseAddress); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index fe90106cd57..8dc42ba5e29 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -59,7 +59,7 @@ public class TransportBenchmark { final String executor = ThreadPool.Names.GENERIC; final boolean waitForRequest = true; final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES); - final int NUMBER_OF_CLIENTS = 1; + final int NUMBER_OF_CLIENTS = 10; final int NUMBER_OF_ITERATIONS = 100000; final byte[] payload = new byte[(int) payloadSize.bytes()]; final AtomicLong idGenerator = new AtomicLong();