Fixes #1719 - Improve handling of HTTP/2 queued requests.
Idle timeout have a special meaning in that they become a no-operation if the application is dispatched but idle (neither reading nor writing). HttpChannelOverHTTP2 now forwards the idle timeout to HttpInput, which will only change its state if it is interested in reading. HttpInput.consumeAll() has been modified to consume all input even if it's already failed. Failures caused by the other peer (e.g. I/O failures or stream resets) are now retained and will eagerly consumed any queued data to free up the flow control windows.
This commit is contained in:
parent
93e8af1db5
commit
d3d02f227e
|
@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
|
@ -36,6 +37,7 @@ import org.eclipse.jetty.http.HttpVersion;
|
|||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.FlowControlStrategy;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
|
@ -43,10 +45,17 @@ import org.eclipse.jetty.http2.frames.DataFrame;
|
|||
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -421,7 +430,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception
|
||||
public void testServerStreamIdleTimeoutIsNotEnforcedWhenReceiving() throws Exception
|
||||
{
|
||||
final CountDownLatch timeoutLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
|
@ -465,7 +474,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
// Idle timeout should not fire while receiving.
|
||||
// Idle timeout should not fire while the server is receiving.
|
||||
Assert.assertEquals(1, timeoutLatch.getCount());
|
||||
dataLatch.countDown();
|
||||
}
|
||||
|
@ -479,7 +488,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception
|
||||
public void testClientStreamIdleTimeoutIsNotEnforcedWhenSending() throws Exception
|
||||
{
|
||||
final CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
start(new ServerSessionListener.Adapter()
|
||||
|
@ -575,6 +584,83 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
Assert.assertTrue(latch.await(2 * (contentLength / bufferSize + 1) * delay, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerIdleTimeoutIsEnforcedForQueuedRequest() throws Exception
|
||||
{
|
||||
long idleTimeout = 2000;
|
||||
// Use a small thread pool to cause request queueing.
|
||||
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
|
||||
serverExecutor.setName("server");
|
||||
server = new Server(serverExecutor);
|
||||
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
||||
h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
h2.setStreamIdleTimeout(idleTimeout);
|
||||
connector = new ServerConnector(server, 1, 1, h2);
|
||||
connector.setIdleTimeout(10 * idleTimeout);
|
||||
server.addConnector(connector);
|
||||
ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
|
||||
AtomicReference<CountDownLatch> phaser = new AtomicReference<>();
|
||||
context.addServlet(new ServletHolder(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
phaser.get().countDown();
|
||||
|
||||
// Hold the dispatched requests enough for the idle requests to idle timeout.
|
||||
sleep(2 * idleTimeout);
|
||||
}
|
||||
}), servletPath + "/*");
|
||||
server.start();
|
||||
|
||||
prepareClient();
|
||||
client.start();
|
||||
|
||||
Session client = newClient(new Session.Listener.Adapter());
|
||||
|
||||
// Send requests until one is queued on the server but not dispatched.
|
||||
while (true)
|
||||
{
|
||||
phaser.set(new CountDownLatch(1));
|
||||
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
client.newStream(frame, promise, new Stream.Listener.Adapter());
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
ByteBuffer data = ByteBuffer.allocate(10);
|
||||
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
|
||||
|
||||
if (!phaser.get().await(1, TimeUnit.SECONDS))
|
||||
break;
|
||||
}
|
||||
|
||||
// Send one more request to consume the whole session flow control window.
|
||||
CountDownLatch resetLatch = new CountDownLatch(1);
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
client.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
resetLatch.countDown();
|
||||
}
|
||||
});
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0));
|
||||
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
|
||||
|
||||
Assert.assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
|
||||
// Wait for WINDOW_UPDATEs to be processed by the client.
|
||||
sleep(1000);
|
||||
|
||||
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
|
||||
}
|
||||
|
||||
private void sleep(long value)
|
||||
{
|
||||
try
|
||||
|
|
|
@ -24,7 +24,9 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -39,6 +41,7 @@ import javax.servlet.http.HttpServletRequest;
|
|||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
|
@ -50,12 +53,22 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
|||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpOutput;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FutureCallback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -396,6 +409,105 @@ public class StreamResetTest extends AbstractTest
|
|||
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientResetConsumesQueuedRequestWithData() throws Exception
|
||||
{
|
||||
// Use a small thread pool.
|
||||
QueuedThreadPool serverExecutor = new QueuedThreadPool(4);
|
||||
serverExecutor.setName("server");
|
||||
server = new Server(serverExecutor);
|
||||
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
||||
h2.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
h2.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
|
||||
connector = new ServerConnector(server, 1, 1, h2);
|
||||
server.addConnector(connector);
|
||||
ServletContextHandler context = new ServletContextHandler(server, "/");
|
||||
AtomicReference<CountDownLatch> phaser = new AtomicReference<>();
|
||||
context.addServlet(new ServletHolder(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: uri={}", request.getRequestURI());
|
||||
phaser.get().countDown();
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
}), servletPath + "/*");
|
||||
server.start();
|
||||
|
||||
prepareClient();
|
||||
client.start();
|
||||
|
||||
Session client = newClient(new Session.Listener.Adapter());
|
||||
|
||||
// Send requests until one is queued on the server but not dispatched.
|
||||
AtomicReference<CountDownLatch> latch = new AtomicReference<>();
|
||||
List<Stream> streams = new ArrayList<>();
|
||||
while (true)
|
||||
{
|
||||
phaser.set(new CountDownLatch(1));
|
||||
|
||||
MetaData.Request request = newRequest("GET", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
client.newStream(frame, promise, new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: response={}/{}", stream.getId(), frame.getMetaData());
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (response.getStatus() == HttpStatus.OK_200)
|
||||
latch.get().countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
Log.getLogger(StreamResetTest.class).info("SIMON: data={}/{}", stream.getId(), frame);
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
latch.get().countDown();
|
||||
}
|
||||
});
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
streams.add(stream);
|
||||
ByteBuffer data = ByteBuffer.allocate(10);
|
||||
stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP);
|
||||
|
||||
if (!phaser.get().await(1, TimeUnit.SECONDS))
|
||||
break;
|
||||
}
|
||||
|
||||
// Send one more request to consume the whole session flow control window, then reset it.
|
||||
MetaData.Request request = newRequest("GET", "/x", new HttpFields());
|
||||
HeadersFrame frame = new HeadersFrame(request, null, false);
|
||||
FuturePromise<Stream> promise = new FuturePromise<>();
|
||||
// This request will get no event from the server since it's reset by the client.
|
||||
client.newStream(frame, promise, new Stream.Listener.Adapter());
|
||||
Stream stream = promise.get(5, TimeUnit.SECONDS);
|
||||
ByteBuffer data = ByteBuffer.allocate(((ISession)client).updateSendWindow(0));
|
||||
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), NOOP);
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for WINDOW_UPDATEs to be processed by the client.
|
||||
Thread.sleep(1000);
|
||||
|
||||
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
|
||||
|
||||
latch.set(new CountDownLatch(2 * streams.size()));
|
||||
// Complete all streams.
|
||||
streams.forEach(s -> s.data(new DataFrame(s.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP));
|
||||
|
||||
Assert.assertTrue(latch.get().await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerExceptionConsumesQueuedData() throws Exception
|
||||
{
|
||||
|
|
|
@ -179,7 +179,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
|
|||
{
|
||||
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
|
||||
if (channel != null)
|
||||
result &= !channel.isRequestHandled();
|
||||
result &= !channel.isRequestExecuting();
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
|
||||
|
|
|
@ -277,33 +277,32 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
return handle || wasDelayed ? this : null;
|
||||
}
|
||||
|
||||
public boolean isRequestHandled()
|
||||
public boolean isRequestExecuting()
|
||||
{
|
||||
return !getState().isIdle();
|
||||
}
|
||||
|
||||
public boolean onStreamTimeout(Throwable failure)
|
||||
{
|
||||
if (!isRequestHandled())
|
||||
return true;
|
||||
|
||||
HttpInput input = getRequest().getHttpInput();
|
||||
boolean readFailed = input.failed(failure);
|
||||
if (readFailed)
|
||||
getHttpTransport().onStreamTimeout(failure);
|
||||
if (getRequest().getHttpInput().onIdleTimeout(failure))
|
||||
handle();
|
||||
|
||||
boolean writeFailed = getHttpTransport().onStreamTimeout(failure);
|
||||
if (isRequestExecuting())
|
||||
return false;
|
||||
|
||||
return readFailed || writeFailed;
|
||||
consumeInput();
|
||||
return true;
|
||||
}
|
||||
|
||||
public void onFailure(Throwable failure)
|
||||
{
|
||||
getHttpTransport().onStreamFailure(failure);
|
||||
if (onEarlyEOF())
|
||||
if (getRequest().getHttpInput().failed(failure))
|
||||
handle();
|
||||
else
|
||||
getState().asyncError(failure);
|
||||
consumeInput();
|
||||
}
|
||||
|
||||
protected void consumeInput()
|
||||
|
|
|
@ -100,7 +100,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
_requestLog = connector == null ? null : connector.getServer().getRequestLog();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state);
|
||||
LOG.debug("new {} -> {},{},{}",
|
||||
this,
|
||||
_endPoint,
|
||||
_endPoint==null?null:_endPoint.getConnection(),
|
||||
_state);
|
||||
}
|
||||
|
||||
protected HttpInput newHttpInput(HttpChannelState state)
|
||||
|
@ -257,10 +261,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
_written=0;
|
||||
}
|
||||
|
||||
public void asyncReadFillInterested()
|
||||
public void onAsyncWaitForContent()
|
||||
{
|
||||
}
|
||||
|
||||
public void onBlockWaitForContent()
|
||||
{
|
||||
}
|
||||
|
||||
public void onBlockWaitForContentFailure(Throwable failure)
|
||||
{
|
||||
getRequest().getHttpInput().failed(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
|
|
|
@ -239,11 +239,23 @@ public class HttpChannelOverHttp extends HttpChannel implements HttpParser.Reque
|
|||
return handle;
|
||||
}
|
||||
|
||||
public void asyncReadFillInterested()
|
||||
public void onAsyncWaitForContent()
|
||||
{
|
||||
_httpConnection.asyncReadFillInterested();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBlockWaitForContent()
|
||||
{
|
||||
_httpConnection.blockingReadFillInterested();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBlockWaitForContentFailure(Throwable failure)
|
||||
{
|
||||
_httpConnection.blockingReadFailure(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void badMessage(int status, String reason)
|
||||
{
|
||||
|
|
|
@ -479,7 +479,7 @@ public class HttpChannelState
|
|||
}
|
||||
|
||||
if (read_interested)
|
||||
_channel.asyncReadFillInterested();
|
||||
_channel.onAsyncWaitForContent();
|
||||
|
||||
return action;
|
||||
}
|
||||
|
@ -974,8 +974,8 @@ public class HttpChannelState
|
|||
/* ------------------------------------------------------------ */
|
||||
/** Called to signal async read isReady() has returned false.
|
||||
* This indicates that there is no content available to be consumed
|
||||
* and that once the channel enteres the ASYNC_WAIT state it will
|
||||
* register for read interest by calling {@link HttpChannel#asyncReadFillInterested()}
|
||||
* and that once the channel enters the ASYNC_WAIT state it will
|
||||
* register for read interest by calling {@link HttpChannel#onAsyncWaitForContent()}
|
||||
* either from this method or from a subsequent call to {@link #unhandle()}.
|
||||
*/
|
||||
public void onReadUnready()
|
||||
|
@ -998,7 +998,7 @@ public class HttpChannelState
|
|||
}
|
||||
|
||||
if (interested)
|
||||
_channel.asyncReadFillInterested();
|
||||
_channel.onAsyncWaitForContent();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
|
|
@ -555,7 +555,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
getEndPoint().fillInterested(_blockingReadCallback);
|
||||
}
|
||||
|
||||
public void blockingReadException(Throwable e)
|
||||
public void blockingReadFailure(Throwable e)
|
||||
{
|
||||
_blockingReadCallback.failed(e);
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
@ -63,6 +63,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
private long _contentArrived;
|
||||
private long _contentConsumed;
|
||||
private long _blockUntil;
|
||||
private boolean _waitingForContent;
|
||||
|
||||
public HttpInput(HttpChannelState state)
|
||||
{
|
||||
|
@ -382,61 +383,41 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
try
|
||||
{
|
||||
_waitingForContent = true;
|
||||
_channelState.getHttpChannel().onBlockWaitForContent();
|
||||
|
||||
boolean loop = false;
|
||||
long timeout = 0;
|
||||
if (_blockUntil != 0)
|
||||
while (true)
|
||||
{
|
||||
timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime());
|
||||
if (timeout <= 0)
|
||||
throw new TimeoutException();
|
||||
if (_blockUntil != 0)
|
||||
{
|
||||
timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime());
|
||||
if (timeout <= 0)
|
||||
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
|
||||
}
|
||||
|
||||
// This method is called from a loop, so we just
|
||||
// need to check the timeout before and after waiting.
|
||||
if (loop)
|
||||
break;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} blocking for content timeout={}", this, timeout);
|
||||
if (timeout > 0)
|
||||
_inputQ.wait(timeout);
|
||||
else
|
||||
_inputQ.wait();
|
||||
|
||||
loop = true;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} blocking for content timeout={}", this, timeout);
|
||||
if (timeout > 0)
|
||||
_inputQ.wait(timeout);
|
||||
else
|
||||
_inputQ.wait();
|
||||
|
||||
// TODO: cannot return unless there is content or timeout,
|
||||
// TODO: so spurious wakeups are not handled correctly.
|
||||
|
||||
if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0)
|
||||
throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout()));
|
||||
}
|
||||
catch (Throwable e)
|
||||
catch (Throwable x)
|
||||
{
|
||||
throw (IOException)new InterruptedIOException().initCause(e);
|
||||
_channelState.getHttpChannel().onBlockWaitForContentFailure(x);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds some content to the start of this input stream.
|
||||
* <p>Typically used to push back content that has
|
||||
* been read, perhaps mutated. The bytes prepended are
|
||||
* deducted for the contentConsumed total</p>
|
||||
*
|
||||
* @param item the content to add
|
||||
* @return true if content channel woken for read
|
||||
*/
|
||||
public boolean prependContent(Content item)
|
||||
{
|
||||
boolean woken = false;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
_inputQ.push(item);
|
||||
_contentConsumed -= item.remaining();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} prependContent {}", this, item);
|
||||
|
||||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken = _channelState.onReadPossible();
|
||||
}
|
||||
|
||||
return woken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds some content to this input stream.
|
||||
*
|
||||
|
@ -445,23 +426,26 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
*/
|
||||
public boolean addContent(Content item)
|
||||
{
|
||||
boolean woken = false;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
_waitingForContent = false;
|
||||
if (_firstByteTimeStamp == -1)
|
||||
_firstByteTimeStamp = System.nanoTime();
|
||||
_contentArrived += item.remaining();
|
||||
_inputQ.offer(item);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} addContent {}", this, item);
|
||||
|
||||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
if (isFinished())
|
||||
{
|
||||
Throwable failure = isError() ? ((ErrorState)_state).getError() : new EOFException("Content after EOF");
|
||||
item.failed(failure);
|
||||
return false;
|
||||
}
|
||||
else
|
||||
woken = _channelState.onReadPossible();
|
||||
{
|
||||
_contentArrived += item.remaining();
|
||||
_inputQ.offer(item);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} addContent {}", this, item);
|
||||
return wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
return woken;
|
||||
}
|
||||
|
||||
public boolean hasContent()
|
||||
|
@ -519,7 +503,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
try
|
||||
{
|
||||
while (!isFinished())
|
||||
while (true)
|
||||
{
|
||||
Content item = nextContent();
|
||||
if (item == null)
|
||||
|
@ -575,8 +559,8 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
return true;
|
||||
if (nextReadable() != null)
|
||||
return true;
|
||||
|
||||
_channelState.onReadUnready();
|
||||
_waitingForContent = true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -606,9 +590,14 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
boolean content = nextContent() != null;
|
||||
|
||||
if (content)
|
||||
{
|
||||
woken = _channelState.onReadReady();
|
||||
}
|
||||
else
|
||||
{
|
||||
_channelState.onReadUnready();
|
||||
_waitingForContent = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
|
@ -620,18 +609,36 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
wake();
|
||||
}
|
||||
|
||||
public boolean failed(Throwable x)
|
||||
public boolean onIdleTimeout(Throwable x)
|
||||
{
|
||||
boolean woken = false;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
if (_state instanceof ErrorState)
|
||||
if (_waitingForContent && !isError())
|
||||
{
|
||||
// Log both the original and current failure
|
||||
// without modifying the original failure.
|
||||
Throwable failure = new Throwable(((ErrorState)_state).getError());
|
||||
failure.addSuppressed(x);
|
||||
LOG.warn(failure);
|
||||
x.addSuppressed(new Throwable("HttpInput idle timeout"));
|
||||
_state = new ErrorState(x);
|
||||
return wakeup();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean failed(Throwable x)
|
||||
{
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
// Errors may be reported multiple times, for example
|
||||
// a local idle timeout and a remote I/O failure.
|
||||
if (isError())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
// Log both the original and current failure
|
||||
// without modifying the original failure.
|
||||
Throwable failure = new Throwable(((ErrorState)_state).getError());
|
||||
failure.addSuppressed(x);
|
||||
LOG.debug(failure);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -640,14 +647,16 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
x.addSuppressed(new Throwable("HttpInput failure"));
|
||||
_state = new ErrorState(x);
|
||||
}
|
||||
|
||||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken = _channelState.onReadPossible();
|
||||
return wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
return woken;
|
||||
private boolean wakeup()
|
||||
{
|
||||
if (_listener != null)
|
||||
return _channelState.onReadPossible();
|
||||
_inputQ.notify();
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -32,18 +32,4 @@ public class HttpInputOverHTTP extends HttpInput
|
|||
{
|
||||
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void blockForContent() throws IOException
|
||||
{
|
||||
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadFillInterested();
|
||||
try
|
||||
{
|
||||
super.blockForContent();
|
||||
}
|
||||
catch(Throwable e)
|
||||
{
|
||||
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,9 +90,9 @@ public class HttpInputTest
|
|||
_in = new HttpInput(new HttpChannelState(new HttpChannel(null, new HttpConfiguration(), null, null)
|
||||
{
|
||||
@Override
|
||||
public void asyncReadFillInterested()
|
||||
public void onAsyncWaitForContent()
|
||||
{
|
||||
_history.add("asyncReadFillInterested");
|
||||
_history.add("asyncReadInterested");
|
||||
}
|
||||
})
|
||||
{
|
||||
|
@ -203,82 +203,21 @@ public class HttpInputTest
|
|||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReRead() throws Exception
|
||||
{
|
||||
_in.addContent(new TContent("AB"));
|
||||
_in.addContent(new TContent("CD"));
|
||||
_fillAndParseSimulate.offer("EF");
|
||||
_fillAndParseSimulate.offer("GH");
|
||||
Assert.assertThat(_in.available(), Matchers.equalTo(2));
|
||||
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
|
||||
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
|
||||
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'B'));
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
|
||||
|
||||
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded AB"));
|
||||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'C'));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'D'));
|
||||
|
||||
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded CD"));
|
||||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'E'));
|
||||
|
||||
_in.prependContent(new HttpInput.Content(BufferUtil.toBuffer("abcde")));
|
||||
|
||||
Assert.assertThat(_in.available(), Matchers.equalTo(5));
|
||||
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
|
||||
Assert.assertThat(_in.isReady(), Matchers.equalTo(true));
|
||||
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(0L));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'a'));
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(1L));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'b'));
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(2L));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'c'));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'d'));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'e'));
|
||||
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'F'));
|
||||
|
||||
Assert.assertThat(_history.poll(), Matchers.equalTo("produceContent 2"));
|
||||
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded EF"));
|
||||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'G'));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'H'));
|
||||
|
||||
Assert.assertThat(_history.poll(), Matchers.equalTo("Content succeeded GH"));
|
||||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
|
||||
Assert.assertThat(_in.getContentConsumed(), Matchers.equalTo(8L));
|
||||
|
||||
Assert.assertThat(_history.poll(), Matchers.nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockingRead() throws Exception
|
||||
{
|
||||
new Thread()
|
||||
new Thread(() ->
|
||||
{
|
||||
public void run()
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(500);
|
||||
_in.addContent(new TContent("AB"));
|
||||
}
|
||||
catch (Throwable th)
|
||||
{
|
||||
th.printStackTrace();
|
||||
}
|
||||
Thread.sleep(500);
|
||||
_in.addContent(new TContent("AB"));
|
||||
}
|
||||
}.start();
|
||||
catch (Throwable th)
|
||||
{
|
||||
th.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo((int)'A'));
|
||||
|
||||
|
@ -357,21 +296,18 @@ public class HttpInputTest
|
|||
@Test
|
||||
public void testBlockingEOF() throws Exception
|
||||
{
|
||||
new Thread()
|
||||
new Thread(() ->
|
||||
{
|
||||
public void run()
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(500);
|
||||
_in.eof();
|
||||
}
|
||||
catch (Throwable th)
|
||||
{
|
||||
th.printStackTrace();
|
||||
}
|
||||
Thread.sleep(500);
|
||||
_in.eof();
|
||||
}
|
||||
}.start();
|
||||
catch (Throwable th)
|
||||
{
|
||||
th.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
|
||||
Assert.assertThat(_in.isFinished(), Matchers.equalTo(false));
|
||||
Assert.assertThat(_in.read(), Matchers.equalTo(-1));
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -32,9 +34,12 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.ServletInputStream;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.WriteListener;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.http.BadMessageException;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
|
@ -43,6 +48,7 @@ import org.eclipse.jetty.server.HttpChannel;
|
|||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -677,6 +683,59 @@ public class ServerTimeoutsTest extends AbstractTest
|
|||
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeoutBeforeReadIsIgnored() throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
start(new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(2 * idleTimeout);
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
});
|
||||
setServerIdleTimeout(idleTimeout);
|
||||
|
||||
byte[] data = new byte[1024];
|
||||
new Random().nextBytes(data);
|
||||
byte[] data1 = new byte[data.length / 2];
|
||||
System.arraycopy(data, 0, data1, 0, data1.length);
|
||||
byte[] data2 = new byte[data.length - data1.length];
|
||||
System.arraycopy(data, data1.length, data2, 0, data2.length);
|
||||
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data1));
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
client.newRequest(newURI())
|
||||
.path(servletPath)
|
||||
.content(content)
|
||||
.send(new BufferingResponseListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
Assert.assertTrue(result.isSucceeded());
|
||||
Assert.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
Assert.assertArrayEquals(data, getContent());
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for the server application to block reading.
|
||||
Thread.sleep(3 * idleTimeout);
|
||||
content.offer(ByteBuffer.wrap(data2));
|
||||
content.close();
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private static class BlockingReadHandler extends AbstractHandler
|
||||
{
|
||||
private final CountDownLatch handlerLatch;
|
||||
|
|
Loading…
Reference in New Issue