Added tests to verify input data consumption.

Verify that input data is consumed at the end of a request handling,
either when input is not read and when an exception is thrown,
to make sure that the session flow control is not stalled.
This commit is contained in:
Simone Bordet 2016-05-10 14:57:57 +02:00
parent 9b6d42317b
commit 8ac23d187a
3 changed files with 105 additions and 14 deletions

View File

@ -37,6 +37,8 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -47,6 +49,7 @@ import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.FuturePromise;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@ -221,7 +224,7 @@ public class StreamResetTest extends AbstractTest
response.setStatus(200);
response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.length*10);
response.setContentLength(data.length * 10);
response.flushBuffer();
try
@ -238,7 +241,7 @@ public class StreamResetTest extends AbstractTest
{
// Write some content after the stream has
// been reset, it should throw an exception.
for (int i=0;i<10;i++)
for (int i = 0; i < 10; i++)
{
Thread.sleep(500);
response.getOutputStream().write(data);
@ -350,4 +353,87 @@ public class StreamResetTest extends AbstractTest
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testClientResetConsumesQueuedData() throws Exception
{
start(new EmptyHttpServlet());
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public void succeeded()
{
dataLatch.countDown();
}
});
// The server does not read the data, so the flow control window should be zero.
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));
// Now reset the stream.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// Wait for the server to receive the reset and process
// it, and for the client to process the window updates.
Thread.sleep(1000);
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
@Test
public void testServerExceptionConsumesQueuedData() throws Exception
{
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
try
{
// Wait to let the data sent by the client to be queued.
Thread.sleep(1000);
throw new IllegalStateException();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
});
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter());
Stream stream = promise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback()
{
@Override
public void succeeded()
{
dataLatch.countDown();
}
});
// The server does not read the data, so the flow control window should be zero.
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, ((ISession)client).updateSendWindow(0));
// Wait for the server process the exception, and
// for the client to process the window updates.
Thread.sleep(2000);
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
}

View File

@ -1,4 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.servlets.LEVEL=DEBUG

View File

@ -62,7 +62,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
// copying we can defer to the endpoint
return connection.getEndPoint().isOptimizedForDirectBuffers();
}
public IStream getStream()
{
return stream;
@ -145,7 +145,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}",request);
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
{
@Override
@ -190,16 +190,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport
@Override
public void onCompleted()
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
if (!stream.isClosed())
{
// If the stream is not closed, it is still reading the request content.
// Send a reset to the other end so that it stops sending data.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
// Now that this stream is reset, in-flight data frames will be consumed and discarded.
// Consume the existing queued data frames to avoid stalling the flow control.
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.getRequest().getHttpInput().consumeAll();
}
// Consume the existing queued data frames to
// avoid stalling the session flow control.
consumeInput();
}
protected void consumeInput()
{
HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
channel.getRequest().getHttpInput().consumeAll();
}
@Override
@ -213,7 +217,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
private class CommitCallback implements Callback.NonBlocking
{
{
@Override
public void succeeded()
{