diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BlockedWritesWithSmallThreadPoolTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BlockedWritesWithSmallThreadPoolTest.java index 2fae29f7828..cd55d1a2fa7 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BlockedWritesWithSmallThreadPoolTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/BlockedWritesWithSmallThreadPoolTest.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; @@ -45,8 +46,6 @@ import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import static java.util.concurrent.TimeUnit.SECONDS; @@ -55,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled // TODO fix this public class BlockedWritesWithSmallThreadPoolTest { private Server server; @@ -102,7 +100,6 @@ public class BlockedWritesWithSmallThreadPoolTest } @Test - @Tag("flaky") public void testServerThreadsBlockedInWrites() throws Exception { int contentLength = 16 * 1024 * 1024; @@ -110,11 +107,12 @@ public class BlockedWritesWithSmallThreadPoolTest start(new Handler.Abstract() { @Override - public boolean handle(Request request, Response response, Callback callback) + public boolean handle(Request request, Response response, Callback callback) throws Exception { serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()); - // Write a large content to cause TCP congestion. - response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback); + // Blocking write a large content to cause TCP congestion. + Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength])); + callback.succeeded(); return true; } }); @@ -140,21 +138,20 @@ public class BlockedWritesWithSmallThreadPoolTest @Override public void onDataAvailable(Stream stream) { - Stream.Data data = stream.readData(); try { // Block here to stop reading from the network // to cause the server to TCP congest. clientBlockLatch.await(5, SECONDS); + Stream.Data data = stream.readData(); data.release(); if (data.frame().isEndStream()) clientDataLatch.countDown(); else stream.demand(); } - catch (InterruptedException x) + catch (InterruptedException ignored) { - data.release(); } } }); @@ -174,18 +171,139 @@ public class BlockedWritesWithSmallThreadPoolTest await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1); } // Use the reserved thread for a blocking operation, simulating another blocking write. + long delaySeconds = 10; CountDownLatch serverBlockLatch = new CountDownLatch(1); - assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true))); + assertTrue(serverThreads.tryExecute(() -> + { + try + { + serverBlockLatch.await(2 * delaySeconds, SECONDS); + } + catch (InterruptedException ignored) + { + } + })); + // No more threads are available on the server. assertEquals(0, serverThreads.getReadyThreads()); // Unblock the client to read from the network, which should unblock the server write(). clientBlockLatch.countDown(); - assertTrue(clientDataLatch.await(10, SECONDS), server.dump()); + assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump()); + + // Unblock blocked threads. serverBlockLatch.countDown(); } + @Test + public void testServerThreadsInPendingWrites() throws Exception + { + int contentLength = 16 * 1024 * 1024; + AtomicReference serverEndPointRef = new AtomicReference<>(); + start(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + serverEndPointRef.set((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()); + // Large write that will TCP congest, but it is non-blocking. + response.write(true, ByteBuffer.allocate(contentLength), callback); + return true; + } + }); + + client = new HTTP2Client(); + // Set large flow control windows so the server hits TCP congestion. + int window = 2 * contentLength; + client.setInitialSessionRecvWindow(window); + client.setInitialStreamRecvWindow(window); + client.start(); + + CountDownLatch clientBlockLatch = new CountDownLatch(1); + CountDownLatch clientDataLatch = new CountDownLatch(1); + Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, SECONDS); + HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest"); + MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + try + { + // Block here to stop reading from the network + // to cause the server to TCP congest. + clientBlockLatch.await(5, SECONDS); + Stream.Data data = stream.readData(); + data.release(); + if (data.frame().isEndStream()) + clientDataLatch.countDown(); + else + stream.demand(); + } + catch (InterruptedException ignored) + { + } + } + }); + + await().atMost(5, SECONDS).until(() -> + { + AbstractEndPoint serverEndPoint = serverEndPointRef.get(); + return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending(); + }); + // Wait for NIO on the server to be OP_WRITE interested. + Thread.sleep(1000); + + // Handler.handle() should have returned, make sure we block that thread. + long delaySeconds = 10; + await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1); + CountDownLatch serverBlockLatch = new CountDownLatch(1); + serverThreads.execute(() -> + { + try + { + serverBlockLatch.await(2 * delaySeconds, SECONDS); + } + catch (InterruptedException ignored) + { + } + }); + + // Make sure there is a reserved thread. + if (serverThreads.getAvailableReservedThreads() != 1) + { + assertFalse(serverThreads.tryExecute(() -> {})); + await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1); + } + // Use the reserved thread for a blocking operation, simulating another blocking write. + CountDownLatch reservedBlockLatch = new CountDownLatch(1); + assertTrue(serverThreads.tryExecute(() -> + { + try + { + reservedBlockLatch.await(2 * delaySeconds, SECONDS); + } + catch (InterruptedException ignored) + { + } + })); + + // No more threads are available on the server. + assertEquals(0, serverThreads.getReadyThreads()); + + // Unblock the client to read from the network, which must unblock the server write() and send a response. + clientBlockLatch.countDown(); + + assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump()); + + // Unblock blocked threads. + serverBlockLatch.countDown(); + reservedBlockLatch.countDown(); + } + @Test public void testClientThreadsBlockedInWrite() throws Exception { @@ -202,12 +320,12 @@ public class BlockedWritesWithSmallThreadPoolTest @Override public void onDataAvailable(Stream stream) { - Stream.Data data = stream.readData(); try { // Block here to stop reading from the network // to cause the client to TCP congest. serverBlockLatch.await(5, SECONDS); + Stream.Data data = stream.readData(); data.release(); if (data.frame().isEndStream()) { @@ -219,9 +337,8 @@ public class BlockedWritesWithSmallThreadPoolTest stream.demand(); } } - catch (InterruptedException x) + catch (InterruptedException ignored) { - data.release(); } } }; @@ -279,14 +396,27 @@ public class BlockedWritesWithSmallThreadPoolTest await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1); } // Use the reserved thread for a blocking operation, simulating another blocking write. - assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true))); + long delaySeconds = 10; + assertTrue(clientThreads.tryExecute(() -> + { + try + { + clientBlockLatch.await(2 * delaySeconds, SECONDS); + } + catch (InterruptedException ignored) + { + } + })); + // No more threads are available on the client. await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0); // Unblock the server to read from the network, which should unblock the client. serverBlockLatch.countDown(); - assertTrue(latch.await(10, SECONDS), client.dump()); + assertTrue(latch.await(delaySeconds, SECONDS), client.dump()); + + // Unblock blocked threads. clientBlockLatch.countDown(); } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java index 93b86096ffb..c6b488107a7 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java @@ -135,6 +135,12 @@ public interface HttpStream extends Callback return CONTENT_NOT_CONSUMED; } + @Override + default InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + class Wrapper implements HttpStream { private final HttpStream _wrapped; diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 5aa3287355e..af8ca07824e 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1636,7 +1636,6 @@ public class HttpChannelState implements HttpChannel, Components @Override public InvocationType getInvocationType() { - // TODO review this as it is probably not correct return _request.getHttpStream().getInvocationType(); } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index ca6825d9d87..6abca784839 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -1623,12 +1623,6 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab { getEndPoint().close(failure); } - - @Override - public InvocationType getInvocationType() - { - return HttpStream.super.getInvocationType(); - } } private class TunnelSupportOverHTTP1 implements TunnelSupport diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 650672d99ea..f28a22f6f2d 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -102,10 +102,10 @@ public interface Callback extends Invocable * with the given {@code blocking} characteristic.

* * @param completable the CompletableFuture to convert into a callback - * @param invocation whether the callback is blocking + * @param invocationType whether the callback is blocking * @return a callback that when completed, completes the given CompletableFuture */ - static Callback from(CompletableFuture completable, InvocationType invocation) + static Callback from(CompletableFuture completable, InvocationType invocationType) { if (completable instanceof Callback) return (Callback)completable; @@ -135,7 +135,7 @@ public interface Callback extends Invocable @Override public InvocationType getInvocationType() { - return invocation; + return invocationType; } }; } @@ -290,6 +290,12 @@ public interface Callback extends Invocable { Callback.failed(callback::failed, completed, x); } + + @Override + public InvocationType getInvocationType() + { + return callback.getInvocationType(); + } }; } @@ -320,15 +326,21 @@ public interface Callback extends Invocable } } + @Override + public void failed(Throwable x) + { + Callback.failed(this::completed, callback::failed, x); + } + private void completed(Throwable ignored) { completed.run(); } @Override - public void failed(Throwable x) + public InvocationType getInvocationType() { - Callback.failed(this::completed, callback::failed, x); + return callback.getInvocationType(); } }; } @@ -357,6 +369,12 @@ public interface Callback extends Invocable ExceptionUtil.addSuppressedIfNotAssociated(cause, x); Callback.failed(callback, cause); } + + @Override + public InvocationType getInvocationType() + { + return callback.getInvocationType(); + } }; }