Add read timeouts to http module (#27713)
We currently do not have any server-side read timeouts implemented in elasticsearch. This commit adds a read timeout setting that defaults to 30 seconds. If after 30 seconds a read has not occurred, the channel will be closed. A timeout of value of 0 will disable the timeout.
This commit is contained in:
parent
ec5e540174
commit
ad8a571677
|
@ -26,8 +26,10 @@ import org.elasticsearch.common.settings.Setting.Property;
|
||||||
import org.elasticsearch.common.transport.PortsRange;
|
import org.elasticsearch.common.transport.PortsRange;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
|
@ -93,6 +95,9 @@ public final class HttpTransportSettings {
|
||||||
public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =
|
public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES =
|
||||||
Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);
|
Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);
|
||||||
|
|
||||||
|
public static final Setting<TimeValue> SETTING_HTTP_READ_TIMEOUT =
|
||||||
|
Setting.timeSetting("http.read_timeout", new TimeValue(30, TimeUnit.SECONDS), new TimeValue(0), Property.NodeScope);
|
||||||
|
|
||||||
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
|
public static final Setting<Boolean> SETTING_HTTP_TCP_NO_DELAY =
|
||||||
boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
|
boolSetting("http.tcp_no_delay", NetworkService.TCP_NO_DELAY, Setting.Property.NodeScope);
|
||||||
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
|
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
|
||||||
|
|
|
@ -40,6 +40,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
import io.netty.handler.timeout.ReadTimeoutException;
|
import io.netty.handler.timeout.ReadTimeoutException;
|
||||||
|
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
@ -86,7 +87,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
|
||||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CREDENTIALS;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_HEADERS;
|
||||||
|
@ -105,6 +105,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INIT
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
|
||||||
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
|
||||||
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
|
||||||
|
@ -172,6 +173,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
protected final ByteSizeValue tcpSendBufferSize;
|
protected final ByteSizeValue tcpSendBufferSize;
|
||||||
protected final ByteSizeValue tcpReceiveBufferSize;
|
protected final ByteSizeValue tcpReceiveBufferSize;
|
||||||
protected final RecvByteBufAllocator recvByteBufAllocator;
|
protected final RecvByteBufAllocator recvByteBufAllocator;
|
||||||
|
private final int readTimeoutMillis;
|
||||||
|
|
||||||
protected final int maxCompositeBufferComponents;
|
protected final int maxCompositeBufferComponents;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
|
@ -220,6 +222,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
|
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
|
||||||
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
|
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
|
||||||
this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings);
|
this.detailedErrorsEnabled = SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings);
|
||||||
|
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
|
||||||
|
|
||||||
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
|
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
|
||||||
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
|
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
|
||||||
|
@ -480,7 +483,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
if (cause instanceof ReadTimeoutException) {
|
if (cause instanceof ReadTimeoutException) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Connection timeout [{}]", ctx.channel().remoteAddress());
|
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
|
||||||
}
|
}
|
||||||
ctx.channel().close();
|
ctx.channel().close();
|
||||||
} else {
|
} else {
|
||||||
|
@ -524,6 +527,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
||||||
@Override
|
@Override
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
protected void initChannel(Channel ch) throws Exception {
|
||||||
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
|
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
|
||||||
|
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
|
||||||
final HttpRequestDecoder decoder = new HttpRequestDecoder(
|
final HttpRequestDecoder decoder = new HttpRequestDecoder(
|
||||||
Math.toIntExact(transport.maxInitialLineLength.getBytes()),
|
Math.toIntExact(transport.maxInitialLineLength.getBytes()),
|
||||||
Math.toIntExact(transport.maxHeaderSize.getBytes()),
|
Math.toIntExact(transport.maxHeaderSize.getBytes()),
|
||||||
|
|
|
@ -19,8 +19,15 @@
|
||||||
|
|
||||||
package org.elasticsearch.http.netty4;
|
package org.elasticsearch.http.netty4;
|
||||||
|
|
||||||
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.ByteBufUtil;
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import io.netty.handler.codec.TooLongFrameException;
|
import io.netty.handler.codec.TooLongFrameException;
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
|
@ -39,6 +46,7 @@ import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.MockBigArrays;
|
import org.elasticsearch.common.util.MockBigArrays;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.http.BindHttpException;
|
import org.elasticsearch.http.BindHttpException;
|
||||||
|
@ -63,6 +71,8 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -313,4 +323,53 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
|
||||||
assertNull(threadPool.getThreadContext().getTransient("bar_bad"));
|
assertNull(threadPool.getThreadContext().getTransient("bar_bad"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReadTimeout() throws Exception {
|
||||||
|
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
|
||||||
|
throw new AssertionError("Should not have received a dispatched request");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispatchBadRequest(final RestRequest request,
|
||||||
|
final RestChannel channel,
|
||||||
|
final ThreadContext threadContext,
|
||||||
|
final Throwable cause) {
|
||||||
|
throw new AssertionError("Should not have received a dispatched request");
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
NioEventLoopGroup group = new NioEventLoopGroup();
|
||||||
|
try (Netty4HttpServerTransport transport =
|
||||||
|
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
|
||||||
|
transport.start();
|
||||||
|
final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses());
|
||||||
|
|
||||||
|
AtomicBoolean channelClosed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ch.pipeline().addLast(new ChannelHandlerAdapter() {});
|
||||||
|
|
||||||
|
}
|
||||||
|
}).group(group);
|
||||||
|
ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
|
||||||
|
connect.channel().closeFuture().addListener(future -> channelClosed.set(true));
|
||||||
|
|
||||||
|
assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
group.shutdownGracefully().await();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue