Fixes #1169 - HTTP/2 reset on a stalled write does not unblock writer thread.

The fix notifies the transport when a reset frame is received,
allowing the transport to fail the write callback which then notifies
the application, either by throwing (in case of blocking writes) or
by calling error listeners.

Also added a guard, in HttpChannel.handle() for the ERROR_DISPATCH case,
that checks if the response is already committed, and if so, abort
the transport - similar to what's already there for 9.4.
This commit is contained in:
Simone Bordet 2016-12-14 11:07:00 +01:00
parent 9b609e0f6d
commit 07c9bc5a51
4 changed files with 188 additions and 33 deletions

View File

@ -23,12 +23,17 @@ import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -441,4 +446,139 @@ public class StreamResetTest extends AbstractTest
Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0));
}
}
@Test
public void testResetAfterAsyncRequestBlockingWriteStalledByFlowControl() throws Exception
{
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
CountDownLatch writeLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.start(() ->
{
try
{
// Make sure we are in async wait before writing.
Thread.sleep(1000);
response.getOutputStream().write(new byte[10 * windowSize]);
asyncContext.complete();
}
catch (IOException x)
{
writeLatch.countDown();
}
catch (Throwable x)
{
x.printStackTrace();
}
});
}
});
Deque<Object> dataQueue = new ArrayDeque<>();
AtomicLong received = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataQueue.offer(frame);
dataQueue.offer(callback);
// Do not consume the data yet.
if (received.addAndGet(frame.getData().remaining()) == windowSize)
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Reset and consume.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
dataQueue.stream()
.filter(item -> item instanceof Callback)
.map(item -> (Callback)item)
.forEach(Callback::succeeded);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResetAfterAsyncRequestAsyncWriteStalledByFlowControl() throws Exception
{
int windowSize = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
CountDownLatch writeLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
AsyncContext asyncContext = request.startAsync();
ServletOutputStream output = response.getOutputStream();
output.setWriteListener(new WriteListener()
{
private boolean written;
@Override
public void onWritePossible() throws IOException
{
while (output.isReady())
{
if (written)
{
asyncContext.complete();
break;
}
else
{
output.write(new byte[10 * windowSize]);
written = true;
}
}
}
@Override
public void onError(Throwable t)
{
writeLatch.countDown();
}
});
}
});
Deque<Callback> dataQueue = new ArrayDeque<>();
AtomicLong received = new AtomicLong();
CountDownLatch latch = new CountDownLatch(1);
Session client = newClient(new Session.Listener.Adapter());
MetaData.Request request = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(request, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
dataQueue.offer(callback);
// Do not consume the data yet.
if (received.addAndGet(frame.getData().remaining()) == windowSize)
latch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// Reset and consume.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
dataQueue.forEach(Callback::succeeded);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -296,6 +296,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public void onFailure(Throwable failure)
{
getHttpTransport().onStreamFailure(failure);
if (onEarlyEOF())
handle();
else

View File

@ -195,6 +195,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport
stream.data(frame, callback);
}
public void onStreamFailure(Throwable failure)
{
transportCallback.failed(failure);
}
public boolean onStreamTimeout(Throwable failure)
{
return transportCallback.onIdleTimeout(failure);
@ -261,9 +266,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
if (state == State.WRITING)
{
callback = this.callback;
this.callback = null;
this.state = State.IDLE;
}
}
@ -281,9 +287,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
if (state != State.TIMEOUT)
if (state == State.WRITING)
{
callback = this.callback;
this.callback = null;
this.state = State.FAILED;
}
}
@ -309,6 +316,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (result)
{
callback = this.callback;
this.callback = null;
this.state = State.TIMEOUT;
}
}

View File

@ -368,45 +368,51 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
break loop;
}
_request.setHandled(false);
_response.resetBuffer();
_response.getHttpOutput().reopen();
String reason;
if (ex == null || ex instanceof TimeoutException)
if (_response.isCommitted())
{
reason = "Async Timeout";
if (LOG.isDebugEnabled())
LOG.debug("Could not perform Error Dispatch because the response is already committed, aborting");
_transport.abort(ex);
}
else
{
reason = HttpStatus.Code.INTERNAL_SERVER_ERROR.getMessage();
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ex);
}
_request.setHandled(false);
_response.resetBuffer();
_response.getHttpOutput().reopen();
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE, 500);
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE, reason);
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI, _request.getRequestURI());
String reason;
if (ex == null || ex instanceof TimeoutException)
{
reason = "Async Timeout";
}
else
{
reason = HttpStatus.Code.INTERNAL_SERVER_ERROR.getMessage();
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ex);
}
_response.setStatusWithReason(HttpStatus.INTERNAL_SERVER_ERROR_500, reason);
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE, 500);
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE, reason);
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI, _request.getRequestURI());
_response.setStatusWithReason(HttpStatus.INTERNAL_SERVER_ERROR_500, reason);
ErrorHandler eh = ErrorHandler.getErrorHandler(getServer(), _state.getContextHandler());
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
String error_page = ((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page != null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
}
ErrorHandler eh = ErrorHandler.getErrorHandler(getServer(), _state.getContextHandler());
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
String error_page = ((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page != null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
}
try
{
_request.setDispatcherType(DispatcherType.ERROR);
getServer().handleAsync(this);
}
finally
{
_request.setDispatcherType(null);
try
{
_request.setDispatcherType(DispatcherType.ERROR);
getServer().handleAsync(this);
}
finally
{
_request.setDispatcherType(null);
}
}
break;
}