Set specific keepalive options by default on supported platforms (#59278)
keepalives tell any intermediate devices that the connection remains alive, which helps with overzealous firewalls that are killing idle connections. keepalives are enabled by default in Elasticsearch, but use system defaults for their configuration, which often times do not have reasonable defaults (e.g. 7200s for TCP_KEEP_IDLE) in the context of distributed systems such as Elasticsearch. This PR sets the socket-level keep_alive options for network.tcp.{keep_idle,keep_interval} to 5 minutes on configurations that support it (>= Java 11 & (MacOS || Linux)) and where the system defaults are set to something higher than 5 minutes. This helps keep the connections alive while not interfering with system defaults or user-specified settings unless they are deemed to be set too high by providing better out-of-the-box defaults.
This commit is contained in:
parent
981e436d6c
commit
ffe114b890
|
@ -80,31 +80,30 @@ this node connects to other nodes in the cluster.
|
|||
The following parameters can be configured on each transport profile, as in the
|
||||
example above:
|
||||
|
||||
* `port`: The port to bind to
|
||||
* `bind_host`: The host to bind
|
||||
* `publish_host`: The host which is published in informational APIs
|
||||
* `tcp.no_delay`: Configures the `TCP_NO_DELAY` option for this socket
|
||||
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket
|
||||
* `port`: The port to which to bind.
|
||||
* `bind_host`: The host to which to bind.
|
||||
* `publish_host`: The host which is published in informational APIs.
|
||||
* `tcp.no_delay`: Configures the `TCP_NO_DELAY` option for this socket.
|
||||
* `tcp.keep_alive`: Configures the `SO_KEEPALIVE` option for this socket, which
|
||||
determines whether it sends TCP keepalive probes.
|
||||
* `tcp.keep_idle`: Configures the `TCP_KEEPIDLE` option for this socket, which
|
||||
determines the time in seconds that a connection must be idle before
|
||||
starting to send TCP keepalive probes.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
starting to send TCP keepalive probes. Defaults to `-1` which means to use
|
||||
the smaller of 300 or the system default. May not be greater than 300. Only
|
||||
applicable on Linux and macOS, and requires Java 11 or newer.
|
||||
* `tcp.keep_interval`: Configures the `TCP_KEEPINTVL` option for this socket,
|
||||
which determines the time in seconds between sending TCP keepalive probes.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
Defaults to `-1` which means to use the smaller of 300 or the system
|
||||
default. May not be greater than 300. Only applicable on Linux and macOS,
|
||||
and requires Java 11 or newer.
|
||||
* `tcp.keep_count`: Configures the `TCP_KEEPCNT` option for this socket, which
|
||||
determines the number of unacknowledged TCP keepalive probes that may be
|
||||
sent on a connection before it is dropped.
|
||||
Only applicable on Linux and Mac, and requires JDK 11 or newer.
|
||||
Defaults to -1, which does not set this option at the socket level, but
|
||||
uses default system configuration instead.
|
||||
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket
|
||||
* `tcp.send_buffer_size`: Configures the send buffer size of the socket
|
||||
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket
|
||||
sent on a connection before it is dropped. Defaults to `-1` which means to
|
||||
use the system default. Only applicable on Linux and macOS, and requires
|
||||
Java 11 or newer.
|
||||
* `tcp.reuse_address`: Configures the `SO_REUSEADDR` option for this socket.
|
||||
* `tcp.send_buffer_size`: Configures the send buffer size of the socket.
|
||||
* `tcp.receive_buffer_size`: Configures the receive buffer size of the socket.
|
||||
|
||||
[[long-lived-connections]]
|
||||
===== Long-lived idle connections
|
||||
|
|
|
@ -19,8 +19,12 @@
|
|||
|
||||
package org.elasticsearch.core.internal.net;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.SocketOption;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.channels.NetworkChannel;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Utilities for network-related methods.
|
||||
|
@ -59,4 +63,47 @@ public class NetUtils {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If SO_KEEPALIVE is enabled (default), this method ensures sane default values for the extended socket options
|
||||
* TCP_KEEPIDLE and TCP_KEEPINTERVAL. The default value for TCP_KEEPIDLE is system dependent, but is typically 2 hours.
|
||||
* Such a high value can result in firewalls eagerly closing these connections. To tell any intermediate devices that
|
||||
* the connection remains alive, we explicitly set these options to 5 minutes if the defaults are higher than that.
|
||||
*/
|
||||
public static void tryEnsureReasonableKeepAliveConfig(NetworkChannel socketChannel) {
|
||||
assert socketChannel != null;
|
||||
try {
|
||||
if (socketChannel.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE)) {
|
||||
final Boolean keepalive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
|
||||
assert keepalive != null;
|
||||
if (keepalive.booleanValue()) {
|
||||
for (SocketOption<Integer> option : Arrays.asList(
|
||||
NetUtils.getTcpKeepIdleSocketOptionOrNull(),
|
||||
NetUtils.getTcpKeepIntervalSocketOptionOrNull())) {
|
||||
setMinValueForSocketOption(socketChannel, option, 300);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Getting an exception here should be ok when concurrently closing the channel
|
||||
// An UnsupportedOperationException or IllegalArgumentException, however, should not happen
|
||||
assert e instanceof IOException : e;
|
||||
}
|
||||
}
|
||||
|
||||
private static void setMinValueForSocketOption(NetworkChannel socketChannel, SocketOption<Integer> option, int minValue) {
|
||||
if (option != null && socketChannel.supportedOptions().contains(option)) {
|
||||
try {
|
||||
final Integer currentIdleVal = socketChannel.getOption(option);
|
||||
assert currentIdleVal != null;
|
||||
if (currentIdleVal.intValue() > minValue) {
|
||||
socketChannel.setOption(option, minValue);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Getting an exception here should be ok when concurrently closing the channel
|
||||
// An UnsupportedOperationException or IllegalArgumentException, however, should not happen
|
||||
assert e instanceof IOException : e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -350,6 +350,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
|
|||
}
|
||||
}
|
||||
}
|
||||
NetUtils.tryEnsureReasonableKeepAliveConfig(socket.getChannel());
|
||||
socket.setTcpNoDelay(socketConfig.tcpNoDelay());
|
||||
int tcpSendBufferSize = socketConfig.tcpSendBufferSize();
|
||||
if (tcpSendBufferSize > 0) {
|
||||
|
|
|
@ -59,7 +59,7 @@ import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRI
|
|||
* local buffer with a defined size.
|
||||
*/
|
||||
@SuppressForbidden(reason = "Channel#write")
|
||||
public class CopyBytesSocketChannel extends NioSocketChannel {
|
||||
public class CopyBytesSocketChannel extends Netty4NioSocketChannel {
|
||||
|
||||
private static final int MAX_BYTES_PER_WRITE = StrictMath.toIntExact(ByteSizeValue.parseBytesSizeValue(
|
||||
System.getProperty("es.transport.buffer.size", "1m"), "es.transport.buffer.size").getBytes());
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
/**
|
||||
* Helper class to expose {@link #javaChannel()} method
|
||||
*/
|
||||
public class Netty4NioSocketChannel extends NioSocketChannel {
|
||||
|
||||
public Netty4NioSocketChannel() {
|
||||
super();
|
||||
}
|
||||
|
||||
public Netty4NioSocketChannel(Channel parent, SocketChannel socket) {
|
||||
super(parent, socket);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketChannel javaChannel() {
|
||||
return super.javaChannel();
|
||||
}
|
||||
|
||||
}
|
|
@ -27,7 +27,6 @@ import io.netty.buffer.UnpooledByteBufAllocator;
|
|||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
|
||||
|
@ -68,7 +67,7 @@ public class NettyAllocator {
|
|||
if (ALLOCATOR instanceof NoDirectBuffers) {
|
||||
return CopyBytesSocketChannel.class;
|
||||
} else {
|
||||
return NioSocketChannel.class;
|
||||
return Netty4NioSocketChannel.class;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,12 +49,12 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.SharedGroupFactory;
|
||||
import org.elasticsearch.transport.Netty4NioSocketChannel;
|
||||
import org.elasticsearch.transport.NettyAllocator;
|
||||
import org.elasticsearch.transport.SharedGroupFactory;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
|
@ -143,31 +143,30 @@ public class Netty4Transport extends TcpTransport {
|
|||
bootstrap.group(sharedGroup.getLowLevelGroup());
|
||||
|
||||
// NettyAllocator will return the channel type designed to work with the configured allocator
|
||||
assert Netty4NioSocketChannel.class.isAssignableFrom(NettyAllocator.getChannelType());
|
||||
bootstrap.channel(NettyAllocator.getChannelType());
|
||||
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
|
||||
|
||||
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
|
||||
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
|
||||
if (TransportSettings.TCP_KEEP_ALIVE.get(settings)) {
|
||||
// Netty logs a warning if it can't set the option, so try this only on supported platforms
|
||||
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
|
||||
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
|
||||
}
|
||||
// Note that Netty logs a warning if it can't set the option
|
||||
if (TransportSettings.TCP_KEEP_IDLE.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIdleOption), TransportSettings.TCP_KEEP_IDLE.get(settings));
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
|
||||
}
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_INTERVAL.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepIntervalOption), TransportSettings.TCP_KEEP_INTERVAL.get(settings));
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
|
||||
}
|
||||
}
|
||||
if (TransportSettings.TCP_KEEP_COUNT.get(settings) >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
bootstrap.option(NioChannelOption.of(keepCountOption), TransportSettings.TCP_KEEP_COUNT.get(settings));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -215,26 +214,24 @@ public class Netty4Transport extends TcpTransport {
|
|||
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
|
||||
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
|
||||
if (profileSettings.tcpKeepAlive) {
|
||||
// Netty logs a warning if it can't set the option, so try this only on supported platforms
|
||||
if (IOUtils.LINUX || IOUtils.MAC_OS_X) {
|
||||
if (profileSettings.tcpKeepIdle >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
|
||||
}
|
||||
// Note that Netty logs a warning if it can't set the option
|
||||
if (profileSettings.tcpKeepIdle >= 0) {
|
||||
final SocketOption<Integer> keepIdleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
|
||||
if (keepIdleOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIdleOption), profileSettings.tcpKeepIdle);
|
||||
}
|
||||
}
|
||||
if (profileSettings.tcpKeepInterval >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
|
||||
}
|
||||
if (profileSettings.tcpKeepInterval >= 0) {
|
||||
final SocketOption<Integer> keepIntervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
|
||||
if (keepIntervalOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepIntervalOption), profileSettings.tcpKeepInterval);
|
||||
}
|
||||
|
||||
}
|
||||
if (profileSettings.tcpKeepCount >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
|
||||
}
|
||||
}
|
||||
if (profileSettings.tcpKeepCount >= 0) {
|
||||
final SocketOption<Integer> keepCountOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
|
||||
if (keepCountOption != null) {
|
||||
serverBootstrap.childOption(NioChannelOption.of(keepCountOption), profileSettings.tcpKeepCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -281,7 +278,6 @@ public class Netty4Transport extends TcpTransport {
|
|||
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
|
||||
throw new IOException(connectFuture.cause());
|
||||
}
|
||||
addClosedExceptionLogger(channel);
|
||||
|
||||
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, false, "default", connectFuture);
|
||||
channel.attr(CHANNEL_KEY).set(nettyChannel);
|
||||
|
@ -311,6 +307,9 @@ public class Netty4Transport extends TcpTransport {
|
|||
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
addClosedExceptionLogger(ch);
|
||||
assert ch instanceof Netty4NioSocketChannel;
|
||||
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
|
||||
ch.pipeline().addLast("logging", new ESLoggingHandler());
|
||||
// using a dot as a prefix means this cannot come from any settings parsed
|
||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
|
||||
|
@ -334,6 +333,8 @@ public class Netty4Transport extends TcpTransport {
|
|||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
addClosedExceptionLogger(ch);
|
||||
assert ch instanceof Netty4NioSocketChannel;
|
||||
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
|
||||
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
|
||||
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
|
||||
ch.pipeline().addLast("logging", new ESLoggingHandler());
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty4;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.bootstrap.JavaVersion;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -28,21 +29,33 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.test.transport.StubbableTransport;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.Netty4NioSocketChannel;
|
||||
import org.elasticsearch.transport.SharedGroupFactory;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TestProfiles;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
|
||||
|
||||
|
@ -75,4 +88,50 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
|
|||
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDefaultKeepAliveSettings() throws IOException {
|
||||
assumeTrue("setting default keepalive options not supported on this platform",
|
||||
(IOUtils.LINUX || IOUtils.MAC_OS_X) &&
|
||||
JavaVersion.current().compareTo(JavaVersion.parse("11")) >= 0);
|
||||
try (MockTransportService serviceC = buildService("TS_C", Version.CURRENT, Settings.EMPTY);
|
||||
MockTransportService serviceD = buildService("TS_D", Version.CURRENT, Settings.EMPTY)) {
|
||||
serviceC.start();
|
||||
serviceC.acceptIncomingRequests();
|
||||
serviceD.start();
|
||||
serviceD.acceptIncomingRequests();
|
||||
|
||||
try (Transport.Connection connection = serviceC.openConnection(serviceD.getLocalDiscoNode(), TestProfiles.LIGHT_PROFILE)) {
|
||||
assertThat(connection, instanceOf(StubbableTransport.WrappedConnection.class));
|
||||
Transport.Connection conn = ((StubbableTransport.WrappedConnection) connection).getConnection();
|
||||
assertThat(conn, instanceOf(TcpTransport.NodeChannels.class));
|
||||
TcpTransport.NodeChannels nodeChannels = (TcpTransport.NodeChannels) conn;
|
||||
for (TcpChannel channel : nodeChannels.getChannels()) {
|
||||
assertFalse(channel.isServerChannel());
|
||||
checkDefaultKeepAliveOptions(channel);
|
||||
}
|
||||
|
||||
assertThat(serviceD.getOriginalTransport(), instanceOf(TcpTransport.class));
|
||||
for (TcpChannel channel : getAcceptedChannels((TcpTransport) serviceD.getOriginalTransport())) {
|
||||
assertTrue(channel.isServerChannel());
|
||||
checkDefaultKeepAliveOptions(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkDefaultKeepAliveOptions(TcpChannel channel) throws IOException {
|
||||
assertThat(channel, instanceOf(Netty4TcpChannel.class));
|
||||
Netty4TcpChannel nettyChannel = (Netty4TcpChannel) channel;
|
||||
assertThat(nettyChannel.getNettyChannel(), instanceOf(Netty4NioSocketChannel.class));
|
||||
Netty4NioSocketChannel netty4NioSocketChannel = (Netty4NioSocketChannel) nettyChannel.getNettyChannel();
|
||||
SocketChannel socketChannel = netty4NioSocketChannel.javaChannel();
|
||||
assertThat(socketChannel.supportedOptions(), hasItem(NetUtils.getTcpKeepIdleSocketOptionOrNull()));
|
||||
Integer keepIdle = socketChannel.getOption(NetUtils.getTcpKeepIdleSocketOptionOrNull());
|
||||
assertNotNull(keepIdle);
|
||||
assertThat(keepIdle, lessThanOrEqualTo(500));
|
||||
assertThat(socketChannel.supportedOptions(), hasItem(NetUtils.getTcpKeepIntervalSocketOptionOrNull()));
|
||||
Integer keepInterval = socketChannel.getOption(NetUtils.getTcpKeepIntervalSocketOptionOrNull());
|
||||
assertNotNull(keepInterval);
|
||||
assertThat(keepInterval, lessThanOrEqualTo(500));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.bootstrap.JavaVersion;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
|
@ -28,22 +29,31 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.core.internal.net.NetUtils;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.test.transport.StubbableTransport;
|
||||
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.elasticsearch.transport.TestProfiles;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
||||
|
||||
|
@ -78,4 +88,48 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
|
|||
assertThat(cause, instanceOf(IOException.class));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDefaultKeepAliveSettings() throws IOException {
|
||||
assumeTrue("setting default keepalive options not supported on this platform",
|
||||
(IOUtils.LINUX || IOUtils.MAC_OS_X) &&
|
||||
JavaVersion.current().compareTo(JavaVersion.parse("11")) >= 0);
|
||||
try (MockTransportService serviceC = buildService("TS_C", Version.CURRENT, Settings.EMPTY);
|
||||
MockTransportService serviceD = buildService("TS_D", Version.CURRENT, Settings.EMPTY)) {
|
||||
serviceC.start();
|
||||
serviceC.acceptIncomingRequests();
|
||||
serviceD.start();
|
||||
serviceD.acceptIncomingRequests();
|
||||
|
||||
try (Transport.Connection connection = serviceC.openConnection(serviceD.getLocalDiscoNode(), TestProfiles.LIGHT_PROFILE)) {
|
||||
assertThat(connection, instanceOf(StubbableTransport.WrappedConnection.class));
|
||||
Transport.Connection conn = ((StubbableTransport.WrappedConnection) connection).getConnection();
|
||||
assertThat(conn, instanceOf(TcpTransport.NodeChannels.class));
|
||||
TcpTransport.NodeChannels nodeChannels = (TcpTransport.NodeChannels) conn;
|
||||
for (TcpChannel channel : nodeChannels.getChannels()) {
|
||||
assertFalse(channel.isServerChannel());
|
||||
checkDefaultKeepAliveOptions(channel);
|
||||
}
|
||||
|
||||
assertThat(serviceD.getOriginalTransport(), instanceOf(TcpTransport.class));
|
||||
for (TcpChannel channel : getAcceptedChannels((TcpTransport) serviceD.getOriginalTransport())) {
|
||||
assertTrue(channel.isServerChannel());
|
||||
checkDefaultKeepAliveOptions(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkDefaultKeepAliveOptions(TcpChannel channel) throws IOException {
|
||||
assertThat(channel, instanceOf(NioTcpChannel.class));
|
||||
NioTcpChannel nioChannel = (NioTcpChannel) channel;
|
||||
SocketChannel socketChannel = nioChannel.getRawChannel();
|
||||
assertThat(socketChannel.supportedOptions(), hasItem(NetUtils.getTcpKeepIdleSocketOptionOrNull()));
|
||||
Integer keepIdle = socketChannel.getOption(NetUtils.getTcpKeepIdleSocketOptionOrNull());
|
||||
assertNotNull(keepIdle);
|
||||
assertThat(keepIdle, lessThanOrEqualTo(500));
|
||||
assertThat(socketChannel.supportedOptions(), hasItem(NetUtils.getTcpKeepIntervalSocketOptionOrNull()));
|
||||
Integer keepInterval = socketChannel.getOption(NetUtils.getTcpKeepIntervalSocketOptionOrNull());
|
||||
assertNotNull(keepInterval);
|
||||
assertThat(keepInterval, lessThanOrEqualTo(500));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,9 +53,9 @@ public final class NetworkService {
|
|||
public static final Setting<Boolean> TCP_KEEP_ALIVE =
|
||||
Setting.boolSetting("network.tcp.keep_alive", true, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_IDLE =
|
||||
Setting.intSetting("network.tcp.keep_idle", -1, -1, Property.NodeScope);
|
||||
Setting.intSetting("network.tcp.keep_idle", -1, -1, 300, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_INTERVAL =
|
||||
Setting.intSetting("network.tcp.keep_interval", -1, -1, Property.NodeScope);
|
||||
Setting.intSetting("network.tcp.keep_interval", -1, -1, 300, Property.NodeScope);
|
||||
public static final Setting<Integer> TCP_KEEP_COUNT =
|
||||
Setting.intSetting("network.tcp.keep_count", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
|
||||
|
|
|
@ -147,10 +147,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_ALIVE.getKey(), randomBoolean());
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_IDLE.getKey(), randomIntBetween(1, 1000));
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_IDLE.getKey(), randomIntBetween(1, 300));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_INTERVAL.getKey(), randomIntBetween(1, 1000));
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_INTERVAL.getKey(), randomIntBetween(1, 300));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
connectionSettingsBuilder.put(TransportSettings.TCP_KEEP_COUNT.getKey(), randomIntBetween(1, 10));
|
||||
|
|
Loading…
Reference in New Issue