Fixes #3601 - HTTP2 stall on reset streams.

After review, updated the logic to always fail the transport.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-08-01 15:52:47 +02:00
parent 753024af54
commit 762767c62c
4 changed files with 59 additions and 48 deletions

View File

@ -135,25 +135,28 @@ public class AsyncServletTest extends AbstractTest
MetaData.Request metaData = newRequest("GET", fields); MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true); HeadersFrame frame = new HeadersFrame(metaData, null, true);
FuturePromise<Stream> promise = new FuturePromise<>(); FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch clientLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter() session.newStream(frame, promise, new Stream.Listener.Adapter()
{ {
@Override @Override
public void onHeaders(Stream stream, HeadersFrame frame) public void onHeaders(Stream stream, HeadersFrame frame)
{ {
MetaData.Response response = (MetaData.Response)frame.getMetaData(); responseLatch.countDown();
if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500 && frame.isEndStream()) }
clientLatch.countDown();
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
} }
}); });
Stream stream = promise.get(5, TimeUnit.SECONDS); Stream stream = promise.get(5, TimeUnit.SECONDS);
stream.setIdleTimeout(10 * idleTimeout); stream.setIdleTimeout(10 * idleTimeout);
// When the client closes, the server receives the
// corresponding frame and acts by notifying the failure,
// which sends back to the client the error response.
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
} }
@Test @Test

View File

@ -33,7 +33,6 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -332,7 +331,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(), LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(),
stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode())); stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode()));
if (stream != null) if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
} }
private class TransportCallback implements Callback private class TransportCallback implements Callback
@ -393,36 +392,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void failed(Throwable failure) public void failed(Throwable failure)
{ {
boolean commit; boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
// Don't always fail, as we may need to write an error response.
EndPoint endPoint = connection.getEndPoint();
boolean cannotWrite = stream.isReset() || endPoint.isOutputShutdown() || !endPoint.isOpen();
if ((cannotWrite && state == State.IDLE) || state == State.WRITING)
{
this.state = State.FAILED;
callback = this.callback;
this.callback = null;
this.failure = failure;
}
}
if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
if (callback != null)
callback.failed(failure);
}
@Override
public InvocationType getInvocationType()
{
Callback callback; Callback callback;
synchronized (this) synchronized (this)
{ {
commit = this.commit;
this.state = State.FAILED;
callback = this.callback; callback = this.callback;
this.callback = null;
this.failure = failure;
} }
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(),
commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
if (callback != null)
callback.failed(failure);
} }
private boolean onIdleTimeout(Throwable failure) private boolean onIdleTimeout(Throwable failure)
@ -448,6 +431,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport
callback.failed(failure); callback.failed(failure);
return result; return result;
} }
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
} }
private enum State private enum State

View File

@ -70,7 +70,6 @@ import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.unixsocket.UnixSocketConnector;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
@ -82,6 +81,8 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap; import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.http.client.Transport.FCGI; import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.eclipse.jetty.util.BufferUtil.toArray; import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -1074,6 +1075,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
// only generates the close alert back, without encrypting the // only generates the close alert back, without encrypting the
// response, so we need to skip the transports over TLS. // response, so we need to skip the transports over TLS.
Assumptions.assumeFalse(scenario.transport.isTlsBased()); Assumptions.assumeFalse(scenario.transport.isTlsBased());
Assumptions.assumeFalse(scenario.transport == FCGI);
String content = "jetty"; String content = "jetty";
int responseCode = HttpStatus.NO_CONTENT_204; int responseCode = HttpStatus.NO_CONTENT_204;
@ -1100,7 +1102,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
} }
@Override @Override
public void onAllDataRead() throws IOException public void onAllDataRead()
{ {
} }
@ -1122,13 +1124,16 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
.method(HttpMethod.POST) .method(HttpMethod.POST)
.path(scenario.servletPath) .path(scenario.servletPath)
.content(contentProvider) .content(contentProvider)
.onResponseSuccess(response -> responseLatch.countDown()); .onResponseSuccess(response ->
{
if (scenario.connector instanceof UnixSocketConnector) if (transport == HTTP)
{ responseLatch.countDown();
// skip rest of this test for unix socket })
return; .onResponseFailure((response, failure) ->
} {
if (transport == H2C)
responseLatch.countDown();
});
Destination destination = scenario.client.getDestination(scenario.getScheme(), Destination destination = scenario.client.getDestination(scenario.getScheme(),
"localhost", "localhost",
@ -1139,7 +1144,18 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1); CountDownLatch clientLatch = new CountDownLatch(1);
connection.send(request, result -> connection.send(request, result ->
{ {
assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); switch (transport)
{
case HTTP:
assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode));
break;
case H2C:
// HTTP/2 does not attempt to write a response back, just a RST_STREAM.
assertTrue(result.isFailed());
break;
default:
fail("Unhandled transport: " + transport);
}
clientLatch.countDown(); clientLatch.countDown();
}); });
@ -1148,11 +1164,9 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
switch (transport) switch (transport)
{ {
case HTTP: case HTTP:
case HTTPS:
((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput(); ((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput();
break; break;
case H2C: case H2C:
case H2:
// In case of HTTP/2, we not only send the request, but also the preface and // In case of HTTP/2, we not only send the request, but also the preface and
// SETTINGS frames. SETTINGS frame need to be replied, so we want to wait to // SETTINGS frames. SETTINGS frame need to be replied, so we want to wait to
// write the reply before shutting output down, so that the test does not fail. // write the reply before shutting output down, so that the test does not fail.

View File

@ -2,6 +2,6 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG #org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.http2.hpack.LEVEL=INFO org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG #org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.io.LEVEL=DEBUG #org.eclipse.jetty.io.LEVEL=DEBUG