Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2017-08-11 16:42:21 +02:00
parent 7179ac0b61
commit 87d090e062
14 changed files with 447 additions and 256 deletions

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
@ -36,6 +37,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -43,11 +45,17 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -422,7 +430,7 @@ public class IdleTimeoutTest extends AbstractTest
} }
@Test @Test
public void testStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception public void testServerStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception
{ {
final CountDownLatch timeoutLatch = new CountDownLatch(1); final CountDownLatch timeoutLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter() start(new ServerSessionListener.Adapter()
@ -468,10 +476,11 @@ public class IdleTimeoutTest extends AbstractTest
{ {
return InvocationType.NON_BLOCKING; return InvocationType.NON_BLOCKING;
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
// Idle timeout should not fire while receiving. // Idle timeout should not fire while the server is receiving.
Assert.assertEquals(1, timeoutLatch.getCount()); Assert.assertEquals(1, timeoutLatch.getCount());
dataLatch.countDown(); dataLatch.countDown();
} }
@ -485,7 +494,7 @@ public class IdleTimeoutTest extends AbstractTest
} }
@Test @Test
public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception public void testClientStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception
{ {
final CountDownLatch resetLatch = new CountDownLatch(1); final CountDownLatch resetLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter() start(new ServerSessionListener.Adapter()
@ -589,6 +598,83 @@ public class IdleTimeoutTest extends AbstractTest
Assert.assertTrue(latch.await(2 * (contentLength / bufferSize + 1) * delay, TimeUnit.MILLISECONDS)); Assert.assertTrue(latch.await(2 * (contentLength / bufferSize + 1) * delay, TimeUnit.MILLISECONDS));
} }
@Test
public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception
{
long idleTimeout = 2000;
// Use a small thread pool to cause request queueing.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
serverExecutor.setName("server");
server = new Server(serverExecutor);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
h2.setStreamIdleTimeout(idleTimeout);
connector = new ServerConnector(server, 1, 1, h2);
connector.setIdleTimeout(10 * idleTimeout);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
AtomicReference<CountDownLatch> phaser = new AtomicReference<>();
context.addServlet(new ServletHolder(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
phaser.get().countDown();
// Hold the dispatched requests enough for the idle requests to idle timeout.
sleep(2 * idleTimeout);
}
}), servletPath + "/*");
server.start();
prepareClient();
client.start();
Session client = newClient(new Session.Listener.Adapter());
// Send requests until one is queued on the server but not dispatched.
while (true)
{
phaser.set(new CountDownLatch(1));
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(10);
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
if (!phaser.get().await(1, TimeUnit.SECONDS))
break;
}
// Send one more request to consume the whole session flow control window.
CountDownLatch resetLatch = new CountDownLatch(1);
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0));
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
Assert.assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Wait for WINDOW_UPDATEs to be processed by the client.
sleep(1000);
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
private void sleep(long value) private void sleep(long value)
{ {
try try

View File

@ -24,7 +24,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -39,6 +41,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
@ -51,13 +54,22 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -242,7 +254,7 @@ public class StreamResetTest extends AbstractTest
response.setStatus(200); response.setStatus(200);
response.setContentType("text/plain;charset=" + charset.name()); response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.length*10); response.setContentLength(data.length * 10);
response.flushBuffer(); response.flushBuffer();
try try
@ -259,7 +271,7 @@ public class StreamResetTest extends AbstractTest
{ {
// Write some content after the stream has // Write some content after the stream has
// been reset, it should throw an exception. // been reset, it should throw an exception.
for (int i=0;i<10;i++) for (int i = 0; i < 10; i++)
{ {
Thread.sleep(500); Thread.sleep(500);
response.getOutputStream().write(data); response.getOutputStream().write(data);
@ -407,6 +419,106 @@ public class StreamResetTest extends AbstractTest
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
} }
@Test
public void testClientResetConsumesQueuedRequestWithData() throws Exception
{
// Use a small thread pool.
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
serverExecutor.setName("server");
serverExecutor.setDetailedDump(true);
server = new Server(serverExecutor);
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connector = new ServerConnector(server, 1, 1, h2);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(server, "/");
AtomicReference<CountDownLatch> phaser = new AtomicReference<>();
context.addServlet(new ServletHolder(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI());
phaser.get().countDown();
IO.copy(request.getInputStream(), response.getOutputStream());
}
}), servletPath + "/*");
server.start();
prepareClient();
client.start();
Session client = newClient(new Session.Listener.Adapter());
// Send requests until one is queued on the server but not dispatched.
AtomicReference<CountDownLatch> latch = new AtomicReference<>();
List<Stream> streams = new ArrayList<>();
while (true)
{
phaser.set(new CountDownLatch(1));
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.OK_200)
latch.get().countDown();
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame);
callback.succeeded();
if (frame.isEndStream())
latch.get().countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
streams.add(stream);
ByteBuffer data = ByteBuffer.allocate(10);
stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP);
if (!phaser.get().await(1, TimeUnit.SECONDS))
break;
}
// Send one more request to consume the whole session flow control window, then reset it.
MetaData.Request request = newRequest("GET", "/x", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
// This request will get no event from the server since it's reset by the client.
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0));
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public void succeeded()
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), NOOP);
}
});
// Wait for WINDOW_UPDATEs to be processed by the client.
Thread.sleep(1000);
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
latch.set(new CountDownLatch(2 * streams.size()));
// Complete all streams.
streams.forEach(s -> s.data(new DataFrame(s.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP));
Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS));
}
@Test @Test
public void testServerExceptionConsumesQueuedData() throws Exception public void testServerExceptionConsumesQueuedData() throws Exception
{ {

View File

@ -49,7 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0; private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout; private long streamIdleTimeout;
private int reservedThreads = -1; private int reservedThreads;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{ {
@ -154,7 +154,8 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
public void setReservedThreads(int threads) public void setReservedThreads(int threads)
{ {
this.reservedThreads = threads; // TODO: currently disabled since the only value that works is 0.
// this.reservedThreads = threads;
} }
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
@ -193,7 +194,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
try try
{ {
executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads()); executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads());
executor.start(); executor.start();
connector.addBean(executor,true); connector.addBean(executor,true);
} }

View File

@ -25,7 +25,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
@ -223,7 +222,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null) if (channel != null)
result &= !channel.isRequestHandled(); result &= !channel.isRequestExecuting();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);

View File

@ -55,7 +55,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
private boolean _expect100Continue; private boolean _expect100Continue;
private boolean _delayedUntilContent; private boolean _delayedUntilContent;
private boolean _handled;
public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) public HttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{ {
@ -123,7 +122,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() && _delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue; !endStream && !_expect100Continue;
_handled = !_delayedUntilContent;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
@ -192,7 +190,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
{ {
_expect100Continue = false; _expect100Continue = false;
_delayedUntilContent = false; _delayedUntilContent = false;
_handled = false;
super.recycle(); super.recycle();
getHttpTransport().recycle(); getHttpTransport().recycle();
} }
@ -279,8 +276,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
boolean wasDelayed = _delayedUntilContent; boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false; _delayedUntilContent = false;
if (wasDelayed)
_handled = true;
return handle || wasDelayed ? this : null; return handle || wasDelayed ? this : null;
} }
@ -302,35 +297,31 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
boolean wasDelayed = _delayedUntilContent; boolean wasDelayed = _delayedUntilContent;
_delayedUntilContent = false; _delayedUntilContent = false;
if (wasDelayed)
_handled = true;
return handle || wasDelayed ? this : null; return handle || wasDelayed ? this : null;
} }
public boolean isRequestHandled() public boolean isRequestExecuting()
{ {
return _handled; return !getState().isIdle();
} }
public boolean onStreamTimeout(Throwable failure) public boolean onStreamTimeout(Throwable failure)
{ {
if (!_handled) getHttpTransport().onStreamTimeout(failure);
return true; if (getRequest().getHttpInput().onIdleTimeout(failure))
HttpInput input = getRequest().getHttpInput();
boolean readFailed = input.failed(failure);
if (readFailed)
handle(); handle();
boolean writeFailed = getHttpTransport().onStreamTimeout(failure); if (isRequestExecuting())
return false;
return readFailed || writeFailed; consumeInput();
return true;
} }
public void onFailure(Throwable failure) public void onFailure(Throwable failure)
{ {
getHttpTransport().onStreamFailure(failure); getHttpTransport().onStreamFailure(failure);
if (onEarlyEOF()) if (getRequest().getHttpInput().failed(failure))
{ {
ContextHandler handler = getState().getContextHandler(); ContextHandler handler = getState().getContextHandler();
if (handler != null) if (handler != null)
@ -342,6 +333,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
{ {
getState().asyncError(failure); getState().asyncError(failure);
} }
consumeInput();
} }
protected void consumeInput() protected void consumeInput()

View File

@ -99,7 +99,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_requestLog = connector == null ? null : connector.getServer().getRequestLog(); _requestLog = connector == null ? null : connector.getServer().getRequestLog();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state); LOG.debug("new {} -> {},{},{}",
this,
_endPoint,
_endPoint==null?null:_endPoint.getConnection(),
_state);
} }
protected HttpInput newHttpInput(HttpChannelState state) protected HttpInput newHttpInput(HttpChannelState state)
@ -258,10 +262,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_oldIdleTimeout=0; _oldIdleTimeout=0;
} }
public void asyncReadFillInterested() public void onAsyncWaitForContent()
{ {
} }
public void onBlockWaitForContent()
{
}
public void onBlockWaitForContentFailure(Throwable failure)
{
getRequest().getHttpInput().failed(failure);
}
@Override @Override
public void run() public void run()
{ {
@ -433,7 +446,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (hasContent && !_response.isContentComplete(_response.getHttpOutput().getWritten())) if (hasContent && !_response.isContentComplete(_response.getHttpOutput().getWritten()))
{ {
if (isCommitted()) if (isCommitted())
_transport.abort(new IOException("insufficient content written")); abort(new IOException("insufficient content written"));
else else
_response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500,"insufficient content written"); _response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500,"insufficient content written");
} }
@ -546,7 +559,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
catch (Throwable x) catch (Throwable x)
{ {
failure.addSuppressed(x); failure.addSuppressed(x);
_transport.abort(failure); abort(failure);
} }
} }
@ -843,14 +856,14 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
@Override @Override
public void failed(Throwable th) public void failed(Throwable th)
{ {
_transport.abort(x); abort(x);
super.failed(x); super.failed(x);
} }
}); });
} }
else else
{ {
_transport.abort(x); abort(x);
super.failed(x); super.failed(x);
} }
} }

View File

@ -249,11 +249,23 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
return handle; return handle;
} }
public void asyncReadFillInterested() public void onAsyncWaitForContent()
{ {
_httpConnection.asyncReadFillInterested(); _httpConnection.asyncReadFillInterested();
} }
@Override
public void onBlockWaitForContent()
{
_httpConnection.blockingReadFillInterested();
}
@Override
public void onBlockWaitForContentFailure(Throwable failure)
{
_httpConnection.blockingReadFailure(failure);
}
@Override @Override
public void badMessage(int status, String reason) public void badMessage(int status, String reason)
{ {

View File

@ -382,7 +382,7 @@ public class HttpChannelState
/** /**
* Signal that the HttpConnection has finished handling the request. * Signal that the HttpConnection has finished handling the request.
* For blocking connectors,this call may block if the request has * For blocking connectors, this call may block if the request has
* been suspended (startAsync called). * been suspended (startAsync called).
* @return next actions * @return next actions
* be handled again (eg because of a resume that happened before unhandle was called) * be handled again (eg because of a resume that happened before unhandle was called)
@ -498,7 +498,7 @@ public class HttpChannelState
finally finally
{ {
if (read_interested) if (read_interested)
_channel.asyncReadFillInterested(); _channel.onAsyncWaitForContent();
} }
} }
@ -1129,8 +1129,8 @@ public class HttpChannelState
/** /**
* Called to signal async read isReady() has returned false. * Called to signal async read isReady() has returned false.
* This indicates that there is no content available to be consumed * This indicates that there is no content available to be consumed
* and that once the channel enteres the ASYNC_WAIT state it will * and that once the channel enters the ASYNC_WAIT state it will
* register for read interest by calling {@link HttpChannel#asyncReadFillInterested()} * register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()}
* either from this method or from a subsequent call to {@link #unhandle()}. * either from this method or from a subsequent call to {@link #unhandle()}.
*/ */
public void onReadUnready() public void onReadUnready()
@ -1165,7 +1165,7 @@ public class HttpChannelState
} }
if (interested) if (interested)
_channel.asyncReadFillInterested(); _channel.onAsyncWaitForContent();
} }
/** /**

View File

@ -560,7 +560,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
getEndPoint().fillInterested(_blockingReadCallback); getEndPoint().fillInterested(_blockingReadCallback);
} }
public void blockingReadException(Throwable e) public void blockingReadFailure(Throwable e)
{ {
_blockingReadCallback.failed(e); _blockingReadCallback.failed(e);
} }

View File

@ -18,8 +18,8 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
@ -133,6 +133,7 @@ public class HttpInput extends ServletInputStream implements Runnable
private long _contentArrived; private long _contentArrived;
private long _contentConsumed; private long _contentConsumed;
private long _blockUntil; private long _blockUntil;
private boolean _waitingForContent;
private Interceptor _interceptor; private Interceptor _interceptor;
public HttpInput(HttpChannelState state) public HttpInput(HttpChannelState state)
@ -518,70 +519,47 @@ public class HttpInput extends ServletInputStream implements Runnable
/** /**
* Blocks until some content or some end-of-file event arrives. * Blocks until some content or some end-of-file event arrives.
* *
* @throws IOException * @throws IOException if the wait is interrupted
* if the wait is interrupted
*/ */
protected void blockForContent() throws IOException protected void blockForContent() throws IOException
{ {
try try
{ {
_waitingForContent = true;
_channelState.getHttpChannel().onBlockWaitForContent();
boolean loop = false;
long timeout = 0; long timeout = 0;
if (_blockUntil != 0) while (true)
{ {
timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); if (_blockUntil != 0)
if (timeout <= 0) {
throw new TimeoutException(); timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime());
if (timeout <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
}
// This method is called from a loop, so we just
// need to check the timeout before and after waiting.
if (loop)
break;
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}", this, timeout);
if (timeout > 0)
_inputQ.wait(timeout);
else
_inputQ.wait();
loop = true;
} }
if (LOG.isDebugEnabled())
LOG.debug("{} blocking for content timeout={}",this,timeout);
if (timeout > 0)
_inputQ.wait(timeout);
else
_inputQ.wait();
// TODO: cannot return unless there is content or timeout,
// TODO: so spurious wakeups are not handled correctly.
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
throw new TimeoutException(String.format("Blocking timeout %d ms",getBlockingTimeout()));
} }
catch (Throwable e) catch (Throwable x)
{ {
throw (IOException)new InterruptedIOException().initCause(e); _channelState.getHttpChannel().onBlockWaitForContentFailure(x);
} }
} }
/**
* Adds some content to the start of this input stream.
* <p>
* Typically used to push back content that has been read, perhaps mutated. The bytes prepended are deducted for the contentConsumed total
* </p>
*
* @param item
* the content to add
* @return true if content channel woken for read
*/
public boolean prependContent(Content item)
{
boolean woken = false;
synchronized (_inputQ)
{
if (_content != null)
_inputQ.push(_content);
_content = item;
_contentConsumed -= item.remaining();
if (LOG.isDebugEnabled())
LOG.debug("{} prependContent {}",this,item);
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onContentAdded();
}
return woken;
}
/** /**
* Adds some content to this input stream. * Adds some content to this input stream.
* *
@ -591,31 +569,36 @@ public class HttpInput extends ServletInputStream implements Runnable
*/ */
public boolean addContent(Content content) public boolean addContent(Content content)
{ {
boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
_waitingForContent = false;
if (_firstByteTimeStamp == -1) if (_firstByteTimeStamp == -1)
_firstByteTimeStamp = System.nanoTime(); _firstByteTimeStamp = System.nanoTime();
_contentArrived += content.remaining(); if (isFinished())
if (_content==null && _inputQ.isEmpty())
_content=content;
else
_inputQ.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}",this,content);
if (nextInterceptedContent()!=null)
{ {
if (_listener == null) Throwable failure = isError() ? _state.getError() : new EOFException("Content after EOF");
_inputQ.notify(); content.failed(failure);
return false;
}
else
{
_contentArrived += content.remaining();
if (_content==null && _inputQ.isEmpty())
_content=content;
else else
woken = _channelState.onContentAdded(); _inputQ.offer(content);
if (LOG.isDebugEnabled())
LOG.debug("{} addContent {}",this,content);
if (nextInterceptedContent()!=null)
return wakeup();
else
return false;
} }
} }
return woken;
} }
public boolean hasContent() public boolean hasContent()
@ -670,13 +653,13 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
try try
{ {
while (!isFinished()) while (true)
{ {
Content item = nextContent(); Content item = nextContent();
if (item == null) if (item == null)
break; // Let's not bother blocking break; // Let's not bother blocking
skip(item,item.remaining()); skip(item, item.remaining());
} }
return isFinished() && !isError(); return isFinished() && !isError();
} }
@ -713,14 +696,6 @@ public class HttpInput extends ServletInputStream implements Runnable
} }
} }
public boolean isAsyncEOF()
{
synchronized (_inputQ)
{
return _state == AEOF;
}
}
@Override @Override
public boolean isReady() public boolean isReady()
{ {
@ -734,8 +709,8 @@ public class HttpInput extends ServletInputStream implements Runnable
return true; return true;
if (produceNextContext() != null) if (produceNextContext() != null)
return true; return true;
_channelState.onReadUnready(); _channelState.onReadUnready();
_waitingForContent = true;
} }
return false; return false;
} }
@ -775,6 +750,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
_state = ASYNC; _state = ASYNC;
_channelState.onReadUnready(); _channelState.onReadUnready();
_waitingForContent = true;
} }
} }
} }
@ -787,18 +763,36 @@ public class HttpInput extends ServletInputStream implements Runnable
wake(); wake();
} }
public boolean failed(Throwable x) public boolean onIdleTimeout(Throwable x)
{ {
boolean woken = false;
synchronized (_inputQ) synchronized (_inputQ)
{ {
if (_state instanceof ErrorState) if (_waitingForContent && !isError())
{ {
// Log both the original and current failure x.addSuppressed(new Throwable("HttpInput idle timeout"));
// without modifying the original failure. _state = new ErrorState(x);
Throwable failure = new Throwable(((ErrorState)_state).getError()); return wakeup();
failure.addSuppressed(x); }
LOG.warn(failure); return false;
}
}
public boolean failed(Throwable x)
{
synchronized (_inputQ)
{
// Errors may be reported multiple times, for example
// a local idle timeout and a remote I/O failure.
if (isError())
{
if (LOG.isDebugEnabled())
{
// Log both the original and current failure
// without modifying the original failure.
Throwable failure = new Throwable(_state.getError());
failure.addSuppressed(x);
LOG.debug(failure);
}
} }
else else
{ {
@ -807,14 +801,16 @@ public class HttpInput extends ServletInputStream implements Runnable
x.addSuppressed(new Throwable("HttpInput failure")); x.addSuppressed(new Throwable("HttpInput failure"));
_state = new ErrorState(x); _state = new ErrorState(x);
} }
return wakeup();
if (_listener == null)
_inputQ.notify();
else
woken = _channelState.onContentAdded();
} }
}
return woken; private boolean wakeup()
{
if (_listener != null)
return _channelState.onContentAdded();
_inputQ.notify();
return false;
} }
/* /*
@ -1133,5 +1129,4 @@ public class HttpInput extends ServletInputStream implements Runnable
return "AEOF"; return "AEOF";
} }
}; };
} }

View File

@ -32,18 +32,4 @@ public class HttpInputOverHTTP extends HttpInput
{ {
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent(); ((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent();
} }
@Override
protected void blockForContent() throws IOException
{
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadFillInterested();
try
{
super.blockForContent();
}
catch(Throwable e)
{
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadException(e);
}
}
} }

View File

@ -18,14 +18,6 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT;
import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue; import java.util.Queue;
@ -45,6 +37,14 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import static org.eclipse.jetty.server.HttpInput.EARLY_EOF_CONTENT;
import static org.eclipse.jetty.server.HttpInput.EOF_CONTENT;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/** /**
* this tests HttpInput and its interaction with HttpChannelState * this tests HttpInput and its interaction with HttpChannelState
@ -104,10 +104,11 @@ public class HttpInputAsyncStateTest
_in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null)
{ {
@Override @Override
public void asyncReadFillInterested() public void onAsyncWaitForContent()
{ {
__history.add("asyncReadFillInterested"); __history.add("onAsyncWaitForContent");
} }
@Override @Override
public Scheduler getScheduler() public Scheduler getScheduler()
{ {
@ -317,7 +318,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(EOF_CONTENT); deliver(EOF_CONTENT);
check("onReadPossible true"); check("onReadPossible true");
@ -355,7 +356,7 @@ public class HttpInputAsyncStateTest
}); });
_in.setReadListener(_listener); _in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready"); check("onAsyncWaitForContent","onReadUnready");
deliver(EOF_CONTENT); deliver(EOF_CONTENT);
check("onReadPossible true"); check("onReadPossible true");
@ -409,7 +410,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(EARLY_EOF_CONTENT); deliver(EARLY_EOF_CONTENT);
check("onReadPossible true"); check("onReadPossible true");
@ -447,7 +448,7 @@ public class HttpInputAsyncStateTest
}); });
_in.setReadListener(_listener); _in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready"); check("onAsyncWaitForContent","onReadUnready");
deliver(EARLY_EOF_CONTENT); deliver(EARLY_EOF_CONTENT);
check("onReadPossible true"); check("onReadPossible true");
@ -503,7 +504,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(new TContent("Hello"),EOF_CONTENT); deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -541,7 +542,7 @@ public class HttpInputAsyncStateTest
}); });
_in.setReadListener(_listener); _in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready"); check("onAsyncWaitForContent","onReadUnready");
deliver(new TContent("Hello"),EOF_CONTENT); deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -604,7 +605,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(new TContent("Hello"),EARLY_EOF_CONTENT); deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -650,7 +651,7 @@ public class HttpInputAsyncStateTest
}); });
_in.setReadListener(_listener); _in.setReadListener(_listener);
check("asyncReadFillInterested","onReadUnready"); check("onAsyncWaitForContent","onReadUnready");
deliver(new TContent("Hello"),EARLY_EOF_CONTENT); deliver(new TContent("Hello"),EARLY_EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -675,7 +676,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(new TContent("Hello"),EOF_CONTENT); deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -700,7 +701,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(new TContent("Hello"),EOF_CONTENT); deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");
@ -735,7 +736,7 @@ public class HttpInputAsyncStateTest
check("onReadUnready"); check("onReadUnready");
}); });
check("asyncReadFillInterested"); check("onAsyncWaitForContent");
deliver(new TContent("Hello"),EOF_CONTENT); deliver(new TContent("Hello"),EOF_CONTENT);
check("onReadPossible true","onReadPossible false"); check("onReadPossible true","onReadPossible false");

View File

@ -93,9 +93,9 @@ public class HttpInputTest
_in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null) _in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null)
{ {
@Override @Override
public void asyncReadFillInterested() public void onAsyncWaitForContent()
{ {
_history.add("asyncReadFillInterested"); _history.add("asyncReadInterested");
} }
}) })
{ {
@ -213,82 +213,21 @@ public class HttpInputTest
Assert.assertThat(_history.poll(), Matchers.nullValue()); Assert.assertThat(_history.poll(), Matchers.nullValue());
} }
@Test
public void testReRead() throws Exception
{
_in.addContent(new TContent("AB"));
_in.addContent(new TContent("CD"));
_fillAndParseSimulate.offer("EF");
_fillAndParseSimulate.offer("GH");
Assert.assertThat(_in.available(), Matchers.equalTo(2));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'E'));
_in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde")));
Assert.assertThat(_in.available(), Matchers.equalTo(5));
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'a'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'b'));
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'c'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'d'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'e'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'F'));
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2"));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.read(), Matchers.equalTo((int)'G'));
Assert.assertThat(_in.read(), Matchers.equalTo((int)'H'));
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
Assert.assertThat(_history.poll(), Matchers.nullValue());
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
Assert.assertThat(_history.poll(), Matchers.nullValue());
}
@Test @Test
public void testBlockingRead() throws Exception public void testBlockingRead() throws Exception
{ {
new Thread() new Thread(() ->
{ {
public void run() try
{ {
try Thread.sleep(500);
{ _in.addContent(new TContent("AB"));
Thread.sleep(500);
_in.addContent(new TContent("AB"));
}
catch (Throwable th)
{
th.printStackTrace();
}
} }
}.start(); catch (Throwable th)
{
th.printStackTrace();
}
}).start();
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A')); Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
@ -367,21 +306,18 @@ public class HttpInputTest
@Test @Test
public void testBlockingEOF() throws Exception public void testBlockingEOF() throws Exception
{ {
new Thread() new Thread(() ->
{ {
public void run() try
{ {
try Thread.sleep(500);
{ _in.eof();
Thread.sleep(500);
_in.eof();
}
catch (Throwable th)
{
th.printStackTrace();
}
} }
}.start(); catch (Throwable th)
{
th.printStackTrace();
}
}).start();
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false)); Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
Assert.assertThat(_in.read(), Matchers.equalTo(-1)); Assert.assertThat(_in.read(), Matchers.equalTo(-1));

View File

@ -19,7 +19,9 @@
package org.eclipse.jetty.http.client; package org.eclipse.jetty.http.client;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -32,9 +34,12 @@ import javax.servlet.ServletException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
@ -43,6 +48,7 @@ import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -517,7 +523,6 @@ public class ServerTimeoutsTest extends AbstractTest
throw x; throw x;
} }
} }
}); });
DeferredContentProvider contentProvider = new DeferredContentProvider(); DeferredContentProvider contentProvider = new DeferredContentProvider();
@ -678,6 +683,59 @@ public class ServerTimeoutsTest extends AbstractTest
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testIdleTimeoutBeforeReadIsIgnored() throws Exception
{
long idleTimeout = 1000;
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
Thread.sleep(2 * idleTimeout);
IO.copy(request.getInputStream(), response.getOutputStream());
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
setServerIdleTimeout(idleTimeout);
byte[] data = new byte[1024];
new Random().nextBytes(data);
byte[] data1 = new byte[data.length / 2];
System.arraycopy(data, 0, data1, 0, data1.length);
byte[] data2 = new byte[data.length - data1.length];
System.arraycopy(data, data1.length, data2, 0, data2.length);
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data1));
CountDownLatch latch = new CountDownLatch(1);
client.newRequest(newURI())
.path(servletPath)
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
Assert.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
Assert.assertArrayEquals(data, getContent());
latch.countDown();
}
});
// Wait for the server application to block reading.
Thread.sleep(3 * idleTimeout);
content.offer(ByteBuffer.wrap(data2));
content.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler
{ {
private final CountDownLatch handlerLatch; private final CountDownLatch handlerLatch;