Add per-socket keepalive options (#44055)
Uses JDK 11's per-socket configuration of TCP keepalive (supported on Linux and Mac), see https://bugs.openjdk.java.net/browse/JDK-8194298, and exposes these as transport settings. By default, these options are disabled for now (i.e. fall-back to OS behavior), but we would like to explore whether we can enable them by default, in particular to force keepalive configurations that are better tuned for running ES.
This commit is contained in:
parent
81ea08b8ca
commit
7aeb2fe73c
|
@ -85,6 +85,23 @@ example above:
|
|||
* `publish_host`: The host which is published in informational APIs
|
||||
* `tcp.no_delay`: Configures the `TCP_NO_DELAY` option for this socket
|
||||
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket
|
||||
* `tcp.keep_idle`: Configures the `TCP_KEEPIDLE` option for this socket, which
|
||||
determines the time in seconds that a connection must be idle before
|
||||
starting to send TCP keepalive probes.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
* `tcp.keep_interval`: Configures the `TCP_KEEPINTVL` option for this socket,
|
||||
which determines the time in seconds between sending TCP keepalive probes.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
* `tcp.keep_count`: Configures the `TCP_KEEPCNT` option for this socket, which
|
||||
determines the number of unacknowledged TCP keepalive probes that may be
|
||||
sent on a connection before it is dropped.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket
|
||||
* `tcp.send_buffer_size`: Configures the send buffer size of the socket
|
||||
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket
|
||||
|
|
|
@ -250,9 +250,9 @@ public final class IOUtils {
|
|||
}
|
||||
|
||||
// TODO: replace with constants class if needed (cf. org.apache.lucene.util.Constants)
|
||||
private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
|
||||
private static final boolean LINUX = System.getProperty("os.name").startsWith("Linux");
|
||||
private static final boolean MAC_OS_X = System.getProperty("os.name").startsWith("Mac OS X");
|
||||
public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
|
||||
public static final boolean LINUX = System.getProperty("os.name").startsWith("Linux");
|
||||
public static final boolean MAC_OS_X = System.getProperty("os.name").startsWith("Mac OS X");
|
||||
|
||||
/**
|
||||
* Ensure that any writes to the given file is written to the storage device that contains it. The {@code isDir} parameter specifies
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.elasticsearch.core.internal.net;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.SocketOption;
|
||||
|
||||
/**
|
||||
* Utilities for network-related methods.
|
||||
*/
|
||||
public class NetUtils {
|
||||
|
||||
/**
|
||||
* Returns the extended TCP_KEEPIDLE socket option, if available on this JDK
|
||||
*/
|
||||
public static SocketOption<Integer> getTcpKeepIdleSocketOptionOrNull() {
|
||||
return getExtendedSocketOptionOrNull("TCP_KEEPIDLE");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the extended TCP_KEEPINTERVAL socket option, if available on this JDK
|
||||
*/
|
||||
public static SocketOption<Integer> getTcpKeepIntervalSocketOptionOrNull() {
|
||||
return getExtendedSocketOptionOrNull("TCP_KEEPINTERVAL");
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the extended TCP_KEEPCOUNT socket option, if available on this JDK
|
||||
*/
|
||||
public static SocketOption<Integer> getTcpKeepCountSocketOptionOrNull() {
|
||||
return getExtendedSocketOptionOrNull("TCP_KEEPCOUNT");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> SocketOption<T> getExtendedSocketOptionOrNull(String fieldName) {
|
||||
try {
|
||||
final Class<?> extendedSocketOptionsClass = Class.forName("jdk.net.ExtendedSocketOptions");
|
||||
final Field field = extendedSocketOptionsClass.getField(fieldName);
|
||||
return (SocketOption<T>) field.get(null);
|
||||
} catch (Exception t) {
|
||||
// ignore
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.elasticsearch.core.internal.net;
|
||||
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class NetUtilsTests extends ESTestCase {
|
||||
|
||||
public void testExtendedSocketOptions() {
|
||||
assumeTrue("JDK possibly not supported", Constants.JVM_NAME.contains("HotSpot") || Constants.JVM_NAME.contains("OpenJDK"));
|
||||
assumeTrue("Platform possibly not supported", IOUtils.LINUX || IOUtils.MAC_OS_X);
|
||||
assertNotNull(NetUtils.getTcpKeepIdleSocketOptionOrNull());
|
||||
assertNotNull(NetUtils.getTcpKeepIntervalSocketOptionOrNull());
|
||||
assertNotNull(NetUtils.getTcpKeepCountSocketOptionOrNull());
|
||||
}
|
||||
}
|
|
@ -32,6 +32,9 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
|
|||
|
||||
private final boolean tcpNoDelay;
|
||||
private final boolean tcpKeepAlive;
|
||||
private final int tcpKeepIdle;
|
||||
private final int tcpKeepInterval;
|
||||
private final int tcpKeepCount;
|
||||
private final boolean tcpReuseAddress;
|
||||
private final int tcpSendBufferSize;
|
||||
private final int tcpReceiveBufferSize;
|
||||
|
@ -40,18 +43,23 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
|
|||
/**
|
||||
* This will create a {@link ChannelFactory}.
|
||||
*/
|
||||
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize,
|
||||
int tcpReceiveBufferSize) {
|
||||
this(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, new RawChannelFactory());
|
||||
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, int tcpKeepIdle, int tcpKeepInterval, int tcpKeepCount,
|
||||
boolean tcpReuseAddress, int tcpSendBufferSize, int tcpReceiveBufferSize) {
|
||||
this(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize,
|
||||
new RawChannelFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
* This will create a {@link ChannelFactory} using the raw channel factory passed to the constructor.
|
||||
*/
|
||||
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize,
|
||||
int tcpReceiveBufferSize, RawChannelFactory rawChannelFactory) {
|
||||
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, int tcpKeepIdle, int tcpKeepInterval, int tcpKeepCount,
|
||||
boolean tcpReuseAddress, int tcpSendBufferSize, int tcpReceiveBufferSize,
|
||||
RawChannelFactory rawChannelFactory) {
|
||||
this.tcpNoDelay = tcpNoDelay;
|
||||
this.tcpKeepAlive = tcpKeepAlive;
|
||||
this.tcpKeepIdle = tcpKeepIdle;
|
||||
this.tcpKeepInterval = tcpKeepInterval;
|
||||
this.tcpKeepCount = tcpKeepCount;
|
||||
this.tcpReuseAddress = tcpReuseAddress;
|
||||
this.tcpSendBufferSize = tcpSendBufferSize;
|
||||
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
|
||||
|
@ -182,8 +190,8 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
|
|||
}
|
||||
|
||||
private Config.Socket createSocketConfig(InetSocketAddress remoteAddress, boolean isAccepted) {
|
||||
return new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, remoteAddress,
|
||||
isAccepted);
|
||||
return new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, tcpReuseAddress, tcpSendBufferSize,
|
||||
tcpReceiveBufferSize, remoteAddress, isAccepted);
|
||||
}
|
||||
|
||||
public static class RawChannelFactory {
|
||||
|
|
|
@ -37,16 +37,23 @@ public abstract class Config {
|
|||
|
||||
private final boolean tcpNoDelay;
|
||||
private final boolean tcpKeepAlive;
|
||||
private final int tcpKeepIdle;
|
||||
private final int tcpKeepInterval;
|
||||
private final int tcpKeepCount;
|
||||
private final int tcpSendBufferSize;
|
||||
private final int tcpReceiveBufferSize;
|
||||
private final InetSocketAddress remoteAddress;
|
||||
private final boolean isAccepted;
|
||||
|
||||
public Socket(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize, int tcpReceiveBufferSize,
|
||||
public Socket(boolean tcpNoDelay, boolean tcpKeepAlive, int tcpKeepIdle, int tcpKeepInterval, int tcpKeepCount,
|
||||
boolean tcpReuseAddress, int tcpSendBufferSize, int tcpReceiveBufferSize,
|
||||
InetSocketAddress remoteAddress, boolean isAccepted) {
|
||||
super(tcpReuseAddress);
|
||||
this.tcpNoDelay = tcpNoDelay;
|
||||
this.tcpKeepAlive = tcpKeepAlive;
|
||||
this.tcpKeepIdle = tcpKeepIdle;
|
||||
this.tcpKeepInterval = tcpKeepInterval;
|
||||
this.tcpKeepCount = tcpKeepCount;
|
||||
this.tcpSendBufferSize = tcpSendBufferSize;
|
||||
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
|
||||
this.remoteAddress = remoteAddress;
|
||||
|
@ -61,6 +68,18 @@ public abstract class Config {
|
|||
return tcpKeepAlive;
|
||||
}
|
||||
|
||||
public int tcpKeepIdle() {
|
||||
return tcpKeepIdle;
|
||||
}
|
||||
|
||||
public int tcpKeepInterval() {
|
||||
return tcpKeepInterval;
|
||||
}
|
||||
|
||||
public int tcpKeepCount() {
|
||||
return tcpKeepCount;
|
||||
}
|
||||
|
||||
public int tcpSendBufferSize() {
|
||||
return tcpSendBufferSize;
|
||||
}
|
||||
|
|
|
@ -20,12 +20,14 @@
|
|||
package org.elasticsearch.nio;
|
||||
|
||||
import org.elasticsearch.common.concurrent.CompletableContext;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.nio.utils.ByteBufferUtils;
|
||||
import org.elasticsearch.nio.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketOption;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
@ -34,6 +36,7 @@ import java.security.PrivilegedActionException;
|
|||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -326,6 +329,27 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
|
|||
// we ensure that it is properly set before any bind attempt.
|
||||
socket.setReuseAddress(socketConfig.tcpReuseAddress());
|
||||
socket.setKeepAlive(socketConfig.tcpKeepAlive());
|
||||
if (socketConfig.tcpKeepAlive()) {
|
||||
final Set<SocketOption<?>> supportedOptions = socket.getChannel().supportedOptions();
|
||||
if (socketConfig.tcpKeepIdle() >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null && supportedOptions.contains(keepIdleOption)) {
|
||||
socket.getChannel().setOption(keepIdleOption, socketConfig.tcpKeepIdle());
|
||||
}
|
||||
}
|
||||
if (socketConfig.tcpKeepInterval() >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null && supportedOptions.contains(keepIntervalOption)) {
|
||||
socket.getChannel().setOption(keepIntervalOption, socketConfig.tcpKeepInterval());
|
||||
}
|
||||
}
|
||||
if (socketConfig.tcpKeepCount() >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null && supportedOptions.contains(keepCountOption)) {
|
||||
socket.getChannel().setOption(keepCountOption, socketConfig.tcpKeepCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
socket.setTcpNoDelay(socketConfig.tcpNoDelay());
|
||||
int tcpSendBufferSize = socketConfig.tcpSendBufferSize();
|
||||
if (tcpSendBufferSize > 0) {
|
||||
|
|
|
@ -128,7 +128,7 @@ public class ChannelFactoryTests extends ESTestCase {
|
|||
private static class TestChannelFactory extends ChannelFactory<NioServerSocketChannel, NioSocketChannel> {
|
||||
|
||||
TestChannelFactory(RawChannelFactory rawChannelFactory) {
|
||||
super(randomBoolean(), randomBoolean(), randomBoolean(), -1, -1, rawChannelFactory);
|
||||
super(randomBoolean(), randomBoolean(), -1, -1, -1, randomBoolean(), -1, -1, rawChannelFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -274,7 +274,7 @@ public class EventHandlerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static Config.Socket getSocketConfig() {
|
||||
return new Config.Socket(randomBoolean(), randomBoolean(), randomBoolean(), -1, -1, mock(InetSocketAddress.class),
|
||||
return new Config.Socket(randomBoolean(), randomBoolean(), -1, -1, -1, randomBoolean(), -1, -1, mock(InetSocketAddress.class),
|
||||
randomBoolean());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,10 +116,14 @@ public class SocketChannelContextTests extends ESTestCase {
|
|||
Config.Socket config;
|
||||
boolean tcpNoDelay = randomBoolean();
|
||||
boolean tcpKeepAlive = randomBoolean();
|
||||
int tcpKeepIdle = randomIntBetween(1, 1000);
|
||||
int tcpKeepInterval = randomIntBetween(1, 1000);
|
||||
int tcpKeepCount = randomIntBetween(1, 1000);
|
||||
boolean tcpReuseAddress = randomBoolean();
|
||||
int tcpSendBufferSize = randomIntBetween(1000, 2000);
|
||||
int tcpReceiveBufferSize = randomIntBetween(1000, 2000);
|
||||
config = new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, address, isAccepted);
|
||||
config = new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, tcpReuseAddress, tcpSendBufferSize,
|
||||
tcpReceiveBufferSize, address, isAccepted);
|
||||
InboundChannelBuffer buffer = InboundChannelBuffer.allocatingInstance();
|
||||
TestSocketChannelContext context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, buffer, config);
|
||||
context.register();
|
||||
|
@ -181,10 +185,14 @@ public class SocketChannelContextTests extends ESTestCase {
|
|||
Config.Socket config;
|
||||
boolean tcpNoDelay = randomBoolean();
|
||||
boolean tcpKeepAlive = randomBoolean();
|
||||
int tcpKeepIdle = randomIntBetween(1, 1000);
|
||||
int tcpKeepInterval = randomIntBetween(1, 1000);
|
||||
int tcpKeepCount = randomIntBetween(1, 1000);
|
||||
boolean tcpReuseAddress = randomBoolean();
|
||||
int tcpSendBufferSize = randomIntBetween(1000, 2000);
|
||||
int tcpReceiveBufferSize = randomIntBetween(1000, 2000);
|
||||
config = new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, address, false);
|
||||
config = new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, tcpReuseAddress, tcpSendBufferSize,
|
||||
tcpReceiveBufferSize, address, false);
|
||||
InboundChannelBuffer buffer = InboundChannelBuffer.allocatingInstance();
|
||||
TestSocketChannelContext context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, buffer, config);
|
||||
doThrow(new SocketException()).doNothing().when(rawSocket).setReuseAddress(tcpReuseAddress);
|
||||
|
@ -438,7 +446,7 @@ public class SocketChannelContextTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static Config.Socket getSocketConfig() {
|
||||
return new Config.Socket(randomBoolean(), randomBoolean(), randomBoolean(), -1, -1, mock(InetSocketAddress.class),
|
||||
return new Config.Socket(randomBoolean(), randomBoolean(), -1, -1, -1, randomBoolean(), -1, -1, mock(InetSocketAddress.class),
|
||||
randomBoolean());
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import io.netty.channel.ChannelOption;
|
|||
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
|
@ -52,6 +53,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.http.AbstractHttpServerTransport;
|
||||
import org.elasticsearch.http.HttpChannel;
|
||||
import org.elasticsearch.http.HttpHandlingSettings;
|
||||
|
@ -62,6 +65,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketOption;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||
|
@ -71,6 +75,9 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEAD
|
|||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_COUNT;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_IDLE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_INTERVAL;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
|
||||
|
@ -184,6 +191,31 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
|||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
|
||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
|
||||
|
||||
if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
|
||||
// Netty logs a warning if it can't set the option, so try this only on supported platforms
|
||||
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
|
||||
if (SETTING_HTTP_TCP_KEEP_IDLE.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), SETTING_HTTP_TCP_KEEP_IDLE.get(settings));
|
||||
}
|
||||
}
|
||||
if (SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption),
|
||||
SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings));
|
||||
}
|
||||
}
|
||||
if (SETTING_HTTP_TCP_KEEP_COUNT.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), SETTING_HTTP_TCP_KEEP_COUNT.get(settings));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
|
||||
|
|
|
@ -32,6 +32,7 @@ import io.netty.channel.ChannelOption;
|
|||
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioChannelOption;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.AttributeKey;
|
||||
|
@ -53,6 +54,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
@ -60,6 +63,7 @@ import org.elasticsearch.transport.TransportSettings;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketOption;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -148,6 +152,29 @@ public class Netty4Transport extends TcpTransport {
|
|||
|
||||
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
|
||||
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
|
||||
if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
|
||||
// Netty logs a warning if it can't set the option, so try this only on supported platforms
|
||||
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
|
||||
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
|
||||
}
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
|
||||
}
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final ByteSizeValue tcpSendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE.get(settings);
|
||||
if (tcpSendBufferSize.getBytes() > 0) {
|
||||
|
@ -185,6 +212,30 @@ public class Netty4Transport extends TcpTransport {
|
|||
|
||||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
|
||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
|
||||
if (profileSettings.tcpKeepAlive) {
|
||||
// Netty logs a warning if it can't set the option, so try this only on supported platforms
|
||||
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
|
||||
if (profileSettings.tcpKeepIdle >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
|
||||
}
|
||||
}
|
||||
if (profileSettings.tcpKeepInterval >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
|
||||
}
|
||||
|
||||
}
|
||||
if (profileSettings.tcpKeepCount >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (profileSettings.sendBufferSize.getBytes() != -1) {
|
||||
serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(profileSettings.sendBufferSize.getBytes()));
|
||||
|
|
|
@ -56,6 +56,9 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUN
|
|||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_COUNT;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_IDLE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_INTERVAL;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
|
||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
|
||||
|
@ -70,6 +73,9 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
|
|||
|
||||
protected final boolean tcpNoDelay;
|
||||
protected final boolean tcpKeepAlive;
|
||||
protected final int tcpKeepIdle;
|
||||
protected final int tcpKeepInterval;
|
||||
protected final int tcpKeepCount;
|
||||
protected final boolean reuseAddress;
|
||||
protected final int tcpSendBufferSize;
|
||||
protected final int tcpReceiveBufferSize;
|
||||
|
@ -91,6 +97,9 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
|
|||
|
||||
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
|
||||
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
|
||||
this.tcpKeepIdle = SETTING_HTTP_TCP_KEEP_IDLE.get(settings);
|
||||
this.tcpKeepInterval = SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings);
|
||||
this.tcpKeepCount = SETTING_HTTP_TCP_KEEP_COUNT.get(settings);
|
||||
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
|
||||
this.tcpSendBufferSize = Math.toIntExact(SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings).getBytes());
|
||||
this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes());
|
||||
|
@ -151,7 +160,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
|
|||
private class HttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
|
||||
|
||||
private HttpChannelFactory() {
|
||||
super(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize);
|
||||
super(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, reuseAddress, tcpSendBufferSize,
|
||||
tcpReceiveBufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -142,8 +142,9 @@ public class NioTransport extends TcpTransport {
|
|||
protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
|
||||
|
||||
protected TcpChannelFactory(ProfileSettings profileSettings) {
|
||||
super(profileSettings.tcpNoDelay, profileSettings.tcpKeepAlive, profileSettings.reuseAddress,
|
||||
Math.toIntExact(profileSettings.sendBufferSize.getBytes()), Math.toIntExact(profileSettings.receiveBufferSize.getBytes()));
|
||||
super(profileSettings.tcpNoDelay, profileSettings.tcpKeepAlive, profileSettings.tcpKeepIdle, profileSettings.tcpKeepInterval,
|
||||
profileSettings.tcpKeepCount, profileSettings.reuseAddress, Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
|
||||
Math.toIntExact(profileSettings.receiveBufferSize.getBytes()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -181,6 +181,9 @@ class NioHttpClient implements Closeable {
|
|||
private ClientChannelFactory(CountDownLatch latch, Collection<FullHttpResponse> content) {
|
||||
super(NetworkService.TCP_NO_DELAY.get(Settings.EMPTY),
|
||||
NetworkService.TCP_KEEP_ALIVE.get(Settings.EMPTY),
|
||||
NetworkService.TCP_KEEP_IDLE.get(Settings.EMPTY),
|
||||
NetworkService.TCP_KEEP_INTERVAL.get(Settings.EMPTY),
|
||||
NetworkService.TCP_KEEP_COUNT.get(Settings.EMPTY),
|
||||
NetworkService.TCP_REUSE_ADDRESS.get(Settings.EMPTY),
|
||||
Math.toIntExact(NetworkService.TCP_SEND_BUFFER_SIZE.get(Settings.EMPTY).getBytes()),
|
||||
Math.toIntExact(NetworkService.TCP_RECEIVE_BUFFER_SIZE.get(Settings.EMPTY).getBytes()));
|
||||
|
|
|
@ -58,7 +58,7 @@ public class NioGroupFactoryTests extends ESTestCase {
|
|||
private static class BindingFactory extends ChannelFactory<NioServerSocketChannel, NioSocketChannel> {
|
||||
|
||||
private BindingFactory() {
|
||||
super(false, false, false, -1, -1);
|
||||
super(false, false, -1, -1, -1, false, -1, -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -52,6 +52,12 @@ public final class NetworkService {
|
|||
Setting.boolSetting("network.tcp.no_delay", true, Property.NodeScope);
|
||||
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
||||
Setting.boolSetting("network.tcp.keep_alive", true, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_IDLE =
|
||||
Setting.intSetting("network.tcp.keep_idle", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_INTERVAL =
|
||||
Setting.intSetting("network.tcp.keep_interval", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_COUNT =
|
||||
Setting.intSetting("network.tcp.keep_count", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
||||
Setting.boolSetting("network.tcp.reuse_address", NetworkUtils.defaultReuseAddress(), Property.NodeScope);
|
||||
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
|
||||
|
|
|
@ -275,6 +275,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
HttpTransportSettings.OLD_SETTING_HTTP_TCP_NO_DELAY,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_KEEP_IDLE,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_KEEP_INTERVAL,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_KEEP_COUNT,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
|
||||
HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,
|
||||
|
@ -339,6 +342,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
TransportSettings.TCP_KEEP_ALIVE,
|
||||
TransportSettings.OLD_TCP_KEEP_ALIVE_PROFILE,
|
||||
TransportSettings.TCP_KEEP_ALIVE_PROFILE,
|
||||
TransportSettings.TCP_KEEP_IDLE,
|
||||
TransportSettings.TCP_KEEP_IDLE_PROFILE,
|
||||
TransportSettings.TCP_KEEP_INTERVAL,
|
||||
TransportSettings.TCP_KEEP_INTERVAL_PROFILE,
|
||||
TransportSettings.TCP_KEEP_COUNT,
|
||||
TransportSettings.TCP_KEEP_COUNT_PROFILE,
|
||||
TransportSettings.TCP_REUSE_ADDRESS,
|
||||
TransportSettings.OLD_TCP_REUSE_ADDRESS_PROFILE,
|
||||
TransportSettings.TCP_REUSE_ADDRESS_PROFILE,
|
||||
|
@ -361,6 +370,9 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
NetworkService.GLOBAL_NETWORK_PUBLISH_HOST_SETTING,
|
||||
NetworkService.TCP_NO_DELAY,
|
||||
NetworkService.TCP_KEEP_ALIVE,
|
||||
NetworkService.TCP_KEEP_IDLE,
|
||||
NetworkService.TCP_KEEP_INTERVAL,
|
||||
NetworkService.TCP_KEEP_COUNT,
|
||||
NetworkService.TCP_REUSE_ADDRESS,
|
||||
NetworkService.TCP_SEND_BUFFER_SIZE,
|
||||
NetworkService.TCP_RECEIVE_BUFFER_SIZE,
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.function.Function;
|
|||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.listSetting;
|
||||
|
||||
public final class HttpTransportSettings {
|
||||
|
@ -42,7 +43,7 @@ public final class HttpTransportSettings {
|
|||
public static final Setting<String> SETTING_CORS_ALLOW_ORIGIN =
|
||||
new Setting<>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_CORS_MAX_AGE =
|
||||
Setting.intSetting("http.cors.max-age", 1728000, Property.NodeScope);
|
||||
intSetting("http.cors.max-age", 1728000, Property.NodeScope);
|
||||
public static final Setting<String> SETTING_CORS_ALLOW_METHODS =
|
||||
new Setting<>("http.cors.allow-methods", "OPTIONS,HEAD,GET,POST,PUT,DELETE", (value) -> value, Property.NodeScope);
|
||||
public static final Setting<String> SETTING_CORS_ALLOW_HEADERS =
|
||||
|
@ -50,13 +51,13 @@ public final class HttpTransportSettings {
|
|||
public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS =
|
||||
Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_PIPELINING_MAX_EVENTS =
|
||||
Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope);
|
||||
intSetting("http.pipelining.max_events", 10000, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_COMPRESSION =
|
||||
Setting.boolSetting("http.compression", true, Property.NodeScope);
|
||||
// we intentionally use a different compression level as Netty here as our benchmarks have shown that a compression level of 3 is the
|
||||
// best compromise between reduction in network traffic and added latency. For more details please check #7309.
|
||||
public static final Setting<Integer> SETTING_HTTP_COMPRESSION_LEVEL =
|
||||
Setting.intSetting("http.compression_level", 3, Property.NodeScope);
|
||||
intSetting("http.compression_level", 3, Property.NodeScope);
|
||||
public static final Setting<List<String>> SETTING_HTTP_HOST =
|
||||
listSetting("http.host", emptyList(), Function.identity(), Property.NodeScope);
|
||||
public static final Setting<List<String>> SETTING_HTTP_PUBLISH_HOST =
|
||||
|
@ -67,7 +68,7 @@ public final class HttpTransportSettings {
|
|||
public static final Setting<PortsRange> SETTING_HTTP_PORT =
|
||||
new Setting<>("http.port", "9200-9300", PortsRange::new, Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT =
|
||||
Setting.intSetting("http.publish_port", -1, -1, Property.NodeScope);
|
||||
intSetting("http.publish_port", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED =
|
||||
Setting.boolSetting("http.detailed_errors.enabled", true, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_CONTENT_TYPE_REQUIRED =
|
||||
|
@ -91,7 +92,7 @@ public final class HttpTransportSettings {
|
|||
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_HEADER_SIZE =
|
||||
Setting.byteSizeSetting("http.max_header_size", new ByteSizeValue(8, ByteSizeUnit.KB), Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_MAX_WARNING_HEADER_COUNT =
|
||||
Setting.intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope);
|
||||
intSetting("http.max_warning_header_count", -1, -1, Property.NodeScope);
|
||||
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_WARNING_HEADER_SIZE =
|
||||
Setting.byteSizeSetting("http.max_warning_header_size", new ByteSizeValue(-1), Property.NodeScope);
|
||||
public static final Setting<ByteSizeValue> SETTING_HTTP_MAX_INITIAL_LINE_LENGTH =
|
||||
|
@ -113,6 +114,12 @@ public final class HttpTransportSettings {
|
|||
boolSetting("http.tcp.no_delay", OLD_SETTING_HTTP_TCP_NO_DELAY, Setting.Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
|
||||
boolSetting("http.tcp.keep_alive", NetworkService.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_TCP_KEEP_IDLE =
|
||||
intSetting("http.tcp.keep_idle", NetworkService.TCP_KEEP_IDLE, -1, Setting.Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_TCP_KEEP_INTERVAL =
|
||||
intSetting("http.tcp.keep_interval", NetworkService.TCP_KEEP_INTERVAL, -1, Setting.Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_TCP_KEEP_COUNT =
|
||||
intSetting("http.tcp.keep_count", NetworkService.TCP_KEEP_COUNT, -1, Setting.Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
|
||||
boolSetting("http.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
|
||||
public static final Setting<ByteSizeValue> SETTING_HTTP_TCP_SEND_BUFFER_SIZE =
|
||||
|
|
|
@ -871,6 +871,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
public final String profileName;
|
||||
public final boolean tcpNoDelay;
|
||||
public final boolean tcpKeepAlive;
|
||||
public final int tcpKeepIdle;
|
||||
public final int tcpKeepInterval;
|
||||
public final int tcpKeepCount;
|
||||
public final boolean reuseAddress;
|
||||
public final ByteSizeValue sendBufferSize;
|
||||
public final ByteSizeValue receiveBufferSize;
|
||||
|
@ -884,6 +887,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
this.profileName = profileName;
|
||||
isDefaultProfile = TransportSettings.DEFAULT_PROFILE.equals(profileName);
|
||||
tcpKeepAlive = TransportSettings.TCP_KEEP_ALIVE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
tcpKeepIdle = TransportSettings.TCP_KEEP_IDLE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
tcpKeepInterval = TransportSettings.TCP_KEEP_INTERVAL_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
tcpKeepCount = TransportSettings.TCP_KEEP_COUNT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
tcpNoDelay = TransportSettings.TCP_NO_DELAY_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
reuseAddress = TransportSettings.TCP_REUSE_ADDRESS_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
sendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
|
||||
|
|
|
@ -100,6 +100,21 @@ public final class TransportSettings {
|
|||
key -> boolSetting(key,
|
||||
fallback(key, OLD_TCP_KEEP_ALIVE_PROFILE, "tcp\\.keep_alive$", "tcp_keep_alive"),
|
||||
Setting.Property.NodeScope));
|
||||
public static final Setting<Integer> TCP_KEEP_IDLE =
|
||||
intSetting("transport.tcp.keep_idle", NetworkService.TCP_KEEP_IDLE, -1, Setting.Property.NodeScope);
|
||||
public static final Setting.AffixSetting<Integer> TCP_KEEP_IDLE_PROFILE =
|
||||
affixKeySetting("transport.profiles.", "tcp.keep_idle",
|
||||
key -> intSetting(key, TCP_KEEP_IDLE, -1, Setting.Property.NodeScope));
|
||||
public static final Setting<Integer> TCP_KEEP_INTERVAL =
|
||||
intSetting("transport.tcp.keep_interval", NetworkService.TCP_KEEP_INTERVAL, -1, Setting.Property.NodeScope);
|
||||
public static final Setting.AffixSetting<Integer> TCP_KEEP_INTERVAL_PROFILE =
|
||||
affixKeySetting("transport.profiles.", "tcp.keep_interval",
|
||||
key -> intSetting(key, TCP_KEEP_INTERVAL, -1, Setting.Property.NodeScope));
|
||||
public static final Setting<Integer> TCP_KEEP_COUNT =
|
||||
intSetting("transport.tcp.keep_count", NetworkService.TCP_KEEP_COUNT, -1, Setting.Property.NodeScope);
|
||||
public static final Setting.AffixSetting<Integer> TCP_KEEP_COUNT_PROFILE =
|
||||
affixKeySetting("transport.profiles.", "tcp.keep_count",
|
||||
key -> intSetting(key, TCP_KEEP_COUNT, -1, Setting.Property.NodeScope));
|
||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
||||
boolSetting("transport.tcp.reuse_address", NetworkService.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
|
||||
public static final Setting.AffixSetting<Boolean> OLD_TCP_REUSE_ADDRESS_PROFILE =
|
||||
|
|
|
@ -70,6 +70,14 @@ grant {
|
|||
// Allow host/ip name service lookups
|
||||
permission java.net.SocketPermission "*", "resolve";
|
||||
|
||||
// Allow reading and setting socket keepalive options
|
||||
permission jdk.net.NetworkPermission "getOption.TCP_KEEPIDLE";
|
||||
permission jdk.net.NetworkPermission "setOption.TCP_KEEPIDLE";
|
||||
permission jdk.net.NetworkPermission "getOption.TCP_KEEPINTERVAL";
|
||||
permission jdk.net.NetworkPermission "setOption.TCP_KEEPINTERVAL";
|
||||
permission jdk.net.NetworkPermission "getOption.TCP_KEEPCOUNT";
|
||||
permission jdk.net.NetworkPermission "setOption.TCP_KEEPCOUNT";
|
||||
|
||||
// Allow read access to all system properties
|
||||
permission java.util.PropertyPermission "*", "read";
|
||||
|
||||
|
|
|
@ -134,13 +134,25 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
clusterSettingsA = new ClusterSettings(Settings.EMPTY, getSupportedSettings());
|
||||
Settings connectionSettings = Settings.builder()
|
||||
final Settings.Builder connectionSettingsBuilder = Settings.builder()
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_REG.getKey(), 2)
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_PING.getKey(), 1)
|
||||
.build();
|
||||
.put(TransportSettings.CONNECTIONS_PER_NODE_PING.getKey(), 1);
|
||||
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_ALIVE.getKey(), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_IDLE.getKey(), randomIntBetween(1, 1000));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_INTERVAL.getKey(), randomIntBetween(1, 1000));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_COUNT.getKey(), randomIntBetween(1, 10));
|
||||
}
|
||||
|
||||
final Settings connectionSettings = connectionSettingsBuilder.build();
|
||||
|
||||
serviceA = buildService("TS_A", version0, clusterSettingsA, connectionSettings); // this one supports dynamic tracer updates
|
||||
nodeA = serviceA.getLocalNode();
|
||||
|
@ -2550,6 +2562,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings globalSettings = Settings.builder()
|
||||
.put("network.tcp.no_delay", enable)
|
||||
.put("network.tcp.keep_alive", enable)
|
||||
.put("network.tcp.keep_idle", "42")
|
||||
.put("network.tcp.keep_interval", "7")
|
||||
.put("network.tcp.keep_count", "13")
|
||||
.put("network.tcp.reuse_address", enable)
|
||||
.put("network.tcp.send_buffer_size", "43000b")
|
||||
.put("network.tcp.receive_buffer_size", "42000b")
|
||||
|
@ -2560,6 +2575,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings globalSettings2 = Settings.builder()
|
||||
.put("network.tcp.no_delay", !enable)
|
||||
.put("network.tcp.keep_alive", !enable)
|
||||
.put("network.tcp.keep_idle", "43")
|
||||
.put("network.tcp.keep_interval", "8")
|
||||
.put("network.tcp.keep_count", "14")
|
||||
.put("network.tcp.reuse_address", !enable)
|
||||
.put("network.tcp.send_buffer_size", "4b")
|
||||
.put("network.tcp.receive_buffer_size", "3b")
|
||||
|
@ -2570,6 +2588,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings transportSettings = Settings.builder()
|
||||
.put("transport.tcp.no_delay", enable)
|
||||
.put("transport.tcp.keep_alive", enable)
|
||||
.put("transport.tcp.keep_idle", "42")
|
||||
.put("transport.tcp.keep_interval", "7")
|
||||
.put("transport.tcp.keep_count", "13")
|
||||
.put("transport.tcp.reuse_address", enable)
|
||||
.put("transport.tcp.send_buffer_size", "43000b")
|
||||
.put("transport.tcp.receive_buffer_size", "42000b")
|
||||
|
@ -2582,6 +2603,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings transportSettings2 = Settings.builder()
|
||||
.put("transport.tcp.no_delay", !enable)
|
||||
.put("transport.tcp.keep_alive", !enable)
|
||||
.put("transport.tcp.keep_idle", "43")
|
||||
.put("transport.tcp.keep_interval", "8")
|
||||
.put("transport.tcp.keep_count", "14")
|
||||
.put("transport.tcp.reuse_address", !enable)
|
||||
.put("transport.tcp.send_buffer_size", "5b")
|
||||
.put("transport.tcp.receive_buffer_size", "6b")
|
||||
|
@ -2593,6 +2617,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings defaultProfileSettings = Settings.builder()
|
||||
.put("transport.profiles.default.tcp.no_delay", enable)
|
||||
.put("transport.profiles.default.tcp.keep_alive", enable)
|
||||
.put("transport.profiles.default.tcp.keep_idle", "42")
|
||||
.put("transport.profiles.default.tcp.keep_interval", "7")
|
||||
.put("transport.profiles.default.tcp.keep_count", "13")
|
||||
.put("transport.profiles.default.tcp.reuse_address", enable)
|
||||
.put("transport.profiles.default.tcp.send_buffer_size", "43000b")
|
||||
.put("transport.profiles.default.tcp.receive_buffer_size", "42000b")
|
||||
|
@ -2606,6 +2633,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
Settings profileSettings = Settings.builder()
|
||||
.put("transport.profiles.some_profile.tcp.no_delay", enable)
|
||||
.put("transport.profiles.some_profile.tcp.keep_alive", enable)
|
||||
.put("transport.profiles.some_profile.tcp.keep_idle", "42")
|
||||
.put("transport.profiles.some_profile.tcp.keep_interval", "7")
|
||||
.put("transport.profiles.some_profile.tcp.keep_count", "13")
|
||||
.put("transport.profiles.some_profile.tcp.reuse_address", enable)
|
||||
.put("transport.profiles.some_profile.tcp.send_buffer_size", "43000b")
|
||||
.put("transport.profiles.some_profile.tcp.receive_buffer_size", "42000b")
|
||||
|
@ -2627,6 +2657,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
assertEquals(enable, settings.tcpNoDelay);
|
||||
assertEquals(enable, settings.tcpKeepAlive);
|
||||
assertEquals(42, settings.tcpKeepIdle);
|
||||
assertEquals(7, settings.tcpKeepInterval);
|
||||
assertEquals(13, settings.tcpKeepCount);
|
||||
assertEquals(enable, settings.reuseAddress);
|
||||
assertEquals(43000, settings.sendBufferSize.getBytes());
|
||||
assertEquals(42000, settings.receiveBufferSize.getBytes());
|
||||
|
|
|
@ -198,6 +198,9 @@ public class MockNioTransport extends TcpTransport {
|
|||
private MockTcpChannelFactory(boolean isClient, ProfileSettings profileSettings, String profileName) {
|
||||
super(profileSettings.tcpNoDelay,
|
||||
profileSettings.tcpKeepAlive,
|
||||
profileSettings.tcpKeepIdle,
|
||||
profileSettings.tcpKeepInterval,
|
||||
profileSettings.tcpKeepCount,
|
||||
profileSettings.reuseAddress,
|
||||
Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
|
||||
Math.toIntExact(profileSettings.receiveBufferSize.getBytes()));
|
||||
|
|
|
@ -83,7 +83,8 @@ public class SecurityNioHttpServerTransport extends NioHttpServerTransport {
|
|||
class SecurityHttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
|
||||
|
||||
private SecurityHttpChannelFactory() {
|
||||
super(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize);
|
||||
super(tcpNoDelay, tcpKeepAlive, tcpKeepIdle, tcpKeepInterval, tcpKeepCount, reuseAddress, tcpSendBufferSize,
|
||||
tcpReceiveBufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -76,7 +76,8 @@ public class SSLChannelContextTests extends ESTestCase {
|
|||
outboundBuffer = new SSLOutboundBuffer((n) -> new Page(ByteBuffer.allocate(n), () -> {}));
|
||||
when(channel.getRawChannel()).thenReturn(rawChannel);
|
||||
exceptionHandler = mock(Consumer.class);
|
||||
socketConfig = new Config.Socket(randomBoolean(), randomBoolean(), randomBoolean(), -1, -1, mock(InetSocketAddress.class), false);
|
||||
socketConfig = new Config.Socket(randomBoolean(), randomBoolean(), -1, -1, -1, randomBoolean(), -1, -1,
|
||||
mock(InetSocketAddress.class), false);
|
||||
context = new SSLChannelContext(channel, selector, socketConfig, exceptionHandler, sslDriver, readWriteHandler, channelBuffer);
|
||||
context.setSelectionKey(mock(SelectionKey.class));
|
||||
|
||||
|
|
Loading…
Reference in New Issue