From 873d1c6d7ddcd773c73ce04416fc4d8dd54c8583 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 25 Jul 2019 22:21:13 +0200 Subject: [PATCH] Fixes #3601 - HTTP2 stall on reset streams. The client reset wakes up threads blocked in writes, but these may again attempt to write, therefore blocking again. Now we detect that the stream is not writable and mark the transport as failed, so that writes fail immediately without blocking. Signed-off-by: Simone Bordet --- .../jetty/http2/client/StreamResetTest.java | 226 ++++++++++++++++++ .../http2/server/HttpTransportOverHTTP2.java | 8 +- .../eclipse/jetty/io/AbstractEndPoint.java | 2 +- .../eclipse/jetty/io/ssl/SslConnection.java | 2 +- 4 files changed, 233 insertions(+), 5 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index f025e90f38b..655c59a9600 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -20,19 +20,26 @@ package org.eclipse.jetty.http2.client; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; +import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; @@ -40,7 +47,9 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; @@ -54,12 +63,20 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; +import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -833,4 +850,213 @@ public class StreamResetTest extends AbstractTest // Read on server should fail. assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testResetAfterTCPCongestedWrite() throws Exception + { + AtomicReference flusherRef = new AtomicReference<>(); + CountDownLatch flusherLatch = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + CountDownLatch writeLatch2 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + Request jettyRequest = (Request)request; + flusherRef.set(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + flusherLatch.countDown(); + + ServletOutputStream output = response.getOutputStream(); + try + { + // Large write, it blocks due to TCP congestion. + byte[] data = new byte[128 * 1024 * 1024]; + output.write(data); + } + catch (IOException x) + { + writeLatch1.countDown(); + try + { + // Try to write again, must fail immediately. + output.write(0xFF); + } + catch (IOException xx) + { + writeLatch2.countDown(); + } + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 3; + HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + // Wait until the server is TCP congested. + assertTrue(flusherLatch.await(5, TimeUnit.SECONDS)); + WriteFlusher flusher = flusherRef.get(); + waitUntilTCPCongested(flusher); + + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); + assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception + { + Exchanger exchanger = new Exchanger<>(); + CountDownLatch requestLatch1 = new CountDownLatch(1); + CountDownLatch requestLatch2 = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + if (request.getPathInfo().equals("/1")) + service1(request, response); + else if (request.getPathInfo().equals("/2")) + service2(request, response); + else + throw new IllegalArgumentException(); + } + + private void service1(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + Request jettyRequest = (Request)request; + exchanger.exchange(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + + ServletOutputStream output = response.getOutputStream(); + // Large write, it blocks due to TCP congestion. + output.write(new byte[128 * 1024 * 1024]); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + + private void service2(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + requestLatch1.countDown(); + requestLatch2.await(); + ServletOutputStream output = response.getOutputStream(); + int length = 512 * 1024; + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + length = h2.getHttpConfiguration().getOutputAggregationSize(); + // Medium write so we don't aggregate it, must not block. + output.write(new byte[length * 2]); + } + catch (IOException x) + { + writeLatch1.countDown(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath + "/1"); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + HeadersFrame headersFrame = new HeadersFrame(3, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + waitUntilTCPCongested(exchanger.exchange(null)); + + // Send a second request. + uri = new HttpURI("http", host, port, servletPath + "/2"); + request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 5; + headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + assertTrue(requestLatch1.await(5, TimeUnit.SECONDS)); + + // Now reset the second request, which has not started writing yet. + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + // Wait to be sure that the server processed the reset. + Thread.sleep(1000); + // Let the request write, it should not block. + requestLatch2.countDown(); + assertTrue(writeLatch1.await(555, TimeUnit.SECONDS)); + } + } + + private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException + { + long start = System.nanoTime(); + while (true) + { + // Yuck! But no other easy way to detect this. + if ("P".equals(flusher.toStateString())) + break; + long elapsed = System.nanoTime() - start; + if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15) + throw new TimeoutException(); + Thread.sleep(100); + } + // Wait for the selector to update the SelectionKey to OP_WRITE. + Thread.sleep(1000); + } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index b19123bb8f1..044f6609b6a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.BufferUtil; @@ -396,9 +397,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport synchronized (this) { commit = this.commit; - // Only fail pending writes, as we - // may need to write an error page. - if (state == State.WRITING) + // Don't always fail, as we may need to write an error response. + EndPoint endPoint = connection.getEndPoint(); + boolean cannotWrite = stream.isReset() || endPoint.isOutputShutdown() || !endPoint.isOpen(); + if ((cannotWrite && state == State.IDLE) || state == State.WRITING) { this.state = State.FAILED; callback = this.callback; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 8b8499bc5b4..c7ca0333cd5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -390,7 +390,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint return _fillInterest; } - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return _writeFlusher; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 03f84f0970c..c1b4398b6eb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -403,7 +403,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr } @Override - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return super.getWriteFlusher(); }