Merged branch 'jetty-9.3.x' into 'jetty-9.4.x'.

This commit is contained in:
Simone Bordet 2016-12-14 11:19:49 +01:00
commit dd3a73e57a
6 changed files with 192 additions and 43 deletions

View File

@ -240,7 +240,6 @@ public class HttpClient extends ContainerLifeCycle
@Override
protected void doStop() throws Exception
{
cookieStore.removeAll();
decoderFactories.clear();
handlers.clear();

View File

@ -23,12 +23,17 @@ import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -451,4 +456,139 @@ public class StreamResetTest extends AbstractTest
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
}
@Test
public void testResetAfterAsyncRequestBlockingWriteStalledByFlowControl() throws Exception
{
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
CountDownLatch writeLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.start(() ->
{
try
{
// Make sure we are in async wait before writing.
Thread.sleep(1000);
response.getOutputStream().write(new byte[10 * windowSize]);
asyncContext.complete();
}
catch (IOException x)
{
writeLatch.countDown();
}
catch (Throwable x)
{
x.printStackTrace();
}
});
}
});
Deque<Object> dataQueue = new ArrayDeque<>();
AtomicLong received = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataQueue.offer(frame);
dataQueue.offer(callback);
// Do not consume the data yet.
if (received.addAndGet(frame.getData().remaining()) == windowSize)
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Reset and consume.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
dataQueue.stream()
.filter(item -> item instanceof Callback)
.map(item -> (Callback)item)
.forEach(Callback::succeeded);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception
{
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
CountDownLatch writeLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
ServletOutputStream output = response.getOutputStream();
output.setWriteListener(new WriteListener()
{
private boolean written;
@Override
public void onWritePossible() throws IOException
{
while (output.isReady())
{
if (written)
{
asyncContext.complete();
break;
}
else
{
output.write(new byte[10 * windowSize]);
written = true;
}
}
}
@Override
public void onError(Throwable t)
{
writeLatch.countDown();
}
});
}
});
Deque<Callback> dataQueue = new ArrayDeque<>();
AtomicLong received = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataQueue.offer(callback);
// Do not consume the data yet.
if (received.addAndGet(frame.getData().remaining()) == windowSize)
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Reset and consume.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
dataQueue.forEach(Callback::succeeded);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -296,6 +296,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void onFailure(Throwable failure)
{
getHttpTransport().onStreamFailure(failure);
if (onEarlyEOF())
handle();
else

View File

@ -197,6 +197,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.data(frame, callback);
}
public void onStreamFailure(Throwable failure)
{
transportCallback.failed(failure);
}
public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
@ -264,9 +269,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
if (state == State.WRITING)
{
callback = this.callback;
this.callback = null;
this.state = State.IDLE;
}
}
@ -284,9 +290,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
if (state == State.WRITING)
{
callback = this.callback;
this.callback = null;
this.state = State.FAILED;
}
}
@ -317,6 +324,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (result)
{
callback = this.callback;
this.callback = null;
this.state = State.TIMEOUT;
}
}

View File

@ -345,7 +345,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (_response.isCommitted())
{
LOG.warn("Error Dispatch already committed");
if (LOG.isDebugEnabled())
LOG.debug("Could not perform Error Dispatch because the response is already committed, aborting");
_transport.abort((Throwable)_request.getAttribute(ERROR_EXCEPTION));
}
else