Support http read timeouts for transport-nio (#41466)

This is related to #27260. Currently there is a setting
http.read_timeout that allows users to define a read timeout for the
http transport. This commit implements support for this functionality
with the transport-nio plugin. The behavior here is that a repeating
task will be scheduled for the interval defined. If there have been
no requests received since the last run and there are no inflight
requests, the channel will be closed.
This commit is contained in:
Tim Brooks 2019-05-02 09:48:52 -06:00 committed by GitHub
parent a92c06ae09
commit b4bcbf9f64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 263 additions and 107 deletions

View File

@ -34,6 +34,9 @@ public abstract class BytesWriteHandler implements ReadWriteHandler {
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);
}
@Override
public void channelRegistered() {}
@Override
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
assert writeOperation instanceof FlushReadyWrite : "Write operation must be flush ready";

View File

@ -28,6 +28,11 @@ import java.util.function.BiConsumer;
*/
public interface ReadWriteHandler {
/**
* This method is called when the channel is registered with its selector.
*/
void channelRegistered();
/**
* This method is called when a message is queued with a channel. It can be called from any thread.
* This method should validate that the message is a valid type and return a write operation object

View File

@ -169,6 +169,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
@Override
protected void register() throws IOException {
super.register();
readWriteHandler.channelRegistered();
if (allowChannelPredicate.test(channel) == false) {
closeNow = true;
}

View File

@ -45,7 +45,7 @@ public class TaskScheduler {
return delayedTask;
}
Runnable pollTask(long relativeNanos) {
public Runnable pollTask(long relativeNanos) {
DelayedTask task;
while ((task = tasks.peek()) != null) {
if (relativeNanos - task.deadline >= 0) {

View File

@ -45,7 +45,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -59,6 +58,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
@ -289,12 +289,9 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected void onException(HttpChannel channel, Exception cause) {
public void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Http read timeout {}", channel);
}
CloseableChannel.closeChannel(channel);
super.onException(channel, new HttpReadTimeoutException(readTimeoutMillis, cause));
} else {
super.onException(channel, cause);
}

View File

@ -73,8 +73,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
@ -346,7 +346,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
AtomicBoolean channelClosed = new AtomicBoolean(false);
CountDownLatch channelClosedLatch = new CountDownLatch(1);
Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@ -357,9 +357,9 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
}
}).group(group);
ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
connect.channel().closeFuture().addListener(future -> channelClosed.set(true));
connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown());
assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS);
assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES));
} finally {
group.shutdownGracefully().await();

View File

@ -30,31 +30,45 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsHandler;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.TaskScheduler;
import org.elasticsearch.nio.WriteOperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
public class HttpReadWriteHandler implements ReadWriteHandler {
private final NettyAdaptor adaptor;
private final NioHttpChannel nioHttpChannel;
private final NioHttpServerTransport transport;
private final TaskScheduler taskScheduler;
private final LongSupplier nanoClock;
private final long readTimeoutNanos;
private boolean channelRegistered = false;
private boolean requestSinceReadTimeoutTrigger = false;
private int inFlightRequests = 0;
public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings,
NioCorsConfig corsConfig) {
NioCorsConfig corsConfig, TaskScheduler taskScheduler, LongSupplier nanoClock) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;
this.taskScheduler = taskScheduler;
this.nanoClock = nanoClock;
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());
List<ChannelHandler> handlers = new ArrayList<>(5);
HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(),
@ -77,10 +91,21 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
}
@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
public void channelRegistered() {
channelRegistered = true;
if (readTimeoutNanos > 0) {
scheduleReadTimeout();
}
}
@Override
public int consumeReads(InboundChannelBuffer channelBuffer) {
assert channelRegistered : "channelRegistered should have been called";
int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex()));
Object message;
while ((message = adaptor.pollInboundMessage()) != null) {
++inFlightRequests;
requestSinceReadTimeoutTrigger = true;
handleRequest(message);
}
@ -96,6 +121,11 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
@Override
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
assert writeOperation.getObject() instanceof NioHttpResponse : "This channel only supports messages that are of type: "
+ NioHttpResponse.class + ". Found type: " + writeOperation.getObject().getClass() + ".";
assert channelRegistered : "channelRegistered should have been called";
--inFlightRequests;
assert inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + inFlightRequests;
adaptor.write(writeOperation);
return pollFlushOperations();
}
@ -152,4 +182,17 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
request.release();
}
}
private void maybeReadTimeout() {
if (requestSinceReadTimeoutTrigger == false && inFlightRequests == 0) {
transport.onException(nioHttpChannel, new HttpReadTimeoutException(TimeValue.nsecToMSec(readTimeoutNanos)));
} else {
requestSinceReadTimeoutTrigger = false;
scheduleReadTimeout();
}
}
private void scheduleReadTimeout() {
taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, nanoClock.getAsLong() + readTimeoutNanos);
}
}

View File

@ -211,7 +211,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
return new Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this,
handlingSettings, corsConfig);
handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInMillis);
Consumer<Exception> exceptionHandler = (e) -> onException(httpChannel, e);
SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline,
new InboundChannelBuffer(pageSupplier));

View File

@ -33,12 +33,13 @@ import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.http.HttpTransportSettings;
@ -48,6 +49,7 @@ import org.elasticsearch.http.nio.cors.NioCorsHandler;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.TaskScheduler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
@ -56,6 +58,8 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
@ -63,19 +67,14 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_CR
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_METHODS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -84,8 +83,9 @@ import static org.mockito.Mockito.verify;
public class HttpReadWriteHandlerTests extends ESTestCase {
private HttpReadWriteHandler handler;
private NioHttpChannel nioHttpChannel;
private NioHttpChannel channel;
private NioHttpServerTransport transport;
private TaskScheduler taskScheduler;
private final RequestEncoder requestEncoder = new RequestEncoder();
private final ResponseDecoder responseDecoder = new ResponseDecoder();
@ -93,22 +93,14 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
@Before
public void setMocks() {
transport = mock(NioHttpServerTransport.class);
Settings settings = Settings.EMPTY;
ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.getDefault(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.getDefault(settings);
ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.getDefault(settings);
HttpHandlingSettings httpHandlingSettings = new HttpHandlingSettings(1024,
Math.toIntExact(maxChunkSize.getBytes()),
Math.toIntExact(maxHeaderSize.getBytes()),
Math.toIntExact(maxInitialLineLength.getBytes()),
SETTING_HTTP_RESET_COOKIES.getDefault(settings),
SETTING_HTTP_COMPRESSION.getDefault(settings),
SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings),
SETTING_PIPELINING_MAX_EVENTS.getDefault(settings),
SETTING_CORS_ENABLED.getDefault(settings));
nioHttpChannel = mock(NioHttpChannel.class);
handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, NioCorsConfigBuilder.forAnyOrigin().build());
Settings settings = Settings.builder().put(SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(1024)).build();
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings);
channel = mock(NioHttpChannel.class);
taskScheduler = mock(TaskScheduler.class);
NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build();
handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, System::nanoTime);
handler.channelRegistered();
}
public void testSuccessfulDecodeHttpRequest() throws IOException {
@ -188,7 +180,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
flushOperation.getListener().accept(null, null);
// Since we have keep-alive set to false, we should close the channel after the response has been
// flushed
verify(nioHttpChannel).close();
verify(channel).close();
} finally {
response.release();
}
@ -335,10 +327,59 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
}
}
private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException {
@SuppressWarnings("unchecked")
public void testReadTimeout() throws IOException {
TimeValue timeValue = TimeValue.timeValueMillis(500);
Settings settings = Settings.builder().put(SETTING_HTTP_READ_TIMEOUT.getKey(), timeValue).build();
HttpHandlingSettings httpHandlingSettings = HttpHandlingSettings.fromSettings(settings);
NioCorsConfig nioCorsConfig = NioHttpServerTransport.buildCorsConfig(settings);
HttpReadWriteHandler handler = new HttpReadWriteHandler(nioHttpChannel, transport, httpHandlingSettings, nioCorsConfig);
DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
NioHttpRequest nioHttpRequest = new NioHttpRequest(nettyRequest, 0);
NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
httpResponse.addHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), "0");
NioCorsConfig corsConfig = NioCorsConfigBuilder.forAnyOrigin().build();
TaskScheduler taskScheduler = new TaskScheduler();
Iterator<Integer> timeValues = Arrays.asList(0, 2, 4, 6, 8).iterator();
handler = new HttpReadWriteHandler(channel, transport, httpHandlingSettings, corsConfig, taskScheduler, timeValues::next);
handler.channelRegistered();
prepareHandlerForResponse(handler);
SocketChannelContext context = mock(SocketChannelContext.class);
HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class));
handler.writeToBytes(writeOperation);
taskScheduler.pollTask(timeValue.getNanos() + 1).run();
// There was a read. Do not close.
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
prepareHandlerForResponse(handler);
prepareHandlerForResponse(handler);
taskScheduler.pollTask(timeValue.getNanos() + 3).run();
// There was a read. Do not close.
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
handler.writeToBytes(writeOperation);
taskScheduler.pollTask(timeValue.getNanos() + 5).run();
// There has not been a read, however there is still an inflight request. Do not close.
verify(transport, times(0)).onException(eq(channel), any(HttpReadTimeoutException.class));
handler.writeToBytes(writeOperation);
taskScheduler.pollTask(timeValue.getNanos() + 7).run();
// No reads and no inflight requests, close
verify(transport, times(1)).onException(eq(channel), any(HttpReadTimeoutException.class));
assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9));
}
private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException {
HttpHandlingSettings httpSettings = HttpHandlingSettings.fromSettings(settings);
NioCorsConfig corsConfig = NioHttpServerTransport.buildCorsConfig(settings);
HttpReadWriteHandler handler = new HttpReadWriteHandler(channel, transport, httpSettings, corsConfig, taskScheduler,
System::nanoTime);
handler.channelRegistered();
prepareHandlerForResponse(handler);
DefaultFullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
if (originValue != null) {
@ -360,7 +401,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException {
private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOException {
HttpMethod method = randomBoolean() ? HttpMethod.GET : HttpMethod.HEAD;
HttpVersion version = randomBoolean() ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
String uri = "http://localhost:9090/" + randomAlphaOfLength(8);
@ -385,7 +426,6 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
assertEquals(HttpRequest.HttpVersion.HTTP_1_0, nioHttpRequest.protocolVersion());
}
assertEquals(nioHttpRequest.uri(), uri);
return nioHttpRequest;
}
private InboundChannelBuffer toChannelBuffer(ByteBuf buf) {

View File

@ -116,6 +116,20 @@ class NioHttpClient implements Closeable {
return responses.iterator().next();
}
public final NioSocketChannel connect(InetSocketAddress remoteAddress) {
ChannelFactory<NioServerSocketChannel, NioSocketChannel> factory = new ClientChannelFactory(new CountDownLatch(0), new
ArrayList<>());
try {
NioSocketChannel nioSocketChannel = nioGroup.openChannel(remoteAddress, factory);
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
nioSocketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture));
connectFuture.actionGet();
return nioSocketChannel;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private void onException(Exception e) {
logger.error("Exception from http client", e);
}
@ -212,6 +226,9 @@ class NioHttpClient implements Closeable {
adaptor.addCloseListener((v, e) -> channel.close());
}
@Override
public void channelRegistered() {}
@Override
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
assert message instanceof HttpRequest : "Expected type HttpRequest.class, found: " + message.getClass();

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -49,6 +50,7 @@ import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
@ -66,6 +68,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
@ -309,52 +313,47 @@ public class NioHttpServerTransportTests extends ESTestCase {
assertThat(causeReference.get(), instanceOf(TooLongFrameException.class));
}
// public void testReadTimeout() throws Exception {
// final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
//
// @Override
// public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
// throw new AssertionError("Should not have received a dispatched request");
// }
//
// @Override
// public void dispatchBadRequest(final RestRequest request,
// final RestChannel channel,
// final ThreadContext threadContext,
// final Throwable cause) {
// throw new AssertionError("Should not have received a dispatched request");
// }
//
// };
//
// Settings settings = Settings.builder()
// .put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)))
// .build();
//
//
// NioEventLoopGroup group = new NioEventLoopGroup();
// try (NioHttpServerTransport transport =
// new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
// transport.start();
// final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
//
// AtomicBoolean channelClosed = new AtomicBoolean(false);
//
// Bootstrap clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
//
// @Override
// protected void initChannel(SocketChannel ch) {
// ch.pipeline().addLast(new ChannelHandlerAdapter() {});
//
// }
// }).group(group);
// ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
// connect.channel().closeFuture().addListener(future -> channelClosed.set(true));
//
// assertBusy(() -> assertTrue("Channel should be closed due to read timeout", channelClosed.get()), 5, TimeUnit.SECONDS);
//
// } finally {
// group.shutdownGracefully().await();
// }
// }
public void testReadTimeout() throws Exception {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
throw new AssertionError("Should not have received a dispatched request");
}
@Override
public void dispatchBadRequest(final RestRequest request,
final RestChannel channel,
final ThreadContext threadContext,
final Throwable cause) {
throw new AssertionError("Should not have received a dispatched request");
}
};
Settings settings = Settings.builder()
.put(HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT.getKey(), new TimeValue(randomIntBetween(100, 300)))
.build();
try (NioHttpServerTransport transport = new NioHttpServerTransport(settings, networkService, bigArrays, pageRecycler,
threadPool, xContentRegistry(), dispatcher, new NioGroupFactory(settings, logger))) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (NioHttpClient client = new NioHttpClient()) {
NioSocketChannel channel = null;
try {
CountDownLatch channelClosedLatch = new CountDownLatch(1);
channel = client.connect(remoteAddress.address());
channel.addCloseListener((r, t) -> channelClosedLatch.countDown());
assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES));
} finally {
if (channel != null) {
channel.close();
}
}
}
}
}
}

View File

@ -249,7 +249,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
return publishPort;
}
protected void onException(HttpChannel channel, Exception e) {
public void onException(HttpChannel channel, Exception e) {
if (lifecycle.started() == false) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
@ -263,6 +263,9 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
logger.trace(() -> new ParameterizedMessage(
"connect exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else if (e instanceof HttpReadTimeoutException) {
logger.trace(() -> new ParameterizedMessage("http read timeout, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else if (e instanceof CancelledKeyException) {
logger.trace(() -> new ParameterizedMessage(
"cancelled key exception caught while handling client http traffic, closing connection {}", channel), e);

View File

@ -29,6 +29,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUN
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
@ -43,11 +44,12 @@ public class HttpHandlingSettings {
private final int compressionLevel;
private final boolean detailedErrorsEnabled;
private final int pipeliningMaxEvents;
private final long readTimeoutMillis;
private boolean corsEnabled;
public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength,
boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled,
int pipeliningMaxEvents, boolean corsEnabled) {
int pipeliningMaxEvents, long readTimeoutMillis, boolean corsEnabled) {
this.maxContentLength = maxContentLength;
this.maxChunkSize = maxChunkSize;
this.maxHeaderSize = maxHeaderSize;
@ -57,6 +59,7 @@ public class HttpHandlingSettings {
this.compressionLevel = compressionLevel;
this.detailedErrorsEnabled = detailedErrorsEnabled;
this.pipeliningMaxEvents = pipeliningMaxEvents;
this.readTimeoutMillis = readTimeoutMillis;
this.corsEnabled = corsEnabled;
}
@ -70,6 +73,7 @@ public class HttpHandlingSettings {
SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
SETTING_PIPELINING_MAX_EVENTS.get(settings),
SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis(),
SETTING_CORS_ENABLED.get(settings));
}
@ -109,6 +113,10 @@ public class HttpHandlingSettings {
return pipeliningMaxEvents;
}
public long getReadTimeoutMillis() {
return readTimeoutMillis;
}
public boolean isCorsEnabled() {
return corsEnabled;
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
public class HttpReadTimeoutException extends RuntimeException {
public HttpReadTimeoutException(long readTimeoutMillis) {
super("http read timeout after " + readTimeoutMillis + "ms");
}
public HttpReadTimeoutException(long readTimeoutMillis, Exception cause) {
super("http read timeout after " + readTimeoutMillis + "ms", cause);
}
}

View File

@ -237,7 +237,17 @@ public class ThreadPool implements Scheduler, Closeable {
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long relativeTimeInMillis() {
return cachedTimeThread.relativeTimeInMillis();
return TimeValue.nsecToMSec(relativeTimeInNanos());
}
/**
* Returns a value of nanoseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long relativeTimeInNanos() {
return cachedTimeThread.relativeTimeInNanos();
}
/**
@ -534,30 +544,29 @@ public class ThreadPool implements Scheduler, Closeable {
final long interval;
volatile boolean running = true;
volatile long relativeMillis;
volatile long relativeNanos;
volatile long absoluteMillis;
CachedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
this.relativeNanos = System.nanoTime();
this.absoluteMillis = System.currentTimeMillis();
setDaemon(true);
}
/**
* Return the current time used for relative calculations. This is
* {@link System#nanoTime()} truncated to milliseconds.
* Return the current time used for relative calculations. This is {@link System#nanoTime()}.
* <p>
* If {@link ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING} is set to 0
* then the cache is disabled and the method calls {@link System#nanoTime()}
* whenever called. Typically used for testing.
*/
long relativeTimeInMillis() {
long relativeTimeInNanos() {
if (0 < interval) {
return relativeMillis;
return relativeNanos;
}
return TimeValue.nsecToMSec(System.nanoTime());
return System.nanoTime();
}
/**
@ -578,7 +587,7 @@ public class ThreadPool implements Scheduler, Closeable {
@Override
public void run() {
while (running && 0 < interval) {
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
relativeNanos = System.nanoTime();
absoluteMillis = System.currentTimeMillis();
try {
Thread.sleep(interval);

View File

@ -54,7 +54,7 @@ public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport
}
@Override
protected void onException(HttpChannel channel, Exception e) {
public void onException(HttpChannel channel, Exception e) {
securityExceptionHandler.accept(channel, e);
}

View File

@ -98,7 +98,7 @@ public class SecurityNioHttpServerTransport extends NioHttpServerTransport {
return new Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpHandler = new HttpReadWriteHandler(httpChannel,SecurityNioHttpServerTransport.this,
handlingSettings, corsConfig);
handlingSettings, corsConfig, selector.getTaskScheduler(), threadPool::relativeTimeInNanos);
InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier);
Consumer<Exception> exceptionHandler = (e) -> securityExceptionHandler.accept(httpChannel, e);