Merge pull request #3908 from eclipse/jetty-9.4.x-3601-http2_stall_on_reset_stream

Fixes #3601 - HTTP2 stall on reset streams.
This commit is contained in:
Simone Bordet 2019-08-07 14:49:55 +03:00 committed by GitHub
commit e64e3309d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 296 additions and 51 deletions

View File

@ -135,25 +135,28 @@ public class AsyncServletTest extends AbstractTest
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
FuturePromise<Stream> promise = new FuturePromise<>();
CountDownLatch clientLatch = new CountDownLatch(1);
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch resetLatch = new CountDownLatch(1);
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500 && frame.isEndStream())
clientLatch.countDown();
responseLatch.countDown();
}
@Override
public void onReset(Stream stream, ResetFrame frame)
{
resetLatch.countDown();
}
});
Stream stream = promise.get(5, TimeUnit.SECONDS);
stream.setIdleTimeout(10 * idleTimeout);
// When the client closes, the server receives the
// corresponding frame and acts by notifying the failure,
// which sends back to the client the error response.
assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS));
assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test

View File

@ -20,16 +20,22 @@ package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
@ -40,7 +46,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
@ -54,12 +62,20 @@ import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -833,4 +849,210 @@ public class StreamResetTest extends AbstractTest
// Read on server should fail.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResetAfterTCPCongestedWrite() throws Exception
{
AtomicReference<WriteFlusher> flusherRef = new AtomicReference<>();
CountDownLatch flusherLatch = new CountDownLatch(1);
CountDownLatch writeLatch1 = new CountDownLatch(1);
CountDownLatch writeLatch2 = new CountDownLatch(1);
start(new EmptyHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
Request jettyRequest = (Request)request;
flusherRef.set(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher());
flusherLatch.countDown();
ServletOutputStream output = response.getOutputStream();
try
{
// Large write, it blocks due to TCP congestion.
byte[] data = new byte[128 * 1024 * 1024];
output.write(data);
}
catch (IOException x)
{
writeLatch1.countDown();
try
{
// Try to write again, must fail immediately.
output.write(0xFF);
}
catch (IOException xx)
{
writeLatch2.countDown();
}
}
}
});
ByteBufferPool byteBufferPool = client.getByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
String host = "localhost";
int port = connector.getLocalPort();
socket.connect(new InetSocketAddress(host, port));
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
// Max stream HTTP/2 flow control window.
clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE);
generator.control(lease, new SettingsFrame(clientSettings, false));
// Max session HTTP/2 flow control window.
generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE));
HttpURI uri = new HttpURI("http", host, port, servletPath);
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
int streamId = 3;
HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true);
generator.control(lease, headersFrame);
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
// Wait until the server is TCP congested.
assertTrue(flusherLatch.await(5, TimeUnit.SECONDS));
WriteFlusher flusher = flusherRef.get();
waitUntilTCPCongested(flusher);
lease.recycle();
generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
assertTrue(writeLatch1.await(5, TimeUnit.SECONDS));
assertTrue(writeLatch2.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception
{
Exchanger<WriteFlusher> exchanger = new Exchanger<>();
CountDownLatch requestLatch1 = new CountDownLatch(1);
CountDownLatch requestLatch2 = new CountDownLatch(1);
CountDownLatch writeLatch1 = new CountDownLatch(1);
start(new EmptyHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
if (request.getPathInfo().equals("/1"))
service1(request, response);
else if (request.getPathInfo().equals("/2"))
service2(request, response);
else
throw new IllegalArgumentException();
}
private void service1(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
Request jettyRequest = (Request)request;
exchanger.exchange(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher());
ServletOutputStream output = response.getOutputStream();
// Large write, it blocks due to TCP congestion.
output.write(new byte[128 * 1024 * 1024]);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
private void service2(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
requestLatch1.countDown();
requestLatch2.await();
ServletOutputStream output = response.getOutputStream();
int length = 512 * 1024;
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
length = h2.getHttpConfiguration().getOutputAggregationSize();
// Medium write so we don't aggregate it, must not block.
output.write(new byte[length * 2]);
}
catch (IOException x)
{
writeLatch1.countDown();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
ByteBufferPool byteBufferPool = client.getByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
String host = "localhost";
int port = connector.getLocalPort();
socket.connect(new InetSocketAddress(host, port));
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
// Max stream HTTP/2 flow control window.
clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE);
generator.control(lease, new SettingsFrame(clientSettings, false));
// Max session HTTP/2 flow control window.
generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE));
HttpURI uri = new HttpURI("http", host, port, servletPath + "/1");
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
HeadersFrame headersFrame = new HeadersFrame(3, request, null, true);
generator.control(lease, headersFrame);
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
waitUntilTCPCongested(exchanger.exchange(null));
// Send a second request.
uri = new HttpURI("http", host, port, servletPath + "/2");
request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
int streamId = 5;
headersFrame = new HeadersFrame(streamId, request, null, true);
generator.control(lease, headersFrame);
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
assertTrue(requestLatch1.await(5, TimeUnit.SECONDS));
// Now reset the second request, which has not started writing yet.
lease.recycle();
generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
// Wait to be sure that the server processed the reset.
Thread.sleep(1000);
// Let the request write, it should not block.
requestLatch2.countDown();
assertTrue(writeLatch1.await(5, TimeUnit.SECONDS));
}
}
private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException
{
long start = System.nanoTime();
while (!flusher.isPending())
{
long elapsed = System.nanoTime() - start;
if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15)
throw new TimeoutException();
Thread.sleep(100);
}
// Wait for the selector to update the SelectionKey to OP_WRITE.
Thread.sleep(1000);
}
}

View File

@ -331,7 +331,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(),
stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode()));
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}
private class TransportCallback implements Callback
@ -392,35 +392,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void failed(Throwable failure)
{
boolean commit;
Callback callback = null;
synchronized (this)
{
commit = this.commit;
// Only fail pending writes, as we
// may need to write an error page.
if (state == State.WRITING)
{
this.state = State.FAILED;
callback = this.callback;
this.callback = null;
this.failure = failure;
}
}
if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
if (callback != null)
callback.failed(failure);
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
commit = this.commit;
this.state = State.FAILED;
callback = this.callback;
this.callback = null;
this.failure = failure;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(),
commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
if (callback != null)
callback.failed(failure);
}
private boolean onIdleTimeout(Throwable failure)
@ -446,6 +431,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport
callback.failed(failure);
return result;
}
@Override
public InvocationType getInvocationType()
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType();
}
}
private enum State

View File

@ -390,7 +390,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
return _fillInterest;
}
protected WriteFlusher getWriteFlusher()
public WriteFlusher getWriteFlusher()
{
return _writeFlusher;
}

View File

@ -258,7 +258,7 @@ public abstract class WriteFlusher
*/
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
{
callback = Objects.requireNonNull(callback);
Objects.requireNonNull(callback);
if (isFailed())
{
@ -523,12 +523,22 @@ public abstract class WriteFlusher
boolean isFailed()
{
return _state.get().getType() == StateType.FAILED;
return isState(StateType.FAILED);
}
boolean isIdle()
{
return _state.get().getType() == StateType.IDLE;
return isState(StateType.IDLE);
}
public boolean isPending()
{
return isState(StateType.PENDING);
}
private boolean isState(StateType type)
{
return _state.get().getType() == type;
}
public String toStateString()

View File

@ -403,7 +403,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
}
@Override
protected WriteFlusher getWriteFlusher()
public WriteFlusher getWriteFlusher()
{
return super.getWriteFlusher();
}

View File

@ -70,7 +70,6 @@ import org.eclipse.jetty.server.HttpInput.Content;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.unixsocket.UnixSocketConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.StacklessLogging;
@ -82,6 +81,8 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -1074,6 +1075,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
// only generates the close alert back, without encrypting the
// response, so we need to skip the transports over TLS.
Assumptions.assumeFalse(scenario.transport.isTlsBased());
Assumptions.assumeFalse(scenario.transport == FCGI);
String content = "jetty";
int responseCode = HttpStatus.NO_CONTENT_204;
@ -1100,7 +1102,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
@Override
public void onAllDataRead() throws IOException
public void onAllDataRead()
{
}
@ -1122,13 +1124,16 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
.method(HttpMethod.POST)
.path(scenario.servletPath)
.content(contentProvider)
.onResponseSuccess(response -> responseLatch.countDown());
if (scenario.connector instanceof UnixSocketConnector)
{
// skip rest of this test for unix socket
return;
}
.onResponseSuccess(response ->
{
if (transport == HTTP)
responseLatch.countDown();
})
.onResponseFailure((response, failure) ->
{
if (transport == H2C)
responseLatch.countDown();
});
Destination destination = scenario.client.getDestination(scenario.getScheme(),
"localhost",
@ -1139,7 +1144,18 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1);
connection.send(request, result ->
{
assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode));
switch (transport)
{
case HTTP:
assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode));
break;
case H2C:
// HTTP/2 does not attempt to write a response back, just a RST_STREAM.
assertTrue(result.isFailed());
break;
default:
fail("Unhandled transport: " + transport);
}
clientLatch.countDown();
});
@ -1148,11 +1164,9 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
switch (transport)
{
case HTTP:
case HTTPS:
((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput();
break;
case H2C:
case H2:
// In case of HTTP/2, we not only send the request, but also the preface and
// SETTINGS frames. SETTINGS frame need to be replied, so we want to wait to
// write the reply before shutting output down, so that the test does not fail.

View File

@ -2,6 +2,6 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.client.LEVEL=DEBUG
#org.eclipse.jetty.io.LEVEL=DEBUG