diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java index f461659cafb..df1fe1bf9ee 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java @@ -135,25 +135,28 @@ public class AsyncServletTest extends AbstractTest MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame = new HeadersFrame(metaData, null, true); FuturePromise promise = new FuturePromise<>(); - CountDownLatch clientLatch = new CountDownLatch(1); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch resetLatch = new CountDownLatch(1); session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override public void onHeaders(Stream stream, HeadersFrame frame) { - MetaData.Response response = (MetaData.Response)frame.getMetaData(); - if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500 && frame.isEndStream()) - clientLatch.countDown(); + responseLatch.countDown(); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); } }); Stream stream = promise.get(5, TimeUnit.SECONDS); stream.setIdleTimeout(10 * idleTimeout); - // When the client closes, the server receives the - // corresponding frame and acts by notifying the failure, - // which sends back to the client the error response. assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); - assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS)); + assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); } @Test diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index f025e90f38b..f86e132fb2e 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -20,16 +20,22 @@ package org.eclipse.jetty.http2.client; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; @@ -40,7 +46,9 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; @@ -54,12 +62,20 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; +import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -833,4 +849,210 @@ public class StreamResetTest extends AbstractTest // Read on server should fail. assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testResetAfterTCPCongestedWrite() throws Exception + { + AtomicReference flusherRef = new AtomicReference<>(); + CountDownLatch flusherLatch = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + CountDownLatch writeLatch2 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + Request jettyRequest = (Request)request; + flusherRef.set(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + flusherLatch.countDown(); + + ServletOutputStream output = response.getOutputStream(); + try + { + // Large write, it blocks due to TCP congestion. + byte[] data = new byte[128 * 1024 * 1024]; + output.write(data); + } + catch (IOException x) + { + writeLatch1.countDown(); + try + { + // Try to write again, must fail immediately. + output.write(0xFF); + } + catch (IOException xx) + { + writeLatch2.countDown(); + } + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 3; + HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + // Wait until the server is TCP congested. + assertTrue(flusherLatch.await(5, TimeUnit.SECONDS)); + WriteFlusher flusher = flusherRef.get(); + waitUntilTCPCongested(flusher); + + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); + assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception + { + Exchanger exchanger = new Exchanger<>(); + CountDownLatch requestLatch1 = new CountDownLatch(1); + CountDownLatch requestLatch2 = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + if (request.getPathInfo().equals("/1")) + service1(request, response); + else if (request.getPathInfo().equals("/2")) + service2(request, response); + else + throw new IllegalArgumentException(); + } + + private void service1(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + Request jettyRequest = (Request)request; + exchanger.exchange(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + + ServletOutputStream output = response.getOutputStream(); + // Large write, it blocks due to TCP congestion. + output.write(new byte[128 * 1024 * 1024]); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + + private void service2(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + requestLatch1.countDown(); + requestLatch2.await(); + ServletOutputStream output = response.getOutputStream(); + int length = 512 * 1024; + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + length = h2.getHttpConfiguration().getOutputAggregationSize(); + // Medium write so we don't aggregate it, must not block. + output.write(new byte[length * 2]); + } + catch (IOException x) + { + writeLatch1.countDown(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath + "/1"); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + HeadersFrame headersFrame = new HeadersFrame(3, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + waitUntilTCPCongested(exchanger.exchange(null)); + + // Send a second request. + uri = new HttpURI("http", host, port, servletPath + "/2"); + request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 5; + headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + assertTrue(requestLatch1.await(5, TimeUnit.SECONDS)); + + // Now reset the second request, which has not started writing yet. + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + // Wait to be sure that the server processed the reset. + Thread.sleep(1000); + // Let the request write, it should not block. + requestLatch2.countDown(); + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); + } + } + + private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException + { + long start = System.nanoTime(); + while (!flusher.isPending()) + { + long elapsed = System.nanoTime() - start; + if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15) + throw new TimeoutException(); + Thread.sleep(100); + } + // Wait for the selector to update the SelectionKey to OP_WRITE. + Thread.sleep(1000); + } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index a8a9476423f..11cd845cce0 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -331,7 +331,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(), stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode())); if (stream != null) - stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); } private class TransportCallback implements Callback @@ -392,35 +392,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport public void failed(Throwable failure) { boolean commit; - Callback callback = null; - synchronized (this) - { - commit = this.commit; - // Only fail pending writes, as we - // may need to write an error page. - if (state == State.WRITING) - { - this.state = State.FAILED; - callback = this.callback; - this.callback = null; - this.failure = failure; - } - } - if (LOG.isDebugEnabled()) - LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); - if (callback != null) - callback.failed(failure); - } - - @Override - public InvocationType getInvocationType() - { Callback callback; synchronized (this) { + commit = this.commit; + this.state = State.FAILED; callback = this.callback; + this.callback = null; + this.failure = failure; } - return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); + if (LOG.isDebugEnabled()) + LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), + commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); + if (callback != null) + callback.failed(failure); } private boolean onIdleTimeout(Throwable failure) @@ -446,6 +431,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport callback.failed(failure); return result; } + + @Override + public InvocationType getInvocationType() + { + Callback callback; + synchronized (this) + { + callback = this.callback; + } + return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); + } } private enum State diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 7e926856c04..d247a9ed5dd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -406,7 +406,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint return _fillInterest; } - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return _writeFlusher; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 931052bc618..a1e040754d7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -258,7 +258,7 @@ public abstract class WriteFlusher */ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException { - callback = Objects.requireNonNull(callback); + Objects.requireNonNull(callback); if (isFailed()) { @@ -526,12 +526,22 @@ public abstract class WriteFlusher boolean isFailed() { - return _state.get().getType() == StateType.FAILED; + return isState(StateType.FAILED); } boolean isIdle() { - return _state.get().getType() == StateType.IDLE; + return isState(StateType.IDLE); + } + + public boolean isPending() + { + return isState(StateType.PENDING); + } + + private boolean isState(StateType type) + { + return _state.get().getType() == type; } public String toStateString() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 35f603caa35..f20d0b03f18 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -403,7 +403,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr } @Override - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return super.getWriteFlusher(); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index b4a8011e33e..c868796d1ad 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -70,7 +70,6 @@ import org.eclipse.jetty.server.HttpInput.Content; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; -import org.eclipse.jetty.unixsocket.server.UnixSocketConnector; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.log.StacklessLogging; @@ -82,6 +81,8 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import static java.nio.ByteBuffer.wrap; import static org.eclipse.jetty.http.client.Transport.FCGI; +import static org.eclipse.jetty.http.client.Transport.H2C; +import static org.eclipse.jetty.http.client.Transport.HTTP; import static org.eclipse.jetty.util.BufferUtil.toArray; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -1074,6 +1075,7 @@ public class AsyncIOServletTest extends AbstractTest responseLatch.countDown()); - - if (scenario.connector instanceof UnixSocketConnector) - { - // skip rest of this test for unix socket - return; - } + .onResponseSuccess(response -> + { + if (transport == HTTP) + responseLatch.countDown(); + }) + .onResponseFailure((response, failure) -> + { + if (transport == H2C) + responseLatch.countDown(); + }); Destination destination = scenario.client.getDestination(scenario.getScheme(), "localhost", @@ -1139,7 +1144,18 @@ public class AsyncIOServletTest extends AbstractTest { - assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + switch (transport) + { + case HTTP: + assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + break; + case H2C: + // HTTP/2 does not attempt to write a response back, just a RST_STREAM. + assertTrue(result.isFailed()); + break; + default: + fail("Unhandled transport: " + transport); + } clientLatch.countDown(); }); @@ -1148,11 +1164,9 @@ public class AsyncIOServletTest extends AbstractTest