diff --git a/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java b/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java index 54be8b4ecd7..cf9bd3a42ee 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java +++ b/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.transport.PortsRange; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -93,6 +95,9 @@ public final class HttpTransportSettings { public static final Setting SETTING_HTTP_RESET_COOKIES = Setting.boolSetting("http.reset_cookies", false, Property.NodeScope); + public static final Setting SETTING_HTTP_READ_TIMEOUT = + Setting.timeSetting("http.read_timeout", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(0), Property.NodeScope); + public static final Setting SETTING_HTTP_TCP_NO_DELAY = boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope); public static final Setting SETTING_HTTP_TCP_KEEP_ALIVE = diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 893efeb6957..31b32a8ab94 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -40,6 +40,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.common.Strings; @@ -86,7 +87,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS; @@ -105,6 +105,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INIT import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT; +import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY; @@ -172,6 +173,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem protected final ByteSizeValue tcpSendBufferSize; protected final ByteSizeValue tcpReceiveBufferSize; protected final RecvByteBufAllocator recvByteBufAllocator; + private final int readTimeoutMillis; protected final int maxCompositeBufferComponents; private final Dispatcher dispatcher; @@ -220,6 +222,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings); this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings); this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings); + this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis()); ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt()); @@ -480,7 +483,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { if (logger.isTraceEnabled()) { - logger.trace("Connection timeout [{}]", ctx.channel().remoteAddress()); + logger.trace("Read timeout [{}]", ctx.channel().remoteAddress()); } ctx.channel().close(); } else { @@ -524,6 +527,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("openChannels", transport.serverOpenChannels); + ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)); final HttpRequestDecoder decoder = new HttpRequestDecoder( Math.toIntExact(transport.maxInitialLineLength.getBytes()), Math.toIntExact(transport.maxHeaderSize.getBytes()), diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 846c59565c2..c347bd2805d 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -19,8 +19,15 @@ package org.elasticsearch.http.netty4; +import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; @@ -39,6 +46,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.BindHttpException; @@ -63,6 +71,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -313,4 +323,53 @@ public class Netty4HttpServerTransportTests extends ESTestCase { assertNull(threadPool.getThreadContext().getTransient("bar_bad")); } } + + public void testReadTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestRequest request, + final RestChannel channel, + final ThreadContext threadContext, + final Throwable cause) { + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = Settings.builder() + .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300))) + .build(); + + + NioEventLoopGroup group = new NioEventLoopGroup(); + try (Netty4HttpServerTransport transport = + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses()); + + AtomicBoolean channelClosed = new AtomicBoolean(false); + + Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new ChannelHandlerAdapter() {}); + + } + }).group(group); + ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); + connect.channel().closeFuture().addListener(future -> channelClosed.set(true)); + + assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS); + + } finally { + group.shutdownGracefully().await(); + } + } }