diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java index 7882518dcb6..c139924cbd8 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; @@ -36,6 +37,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -43,11 +45,17 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.thread.Invocable.InvocationType; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -422,7 +430,7 @@ public class IdleTimeoutTest extends AbstractTest } @Test - public void testStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception + public void testServerStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception { final CountDownLatch timeoutLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() @@ -468,10 +476,11 @@ public class IdleTimeoutTest extends AbstractTest { return InvocationType.NON_BLOCKING; } + @Override public void succeeded() { - // Idle timeout should not fire while receiving. + // Idle timeout should not fire while the server is receiving. Assert.assertEquals(1, timeoutLatch.getCount()); dataLatch.countDown(); } @@ -485,7 +494,7 @@ public class IdleTimeoutTest extends AbstractTest } @Test - public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception + public void testClientStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception { final CountDownLatch resetLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() @@ -589,6 +598,83 @@ public class IdleTimeoutTest extends AbstractTest Assert.assertTrue(latch.await(2 * (contentLength / bufferSize + 1) * delay, TimeUnit.MILLISECONDS)); } + @Test + public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception + { + long idleTimeout = 2000; + // Use a small thread pool to cause request queueing. + QueuedThreadPool serverExecutor = new QueuedThreadPool(4); + serverExecutor.setName("server"); + server = new Server(serverExecutor); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setStreamIdleTimeout(idleTimeout); + connector = new ServerConnector(server, 1, 1, h2); + connector.setIdleTimeout(10 * idleTimeout); + server.addConnector(connector); + ServletContextHandler context = new ServletContextHandler(server, "/", true, false); + AtomicReference phaser = new AtomicReference<>(); + context.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + phaser.get().countDown(); + + // Hold the dispatched requests enough for the idle requests to idle timeout. + sleep(2 * idleTimeout); + } + }), servletPath + "/*"); + server.start(); + + prepareClient(); + client.start(); + + Session client = newClient(new Session.Listener.Adapter()); + + // Send requests until one is queued on the server but not dispatched. + while (true) + { + phaser.set(new CountDownLatch(1)); + + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(10); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + if (!phaser.get().await(1, TimeUnit.SECONDS)) + break; + } + + // Send one more request to consume the whole session flow control window. + CountDownLatch resetLatch = new CountDownLatch(1); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0)); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + Assert.assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + + // Wait for WINDOW_UPDATEs to be processed by the client. + sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } + private void sleep(long value) { try diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 90727d255a7..56c8adfdd19 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -24,7 +24,9 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +41,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; @@ -51,13 +54,22 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -242,7 +254,7 @@ public class StreamResetTest extends AbstractTest response.setStatus(200); response.setContentType("text/plain;charset=" + charset.name()); - response.setContentLength(data.length*10); + response.setContentLength(data.length * 10); response.flushBuffer(); try @@ -259,7 +271,7 @@ public class StreamResetTest extends AbstractTest { // Write some content after the stream has // been reset, it should throw an exception. - for (int i=0;i<10;i++) + for (int i = 0; i < 10; i++) { Thread.sleep(500); response.getOutputStream().write(data); @@ -407,6 +419,106 @@ public class StreamResetTest extends AbstractTest Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); } + @Test + public void testClientResetConsumesQueuedRequestWithData() throws Exception + { + // Use a small thread pool. + QueuedThreadPool serverExecutor = new QueuedThreadPool(4); + serverExecutor.setName("server"); + serverExecutor.setDetailedDump(true); + server = new Server(serverExecutor); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + connector = new ServerConnector(server, 1, 1, h2); + server.addConnector(connector); + ServletContextHandler context = new ServletContextHandler(server, "/"); + AtomicReference phaser = new AtomicReference<>(); + context.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI()); + phaser.get().countDown(); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }), servletPath + "/*"); + server.start(); + + prepareClient(); + client.start(); + + Session client = newClient(new Session.Listener.Adapter()); + + // Send requests until one is queued on the server but not dispatched. + AtomicReference latch = new AtomicReference<>(); + List streams = new ArrayList<>(); + while (true) + { + phaser.set(new CountDownLatch(1)); + + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData()); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (response.getStatus() == HttpStatus.OK_200) + latch.get().countDown(); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame); + callback.succeeded(); + if (frame.isEndStream()) + latch.get().countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + streams.add(stream); + ByteBuffer data = ByteBuffer.allocate(10); + stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP); + + if (!phaser.get().await(1, TimeUnit.SECONDS)) + break; + } + + // Send one more request to consume the whole session flow control window, then reset it. + MetaData.Request request = newRequest("GET", "/x", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + // This request will get no event from the server since it's reset by the client. + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0)); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), NOOP); + } + }); + + // Wait for WINDOW_UPDATEs to be processed by the client. + Thread.sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + + latch.set(new CountDownLatch(2 * streams.size())); + // Complete all streams. + streams.forEach(s -> s.data(new DataFrame(s.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP)); + + Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS)); + } + @Test public void testServerExceptionConsumesQueuedData() throws Exception { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index b0e6c8d2dbe..8596c7e8720 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -49,7 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private long streamIdleTimeout; - private int reservedThreads = -1; + private int reservedThreads; public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -154,7 +154,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne public void setReservedThreads(int threads) { - this.reservedThreads = threads; + // TODO: currently disabled since the only value that works is 0. +// this.reservedThreads = threads; } public HttpConfiguration getHttpConfiguration() @@ -193,7 +194,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne try { - executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads()); + executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads()); executor.start(); connector.addBean(executor,true); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index e3092db044d..8e51c970319 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -25,7 +25,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http.BadMessageException; @@ -223,7 +222,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection { HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); if (channel != null) - result &= !channel.isRequestHandled(); + result &= !channel.isRequestExecuting(); } if (LOG.isDebugEnabled()) LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 6487577ba87..bceb73376ce 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -55,7 +55,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable private boolean _expect100Continue; private boolean _delayedUntilContent; - private boolean _handled; public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { @@ -123,7 +122,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable _delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() && !endStream && !_expect100Continue; - _handled = !_delayedUntilContent; if (LOG.isDebugEnabled()) { @@ -192,7 +190,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable { _expect100Continue = false; _delayedUntilContent = false; - _handled = false; super.recycle(); getHttpTransport().recycle(); } @@ -279,8 +276,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; - if (wasDelayed) - _handled = true; return handle || wasDelayed ? this : null; } @@ -302,35 +297,31 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; - if (wasDelayed) - _handled = true; return handle || wasDelayed ? this : null; } - public boolean isRequestHandled() + public boolean isRequestExecuting() { - return _handled; + return !getState().isIdle(); } public boolean onStreamTimeout(Throwable failure) { - if (!_handled) - return true; - - HttpInput input = getRequest().getHttpInput(); - boolean readFailed = input.failed(failure); - if (readFailed) + getHttpTransport().onStreamTimeout(failure); + if (getRequest().getHttpInput().onIdleTimeout(failure)) handle(); - boolean writeFailed = getHttpTransport().onStreamTimeout(failure); + if (isRequestExecuting()) + return false; - return readFailed || writeFailed; + consumeInput(); + return true; } public void onFailure(Throwable failure) { getHttpTransport().onStreamFailure(failure); - if (onEarlyEOF()) + if (getRequest().getHttpInput().failed(failure)) { ContextHandler handler = getState().getContextHandler(); if (handler != null) @@ -342,6 +333,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable { getState().asyncError(failure); } + consumeInput(); } protected void consumeInput() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index ba184350f28..9967da8fd17 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -99,7 +99,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor _requestLog = connector == null ? null : connector.getServer().getRequestLog(); if (LOG.isDebugEnabled()) - LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state); + LOG.debug("new {} -> {},{},{}", + this, + _endPoint, + _endPoint==null?null:_endPoint.getConnection(), + _state); } protected HttpInput newHttpInput(HttpChannelState state) @@ -258,10 +262,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor _oldIdleTimeout=0; } - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { } + public void onBlockWaitForContent() + { + } + + public void onBlockWaitForContentFailure(Throwable failure) + { + getRequest().getHttpInput().failed(failure); + } + @Override public void run() { @@ -433,7 +446,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor if (hasContent && !_response.isContentComplete(_response.getHttpOutput().getWritten())) { if (isCommitted()) - _transport.abort(new IOException("insufficient content written")); + abort(new IOException("insufficient content written")); else _response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500,"insufficient content written"); } @@ -546,7 +559,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor catch (Throwable x) { failure.addSuppressed(x); - _transport.abort(failure); + abort(failure); } } @@ -843,14 +856,14 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor @Override public void failed(Throwable th) { - _transport.abort(x); + abort(x); super.failed(x); } }); } else { - _transport.abort(x); + abort(x); super.failed(x); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java index bb7629a0291..54dd7139615 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelOverHttp.java @@ -249,11 +249,23 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque return handle; } - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { _httpConnection.asyncReadFillInterested(); } + @Override + public void onBlockWaitForContent() + { + _httpConnection.blockingReadFillInterested(); + } + + @Override + public void onBlockWaitForContentFailure(Throwable failure) + { + _httpConnection.blockingReadFailure(failure); + } + @Override public void badMessage(int status, String reason) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 6fbc41a9489..c5929b78c1a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -382,7 +382,7 @@ public class HttpChannelState /** * Signal that the HttpConnection has finished handling the request. - * For blocking connectors,this call may block if the request has + * For blocking connectors, this call may block if the request has * been suspended (startAsync called). * @return next actions * be handled again (eg because of a resume that happened before unhandle was called) @@ -498,7 +498,7 @@ public class HttpChannelState finally { if (read_interested) - _channel.asyncReadFillInterested(); + _channel.onAsyncWaitForContent(); } } @@ -1129,8 +1129,8 @@ public class HttpChannelState /** * Called to signal async read isReady() has returned false. * This indicates that there is no content available to be consumed - * and that once the channel enteres the ASYNC_WAIT state it will - * register for read interest by calling {@link HttpChannel#asyncReadFillInterested()} + * and that once the channel enters the ASYNC_WAIT state it will + * register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()} * either from this method or from a subsequent call to {@link #unhandle()}. */ public void onReadUnready() @@ -1165,7 +1165,7 @@ public class HttpChannelState } if (interested) - _channel.asyncReadFillInterested(); + _channel.onAsyncWaitForContent(); } /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index b93f4885537..746158aa6c3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -560,7 +560,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http getEndPoint().fillInterested(_blockingReadCallback); } - public void blockingReadException(Throwable e) + public void blockingReadFailure(Throwable e) { _blockingReadCallback.failed(e); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index dcaae2593f5..653d8a16fbb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -18,8 +18,8 @@ package org.eclipse.jetty.server; +import java.io.EOFException; import java.io.IOException; -import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.Deque; @@ -133,6 +133,7 @@ public class HttpInput extends ServletInputStream implements Runnable private long _contentArrived; private long _contentConsumed; private long _blockUntil; + private boolean _waitingForContent; private Interceptor _interceptor; public HttpInput(HttpChannelState state) @@ -518,70 +519,47 @@ public class HttpInput extends ServletInputStream implements Runnable /** * Blocks until some content or some end-of-file event arrives. * - * @throws IOException - * if the wait is interrupted + * @throws IOException if the wait is interrupted */ protected void blockForContent() throws IOException { try { + _waitingForContent = true; + _channelState.getHttpChannel().onBlockWaitForContent(); + + boolean loop = false; long timeout = 0; - if (_blockUntil != 0) + while (true) { - timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); - if (timeout <= 0) - throw new TimeoutException(); + if (_blockUntil != 0) + { + timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); + if (timeout <= 0) + throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout())); + } + + // This method is called from a loop, so we just + // need to check the timeout before and after waiting. + if (loop) + break; + + if (LOG.isDebugEnabled()) + LOG.debug("{} blocking for content timeout={}", this, timeout); + if (timeout > 0) + _inputQ.wait(timeout); + else + _inputQ.wait(); + + loop = true; } - - if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={}",this,timeout); - if (timeout > 0) - _inputQ.wait(timeout); - else - _inputQ.wait(); - - // TODO: cannot return unless there is content or timeout, - // TODO: so spurious wakeups are not handled correctly. - - if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0) - throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout())); } - catch (Throwable e) + catch (Throwable x) { - throw (IOException)new InterruptedIOException().initCause(e); + _channelState.getHttpChannel().onBlockWaitForContentFailure(x); } } - /** - * Adds some content to the start of this input stream. - *

- * Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total - *

- * - * @param item - * the content to add - * @return true if content channel woken for read - */ - public boolean prependContent(Content item) - { - boolean woken = false; - synchronized (_inputQ) - { - if (_content != null) - _inputQ.push(_content); - _content = item; - _contentConsumed -= item.remaining(); - if (LOG.isDebugEnabled()) - LOG.debug("{} prependContent {}",this,item); - - if (_listener == null) - _inputQ.notify(); - else - woken = _channelState.onContentAdded(); - } - return woken; - } - /** * Adds some content to this input stream. * @@ -591,31 +569,36 @@ public class HttpInput extends ServletInputStream implements Runnable */ public boolean addContent(Content content) { - boolean woken = false; synchronized (_inputQ) { + _waitingForContent = false; if (_firstByteTimeStamp == -1) _firstByteTimeStamp = System.nanoTime(); - _contentArrived += content.remaining(); - - if (_content==null && _inputQ.isEmpty()) - _content=content; - else - _inputQ.offer(content); - - if (LOG.isDebugEnabled()) - LOG.debug("{} addContent {}",this,content); - - if (nextInterceptedContent()!=null) + if (isFinished()) { - if (_listener == null) - _inputQ.notify(); + Throwable failure = isError() ? _state.getError() : new EOFException("Content after EOF"); + content.failed(failure); + return false; + } + else + { + _contentArrived += content.remaining(); + + if (_content==null && _inputQ.isEmpty()) + _content=content; else - woken = _channelState.onContentAdded(); + _inputQ.offer(content); + + if (LOG.isDebugEnabled()) + LOG.debug("{} addContent {}",this,content); + + if (nextInterceptedContent()!=null) + return wakeup(); + else + return false; } } - return woken; } public boolean hasContent() @@ -670,13 +653,13 @@ public class HttpInput extends ServletInputStream implements Runnable { try { - while (!isFinished()) + while (true) { Content item = nextContent(); if (item == null) break; // Let's not bother blocking - skip(item,item.remaining()); + skip(item, item.remaining()); } return isFinished() && !isError(); } @@ -713,14 +696,6 @@ public class HttpInput extends ServletInputStream implements Runnable } } - public boolean isAsyncEOF() - { - synchronized (_inputQ) - { - return _state == AEOF; - } - } - @Override public boolean isReady() { @@ -734,8 +709,8 @@ public class HttpInput extends ServletInputStream implements Runnable return true; if (produceNextContext() != null) return true; - _channelState.onReadUnready(); + _waitingForContent = true; } return false; } @@ -775,6 +750,7 @@ public class HttpInput extends ServletInputStream implements Runnable { _state = ASYNC; _channelState.onReadUnready(); + _waitingForContent = true; } } } @@ -787,18 +763,36 @@ public class HttpInput extends ServletInputStream implements Runnable wake(); } - public boolean failed(Throwable x) + public boolean onIdleTimeout(Throwable x) { - boolean woken = false; synchronized (_inputQ) { - if (_state instanceof ErrorState) + if (_waitingForContent && !isError()) { - // Log both the original and current failure - // without modifying the original failure. - Throwable failure = new Throwable(((ErrorState)_state).getError()); - failure.addSuppressed(x); - LOG.warn(failure); + x.addSuppressed(new Throwable("HttpInput idle timeout")); + _state = new ErrorState(x); + return wakeup(); + } + return false; + } + } + + public boolean failed(Throwable x) + { + synchronized (_inputQ) + { + // Errors may be reported multiple times, for example + // a local idle timeout and a remote I/O failure. + if (isError()) + { + if (LOG.isDebugEnabled()) + { + // Log both the original and current failure + // without modifying the original failure. + Throwable failure = new Throwable(_state.getError()); + failure.addSuppressed(x); + LOG.debug(failure); + } } else { @@ -807,14 +801,16 @@ public class HttpInput extends ServletInputStream implements Runnable x.addSuppressed(new Throwable("HttpInput failure")); _state = new ErrorState(x); } - - if (_listener == null) - _inputQ.notify(); - else - woken = _channelState.onContentAdded(); + return wakeup(); } + } - return woken; + private boolean wakeup() + { + if (_listener != null) + return _channelState.onContentAdded(); + _inputQ.notify(); + return false; } /* @@ -1133,5 +1129,4 @@ public class HttpInput extends ServletInputStream implements Runnable return "AEOF"; } }; - } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java index 204c0642b03..53fad2119ff 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -32,18 +32,4 @@ public class HttpInputOverHTTP extends HttpInput { ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent(); } - - @Override - protected void blockForContent() throws IOException - { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadFillInterested(); - try - { - super.blockForContent(); - } - catch(Throwable e) - { - ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadException(e); - } - } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java index f24695de433..36e7b9ae461 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputAsyncStateTest.java @@ -18,14 +18,6 @@ package org.eclipse.jetty.server; -import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT; -import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.Queue; @@ -45,6 +37,14 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT; +import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + /** * this tests HttpInput and its interaction with HttpChannelState @@ -104,10 +104,11 @@ public class HttpInputAsyncStateTest _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) { @Override - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { - __history.add("asyncReadFillInterested"); + __history.add("onAsyncWaitForContent"); } + @Override public Scheduler getScheduler() { @@ -317,7 +318,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(EOF_CONTENT); check("onReadPossible true"); @@ -355,7 +356,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(EOF_CONTENT); check("onReadPossible true"); @@ -409,7 +410,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(EARLY_EOF_CONTENT); check("onReadPossible true"); @@ -447,7 +448,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(EARLY_EOF_CONTENT); check("onReadPossible true"); @@ -503,7 +504,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -541,7 +542,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -604,7 +605,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EARLY_EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -650,7 +651,7 @@ public class HttpInputAsyncStateTest }); _in.setReadListener(_listener); - check("asyncReadFillInterested","onReadUnready"); + check("onAsyncWaitForContent","onReadUnready"); deliver(new TContent("Hello"),EARLY_EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -675,7 +676,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -700,7 +701,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); @@ -735,7 +736,7 @@ public class HttpInputAsyncStateTest check("onReadUnready"); }); - check("asyncReadFillInterested"); + check("onAsyncWaitForContent"); deliver(new TContent("Hello"),EOF_CONTENT); check("onReadPossible true","onReadPossible false"); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java index c2b2ff83911..a70034194bf 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpInputTest.java @@ -93,9 +93,9 @@ public class HttpInputTest _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) { @Override - public void asyncReadFillInterested() + public void onAsyncWaitForContent() { - _history.add("asyncReadFillInterested"); + _history.add("asyncReadInterested"); } }) { @@ -213,82 +213,21 @@ public class HttpInputTest Assert.assertThat(_history.poll(), Matchers.nullValue()); } - @Test - public void testReRead() throws Exception - { - _in.addContent(new TContent("AB")); - _in.addContent(new TContent("CD")); - _fillAndParseSimulate.offer("EF"); - _fillAndParseSimulate.offer("GH"); - Assert.assertThat(_in.available(), Matchers.equalTo(2)); - Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); - Assert.assertThat(_in.isReady(), Matchers.equalTo(true)); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'A')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'B')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L)); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'C')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'D')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'E')); - - _in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde"))); - - Assert.assertThat(_in.available(), Matchers.equalTo(5)); - Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); - Assert.assertThat(_in.isReady(), Matchers.equalTo(true)); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'a')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'b')); - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L)); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'c')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'d')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'e')); - - Assert.assertThat(_in.read(), Matchers.equalTo((int)'F')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2")); - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - - Assert.assertThat(_in.read(), Matchers.equalTo((int)'G')); - Assert.assertThat(_in.read(), Matchers.equalTo((int)'H')); - - Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH")); - Assert.assertThat(_history.poll(), Matchers.nullValue()); - - Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L)); - - Assert.assertThat(_history.poll(), Matchers.nullValue()); - } - @Test public void testBlockingRead() throws Exception { - new Thread() + new Thread(() -> { - public void run() + try { - try - { - Thread.sleep(500); - _in.addContent(new TContent("AB")); - } - catch (Throwable th) - { - th.printStackTrace(); - } + Thread.sleep(500); + _in.addContent(new TContent("AB")); } - }.start(); + catch (Throwable th) + { + th.printStackTrace(); + } + }).start(); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A')); @@ -367,21 +306,18 @@ public class HttpInputTest @Test public void testBlockingEOF() throws Exception { - new Thread() + new Thread(() -> { - public void run() + try { - try - { - Thread.sleep(500); - _in.eof(); - } - catch (Throwable th) - { - th.printStackTrace(); - } + Thread.sleep(500); + _in.eof(); } - }.start(); + catch (Throwable th) + { + th.printStackTrace(); + } + }).start(); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); Assert.assertThat(_in.read(), Matchers.equalTo(-1)); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java index 74709373579..15470916179 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java @@ -19,7 +19,9 @@ package org.eclipse.jetty.http.client; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -32,9 +34,12 @@ import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; +import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; @@ -43,6 +48,7 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; import org.junit.Assert; import org.junit.Test; @@ -517,7 +523,6 @@ public class ServerTimeoutsTest extends AbstractTest throw x; } } - }); DeferredContentProvider contentProvider = new DeferredContentProvider(); @@ -678,6 +683,59 @@ public class ServerTimeoutsTest extends AbstractTest Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); } + @Test + public void testIdleTimeoutBeforeReadIsIgnored() throws Exception + { + long idleTimeout = 1000; + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + try + { + Thread.sleep(2 * idleTimeout); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + setServerIdleTimeout(idleTimeout); + + byte[] data = new byte[1024]; + new Random().nextBytes(data); + byte[] data1 = new byte[data.length / 2]; + System.arraycopy(data, 0, data1, 0, data1.length); + byte[] data2 = new byte[data.length - data1.length]; + System.arraycopy(data, data1.length, data2, 0, data2.length); + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data1)); + CountDownLatch latch = new CountDownLatch(1); + client.newRequest(newURI()) + .path(servletPath) + .content(content) + .send(new BufferingResponseListener() + { + @Override + public void onComplete(Result result) + { + Assert.assertTrue(result.isSucceeded()); + Assert.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + Assert.assertArrayEquals(data, getContent()); + latch.countDown(); + } + }); + + // Wait for the server application to block reading. + Thread.sleep(3 * idleTimeout); + content.offer(ByteBuffer.wrap(data2)); + content.close(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler { private final CountDownLatch handlerLatch;