From 873d1c6d7ddcd773c73ce04416fc4d8dd54c8583 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 25 Jul 2019 22:21:13 +0200 Subject: [PATCH 1/3] Fixes #3601 - HTTP2 stall on reset streams. The client reset wakes up threads blocked in writes, but these may again attempt to write, therefore blocking again. Now we detect that the stream is not writable and mark the transport as failed, so that writes fail immediately without blocking. Signed-off-by: Simone Bordet --- .../jetty/http2/client/StreamResetTest.java | 226 ++++++++++++++++++ .../http2/server/HttpTransportOverHTTP2.java | 8 +- .../eclipse/jetty/io/AbstractEndPoint.java | 2 +- .../eclipse/jetty/io/ssl/SslConnection.java | 2 +- 4 files changed, 233 insertions(+), 5 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index f025e90f38b..655c59a9600 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -20,19 +20,26 @@ package org.eclipse.jetty.http2.client; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; +import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; @@ -40,7 +47,9 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; @@ -54,12 +63,20 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; +import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -833,4 +850,213 @@ public class StreamResetTest extends AbstractTest // Read on server should fail. assertTrue(failureLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testResetAfterTCPCongestedWrite() throws Exception + { + AtomicReference flusherRef = new AtomicReference<>(); + CountDownLatch flusherLatch = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + CountDownLatch writeLatch2 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + Request jettyRequest = (Request)request; + flusherRef.set(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + flusherLatch.countDown(); + + ServletOutputStream output = response.getOutputStream(); + try + { + // Large write, it blocks due to TCP congestion. + byte[] data = new byte[128 * 1024 * 1024]; + output.write(data); + } + catch (IOException x) + { + writeLatch1.countDown(); + try + { + // Try to write again, must fail immediately. + output.write(0xFF); + } + catch (IOException xx) + { + writeLatch2.countDown(); + } + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 3; + HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + // Wait until the server is TCP congested. + assertTrue(flusherLatch.await(5, TimeUnit.SECONDS)); + WriteFlusher flusher = flusherRef.get(); + waitUntilTCPCongested(flusher); + + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); + assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception + { + Exchanger exchanger = new Exchanger<>(); + CountDownLatch requestLatch1 = new CountDownLatch(1); + CountDownLatch requestLatch2 = new CountDownLatch(1); + CountDownLatch writeLatch1 = new CountDownLatch(1); + start(new EmptyHttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + if (request.getPathInfo().equals("/1")) + service1(request, response); + else if (request.getPathInfo().equals("/2")) + service2(request, response); + else + throw new IllegalArgumentException(); + } + + private void service1(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + Request jettyRequest = (Request)request; + exchanger.exchange(((AbstractEndPoint)jettyRequest.getHttpChannel().getEndPoint()).getWriteFlusher()); + + ServletOutputStream output = response.getOutputStream(); + // Large write, it blocks due to TCP congestion. + output.write(new byte[128 * 1024 * 1024]); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + + private void service2(HttpServletRequest request, HttpServletResponse response) throws IOException + { + try + { + requestLatch1.countDown(); + requestLatch2.await(); + ServletOutputStream output = response.getOutputStream(); + int length = 512 * 1024; + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + length = h2.getHttpConfiguration().getOutputAggregationSize(); + // Medium write so we don't aggregate it, must not block. + output.write(new byte[length * 2]); + } + catch (IOException x) + { + writeLatch1.countDown(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + }); + + ByteBufferPool byteBufferPool = client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(byteBufferPool); + ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); + generator.control(lease, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(lease, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(lease, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = new HttpURI("http", host, port, servletPath + "/1"); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + HeadersFrame headersFrame = new HeadersFrame(3, request, null, true); + generator.control(lease, headersFrame); + + List buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + + waitUntilTCPCongested(exchanger.exchange(null)); + + // Send a second request. + uri = new HttpURI("http", host, port, servletPath + "/2"); + request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, new HttpFields()); + int streamId = 5; + headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(lease, headersFrame); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + assertTrue(requestLatch1.await(5, TimeUnit.SECONDS)); + + // Now reset the second request, which has not started writing yet. + lease.recycle(); + generator.control(lease, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + buffers = lease.getByteBuffers(); + socket.write(buffers.toArray(new ByteBuffer[0])); + // Wait to be sure that the server processed the reset. + Thread.sleep(1000); + // Let the request write, it should not block. + requestLatch2.countDown(); + assertTrue(writeLatch1.await(555, TimeUnit.SECONDS)); + } + } + + private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException + { + long start = System.nanoTime(); + while (true) + { + // Yuck! But no other easy way to detect this. + if ("P".equals(flusher.toStateString())) + break; + long elapsed = System.nanoTime() - start; + if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15) + throw new TimeoutException(); + Thread.sleep(100); + } + // Wait for the selector to update the SelectionKey to OP_WRITE. + Thread.sleep(1000); + } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index b19123bb8f1..044f6609b6a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.BufferUtil; @@ -396,9 +397,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport synchronized (this) { commit = this.commit; - // Only fail pending writes, as we - // may need to write an error page. - if (state == State.WRITING) + // Don't always fail, as we may need to write an error response. + EndPoint endPoint = connection.getEndPoint(); + boolean cannotWrite = stream.isReset() || endPoint.isOutputShutdown() || !endPoint.isOpen(); + if ((cannotWrite && state == State.IDLE) || state == State.WRITING) { this.state = State.FAILED; callback = this.callback; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 8b8499bc5b4..c7ca0333cd5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -390,7 +390,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint return _fillInterest; } - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return _writeFlusher; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 03f84f0970c..c1b4398b6eb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -403,7 +403,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr } @Override - protected WriteFlusher getWriteFlusher() + public WriteFlusher getWriteFlusher() { return super.getWriteFlusher(); } From 762767c62c186ce0ff179dbea69901009eef456e Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 1 Aug 2019 15:52:47 +0200 Subject: [PATCH 2/3] Fixes #3601 - HTTP2 stall on reset streams. After review, updated the logic to always fail the transport. Signed-off-by: Simone Bordet --- .../jetty/http2/client/AsyncServletTest.java | 19 ++++---- .../http2/server/HttpTransportOverHTTP2.java | 48 ++++++++----------- .../jetty/http/client/AsyncIOServletTest.java | 38 ++++++++++----- .../test/resources/jetty-logging.properties | 2 +- 4 files changed, 59 insertions(+), 48 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java index f461659cafb..df1fe1bf9ee 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java @@ -135,25 +135,28 @@ public class AsyncServletTest extends AbstractTest MetaData.Request metaData = newRequest("GET", fields); HeadersFrame frame = new HeadersFrame(metaData, null, true); FuturePromise promise = new FuturePromise<>(); - CountDownLatch clientLatch = new CountDownLatch(1); + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch resetLatch = new CountDownLatch(1); session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override public void onHeaders(Stream stream, HeadersFrame frame) { - MetaData.Response response = (MetaData.Response)frame.getMetaData(); - if (response.getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500 && frame.isEndStream()) - clientLatch.countDown(); + responseLatch.countDown(); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + resetLatch.countDown(); } }); Stream stream = promise.get(5, TimeUnit.SECONDS); stream.setIdleTimeout(10 * idleTimeout); - // When the client closes, the server receives the - // corresponding frame and acts by notifying the failure, - // which sends back to the client the error response. assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); - assertTrue(clientLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS)); + assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); } @Test diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index a7a1923a6c5..11cd845cce0 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -33,7 +33,6 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.BufferUtil; @@ -332,7 +331,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(), stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode())); if (stream != null) - stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); } private class TransportCallback implements Callback @@ -393,36 +392,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport public void failed(Throwable failure) { boolean commit; - Callback callback = null; - synchronized (this) - { - commit = this.commit; - // Don't always fail, as we may need to write an error response. - EndPoint endPoint = connection.getEndPoint(); - boolean cannotWrite = stream.isReset() || endPoint.isOutputShutdown() || !endPoint.isOpen(); - if ((cannotWrite && state == State.IDLE) || state == State.WRITING) - { - this.state = State.FAILED; - callback = this.callback; - this.callback = null; - this.failure = failure; - } - } - if (LOG.isDebugEnabled()) - LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); - if (callback != null) - callback.failed(failure); - } - - @Override - public InvocationType getInvocationType() - { Callback callback; synchronized (this) { + commit = this.commit; + this.state = State.FAILED; callback = this.callback; + this.callback = null; + this.failure = failure; } - return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); + if (LOG.isDebugEnabled()) + LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), + commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); + if (callback != null) + callback.failed(failure); } private boolean onIdleTimeout(Throwable failure) @@ -448,6 +431,17 @@ public class HttpTransportOverHTTP2 implements HttpTransport callback.failed(failure); return result; } + + @Override + public InvocationType getInvocationType() + { + Callback callback; + synchronized (this) + { + callback = this.callback; + } + return callback != null ? callback.getInvocationType() : Callback.super.getInvocationType(); + } } private enum State diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index 992156fa015..c868796d1ad 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -70,7 +70,6 @@ import org.eclipse.jetty.server.HttpInput.Content; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; -import org.eclipse.jetty.unixsocket.UnixSocketConnector; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.log.StacklessLogging; @@ -82,6 +81,8 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import static java.nio.ByteBuffer.wrap; import static org.eclipse.jetty.http.client.Transport.FCGI; +import static org.eclipse.jetty.http.client.Transport.H2C; +import static org.eclipse.jetty.http.client.Transport.HTTP; import static org.eclipse.jetty.util.BufferUtil.toArray; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -1074,6 +1075,7 @@ public class AsyncIOServletTest extends AbstractTest responseLatch.countDown()); - - if (scenario.connector instanceof UnixSocketConnector) - { - // skip rest of this test for unix socket - return; - } + .onResponseSuccess(response -> + { + if (transport == HTTP) + responseLatch.countDown(); + }) + .onResponseFailure((response, failure) -> + { + if (transport == H2C) + responseLatch.countDown(); + }); Destination destination = scenario.client.getDestination(scenario.getScheme(), "localhost", @@ -1139,7 +1144,18 @@ public class AsyncIOServletTest extends AbstractTest { - assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + switch (transport) + { + case HTTP: + assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode)); + break; + case H2C: + // HTTP/2 does not attempt to write a response back, just a RST_STREAM. + assertTrue(result.isFailed()); + break; + default: + fail("Unhandled transport: " + transport); + } clientLatch.countDown(); }); @@ -1148,11 +1164,9 @@ public class AsyncIOServletTest extends AbstractTest Date: Wed, 7 Aug 2019 11:48:31 +0200 Subject: [PATCH 3/3] Fixes #3601 - HTTP2 stall on reset streams. After review, introduced WriteFlusher.isPending() and now using that in the test case to test for TCP congestion. Signed-off-by: Simone Bordet --- .../jetty/http2/client/StreamResetTest.java | 10 +++------- .../java/org/eclipse/jetty/io/WriteFlusher.java | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 655c59a9600..f86e132fb2e 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; -import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; @@ -941,7 +940,7 @@ public class StreamResetTest extends AbstractTest start(new EmptyHttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { if (request.getPathInfo().equals("/1")) service1(request, response); @@ -1039,18 +1038,15 @@ public class StreamResetTest extends AbstractTest Thread.sleep(1000); // Let the request write, it should not block. requestLatch2.countDown(); - assertTrue(writeLatch1.await(555, TimeUnit.SECONDS)); + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); } } private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException { long start = System.nanoTime(); - while (true) + while (!flusher.isPending()) { - // Yuck! But no other easy way to detect this. - if ("P".equals(flusher.toStateString())) - break; long elapsed = System.nanoTime() - start; if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15) throw new TimeoutException(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index cd1e2a06ac6..743f0a1fcef 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -258,7 +258,7 @@ public abstract class WriteFlusher */ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException { - callback = Objects.requireNonNull(callback); + Objects.requireNonNull(callback); if (isFailed()) { @@ -523,12 +523,22 @@ public abstract class WriteFlusher boolean isFailed() { - return _state.get().getType() == StateType.FAILED; + return isState(StateType.FAILED); } boolean isIdle() { - return _state.get().getType() == StateType.IDLE; + return isState(StateType.IDLE); + } + + public boolean isPending() + { + return isState(StateType.PENDING); + } + + private boolean isState(StateType type) + { + return _state.get().getType() == type; } public String toStateString()