Fixes #3601 - HTTP2 stall on reset streams.

The client reset wakes up threads blocked in
writes, but these may again attempt to write,
therefore blocking again.

Now we detect that the stream is not writable
and mark the transport as failed, so that
writes fail immediately without blocking.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-07-25 22:21:13 +02:00
parent 1aaadea13d
commit 873d1c6d7d
4 changed files with 233 additions and 5 deletions

View File

@ -20,19 +20,26 @@ package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
@ -40,7 +47,9 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
@ -54,12 +63,20 @@ import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
@ -833,4 +850,213 @@ public class StreamResetTest extends AbstractTest
// Read on server should fail.
assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResetAfterTCPCongestedWrite() throws Exception
{
AtomicReference<WriteFlusher> flusherRef = new AtomicReference<>();
CountDownLatch flusherLatch = new CountDownLatch(1);
CountDownLatch writeLatch1 = new CountDownLatch(1);
CountDownLatch writeLatch2 = new CountDownLatch(1);
start(new EmptyHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
Request jettyRequest = (Request)request;
flusherRef.set(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher());
flusherLatch.countDown();
ServletOutputStream output = response.getOutputStream();
try
{
// Large write, it blocks due to TCP congestion.
byte[] data = new byte[128 * 1024 * 1024];
output.write(data);
}
catch (IOException x)
{
writeLatch1.countDown();
try
{
// Try to write again, must fail immediately.
output.write(0xFF);
}
catch (IOException xx)
{
writeLatch2.countDown();
}
}
}
});
ByteBufferPool byteBufferPool = client.getByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
String host = "localhost";
int port = connector.getLocalPort();
socket.connect(new InetSocketAddress(host, port));
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
// Max stream HTTP/2 flow control window.
clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE);
generator.control(lease, new SettingsFrame(clientSettings, false));
// Max session HTTP/2 flow control window.
generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE));
HttpURI uri = new HttpURI("http", host, port, servletPath);
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
int streamId = 3;
HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true);
generator.control(lease, headersFrame);
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
// Wait until the server is TCP congested.
assertTrue(flusherLatch.await(5, TimeUnit.SECONDS));
WriteFlusher flusher = flusherRef.get();
waitUntilTCPCongested(flusher);
lease.recycle();
generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
assertTrue(writeLatch1.await(5, TimeUnit.SECONDS));
assertTrue(writeLatch2.await(5, TimeUnit.SECONDS));
}
}
@Test
public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception
{
Exchanger<WriteFlusher> exchanger = new Exchanger<>();
CountDownLatch requestLatch1 = new CountDownLatch(1);
CountDownLatch requestLatch2 = new CountDownLatch(1);
CountDownLatch writeLatch1 = new CountDownLatch(1);
start(new EmptyHttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (request.getPathInfo().equals("/1"))
service1(request, response);
else if (request.getPathInfo().equals("/2"))
service2(request, response);
else
throw new IllegalArgumentException();
}
private void service1(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
Request jettyRequest = (Request)request;
exchanger.exchange(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher());
ServletOutputStream output = response.getOutputStream();
// Large write, it blocks due to TCP congestion.
output.write(new byte[128 * 1024 * 1024]);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
private void service2(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
requestLatch1.countDown();
requestLatch2.await();
ServletOutputStream output = response.getOutputStream();
int length = 512 * 1024;
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
length = h2.getHttpConfiguration().getOutputAggregationSize();
// Medium write so we don't aggregate it, must not block.
output.write(new byte[length * 2]);
}
catch (IOException x)
{
writeLatch1.countDown();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
ByteBufferPool byteBufferPool = client.getByteBufferPool();
try (SocketChannel socket = SocketChannel.open())
{
String host = "localhost";
int port = connector.getLocalPort();
socket.connect(new InetSocketAddress(host, port));
Generator generator = new Generator(byteBufferPool);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
Map<Integer, Integer> clientSettings = new HashMap<>();
// Max stream HTTP/2 flow control window.
clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE);
generator.control(lease, new SettingsFrame(clientSettings, false));
// Max session HTTP/2 flow control window.
generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE));
HttpURI uri = new HttpURI("http", host, port, servletPath + "/1");
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
HeadersFrame headersFrame = new HeadersFrame(3, request, null, true);
generator.control(lease, headersFrame);
List<ByteBuffer> buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
waitUntilTCPCongested(exchanger.exchange(null));
// Send a second request.
uri = new HttpURI("http", host, port, servletPath + "/2");
request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields());
int streamId = 5;
headersFrame = new HeadersFrame(streamId, request, null, true);
generator.control(lease, headersFrame);
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
assertTrue(requestLatch1.await(5, TimeUnit.SECONDS));
// Now reset the second request, which has not started writing yet.
lease.recycle();
generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = lease.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
// Wait to be sure that the server processed the reset.
Thread.sleep(1000);
// Let the request write, it should not block.
requestLatch2.countDown();
assertTrue(writeLatch1.await(555, TimeUnit.SECONDS));
}
}
private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException
{
long start = System.nanoTime();
while (true)
{
// Yuck! But no other easy way to detect this.
if ("P".equals(flusher.toStateString()))
break;
long elapsed = System.nanoTime() - start;
if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15)
throw new TimeoutException();
Thread.sleep(100);
}
// Wait for the selector to update the SelectionKey to OP_WRITE.
Thread.sleep(1000);
}
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.util.BufferUtil;
@ -396,9 +397,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
synchronized (this)
{
commit = this.commit;
// Only fail pending writes, as we
// may need to write an error page.
if (state == State.WRITING)
// Don't always fail, as we may need to write an error response.
EndPoint endPoint = connection.getEndPoint();
boolean cannotWrite = stream.isReset() || endPoint.isOutputShutdown() || !endPoint.isOpen();
if ((cannotWrite && state == State.IDLE) || state == State.WRITING)
{
this.state = State.FAILED;
callback = this.callback;

View File

@ -390,7 +390,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
return _fillInterest;
}
protected WriteFlusher getWriteFlusher()
public WriteFlusher getWriteFlusher()
{
return _writeFlusher;
}

View File

@ -403,7 +403,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
}
@Override
protected WriteFlusher getWriteFlusher()
public WriteFlusher getWriteFlusher()
{
return super.getWriteFlusher();
}