Transport/Http: Remove explicit setting of send/receive buffer, and improve netty receive buffer predictor, closes #2124.
This commit is contained in:
parent
408a74206f
commit
4eb85bbbd6
|
@ -23,7 +23,6 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
|
@ -58,8 +57,8 @@ public class NetworkService extends AbstractComponent {
|
||||||
public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client";
|
public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client";
|
||||||
public static final String TCP_CONNECT_TIMEOUT = "network.tcp.connect_timeout";
|
public static final String TCP_CONNECT_TIMEOUT = "network.tcp.connect_timeout";
|
||||||
|
|
||||||
public static final ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB);
|
public static final ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = null;
|
||||||
public static final ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB);
|
public static final ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = null;
|
||||||
public static final TimeValue TCP_DEFAULT_CONNECT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
|
public static final TimeValue TCP_DEFAULT_CONNECT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
||||||
|
|
||||||
private final ByteSizeValue tcpSendBufferSize;
|
private final ByteSizeValue tcpSendBufferSize;
|
||||||
private final ByteSizeValue tcpReceiveBufferSize;
|
private final ByteSizeValue tcpReceiveBufferSize;
|
||||||
|
private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
|
||||||
|
|
||||||
final ByteSizeValue maxCumulationBufferCapacity;
|
final ByteSizeValue maxCumulationBufferCapacity;
|
||||||
final int maxCompositeBufferComponents;
|
final int maxCompositeBufferComponents;
|
||||||
|
@ -130,6 +131,15 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
||||||
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
|
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
|
||||||
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
|
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
|
||||||
|
|
||||||
|
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
|
||||||
|
ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k")));
|
||||||
|
ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k")));
|
||||||
|
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
|
||||||
|
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
|
||||||
|
} else {
|
||||||
|
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||||
|
}
|
||||||
|
|
||||||
this.compression = settings.getAsBoolean("http.compression", false);
|
this.compression = settings.getAsBoolean("http.compression", false);
|
||||||
this.compressionLevel = settings.getAsInt("http.compression_level", 6);
|
this.compressionLevel = settings.getAsInt("http.compression_level", 6);
|
||||||
|
|
||||||
|
@ -178,6 +188,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
||||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||||
}
|
}
|
||||||
|
serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||||
|
serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||||
if (reuseAddress != null) {
|
if (reuseAddress != null) {
|
||||||
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
||||||
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
||||||
|
|
|
@ -108,6 +108,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||||
|
|
||||||
final ByteSizeValue tcpSendBufferSize;
|
final ByteSizeValue tcpSendBufferSize;
|
||||||
final ByteSizeValue tcpReceiveBufferSize;
|
final ByteSizeValue tcpReceiveBufferSize;
|
||||||
|
final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
|
||||||
|
|
||||||
final int connectionsPerNodeLow;
|
final int connectionsPerNodeLow;
|
||||||
final int connectionsPerNodeMed;
|
final int connectionsPerNodeMed;
|
||||||
|
@ -175,6 +176,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||||
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
|
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
|
||||||
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
|
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
|
||||||
|
|
||||||
|
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
|
||||||
|
ByteSizeValue receivePredictorMin = componentSettings.getAsBytesSize("receive_predictor_min", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k")));
|
||||||
|
ByteSizeValue receivePredictorMax = componentSettings.getAsBytesSize("receive_predictor_max", componentSettings.getAsBytesSize("receive_predictor_size", ByteSizeValue.parseBytesSizeValue("512k")));
|
||||||
|
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
|
||||||
|
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
|
||||||
|
} else {
|
||||||
|
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||||
|
}
|
||||||
|
|
||||||
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}]",
|
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}]",
|
||||||
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh);
|
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh);
|
||||||
}
|
}
|
||||||
|
@ -240,6 +250,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||||
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
|
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||||
}
|
}
|
||||||
|
clientBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||||
if (reuseAddress != null) {
|
if (reuseAddress != null) {
|
||||||
clientBootstrap.setOption("reuseAddress", reuseAddress);
|
clientBootstrap.setOption("reuseAddress", reuseAddress);
|
||||||
}
|
}
|
||||||
|
@ -294,6 +305,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||||
}
|
}
|
||||||
|
serverBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||||
|
serverBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
|
||||||
if (reuseAddress != null) {
|
if (reuseAddress != null) {
|
||||||
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
serverBootstrap.setOption("reuseAddress", reuseAddress);
|
||||||
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
serverBootstrap.setOption("child.reuseAddress", reuseAddress);
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class TransportBenchmark {
|
||||||
final String executor = ThreadPool.Names.GENERIC;
|
final String executor = ThreadPool.Names.GENERIC;
|
||||||
final boolean waitForRequest = true;
|
final boolean waitForRequest = true;
|
||||||
final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES);
|
final ByteSizeValue payloadSize = new ByteSizeValue(100, ByteSizeUnit.BYTES);
|
||||||
final int NUMBER_OF_CLIENTS = 1;
|
final int NUMBER_OF_CLIENTS = 10;
|
||||||
final int NUMBER_OF_ITERATIONS = 100000;
|
final int NUMBER_OF_ITERATIONS = 100000;
|
||||||
final byte[] payload = new byte[(int) payloadSize.bytes()];
|
final byte[] payload = new byte[(int) payloadSize.bytes()];
|
||||||
final AtomicLong idGenerator = new AtomicLong();
|
final AtomicLong idGenerator = new AtomicLong();
|
||||||
|
|
Loading…
Reference in New Issue