From 0420e926a1fbfd04bdff99ec80acba95a09d8977 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 27 Aug 2024 11:03:48 +0300 Subject: [PATCH] Fixes #12171 - QoSHandler does not resume on a virtual thread. (#12174) Now QoSHandler resumes requests using Request.getComponents().getExecutor(). This Executor is configured to be the virtual thread executor, if present, otherwise the Server Executor. Removed warn() from VirtualThreads.isVirtualThread(), as it was too verbose. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/server/Components.java | 33 ++++++-- .../jetty/server/handler/QoSHandler.java | 9 ++- .../server/internal/HttpChannelState.java | 54 ++++++++++++- .../org/eclipse/jetty/server/ServerTest.java | 2 +- .../jetty/server/handler/QoSHandlerTest.java | 77 ++++++++++++++++++- .../eclipse/jetty/util/VirtualThreads.java | 1 - 6 files changed, 164 insertions(+), 12 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java index b6e32ea937f..7151c38fdbc 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Components.java @@ -13,34 +13,55 @@ package org.eclipse.jetty.server; +import java.util.concurrent.Executor; + import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Attributes; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.ThreadPool; /** - * Common components made available via a {@link Request} + * Common components made available via a {@link Request}. */ public interface Components { + /** + * @return the {@link ByteBufferPool} associated with the {@link Request} + */ ByteBufferPool getByteBufferPool(); + /** + * @return the {@link Scheduler} associated with the {@link Request} + */ Scheduler getScheduler(); + /** + * @return the {@link ThreadPool} associated with the {@link Request} + * @deprecated use {@link #getExecutor()} instead + */ + @Deprecated(since = "12.0.13", forRemoval = true) ThreadPool getThreadPool(); /** - * A Map which can be used as a cache for object (e.g. Cookie cache). - * The cache will have a life cycle limited by the connection, i.e. no cache map will live + * @return the {@link Executor} associated with the {@link Request} + */ + default Executor getExecutor() + { + return getThreadPool(); + } + + /** + *

A map-like object that can be used as a cache (for example, as a cookie cache).

+ *

The cache will have a life cycle limited by the connection, i.e. no cache map will live * longer that the connection associated with it. However, a cache may have a shorter life * than a connection (e.g. it may be discarded for implementation reasons). A cache map is * guaranteed to be given to only a single request concurrently (scoped by * {@link org.eclipse.jetty.server.internal.HttpChannelState}), so objects saved there do not * need to be made safe from access by simultaneous request. - * If the connection is known to be none-persistent then the cache may be a noop - * cache and discard all items set on it. + * If the connection is known to be non-persistent then the cache may be a noop + * cache and discard all items set on it.

* - * @return A Map, which may be an empty map that discards all items. + * @return A map-like object, which may be an empty implementation that discards all items. */ Attributes getCache(); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java index 69722c13d40..9fef7b060d7 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/QoSHandler.java @@ -397,13 +397,18 @@ public class QoSHandler extends ConditionalHandler.Abstract if (LOG.isDebugEnabled()) LOG.debug("{} resuming {}", this, entry.request); // Always dispatch to avoid StackOverflowError. - getServer().getThreadPool().execute(entry); + execute(entry.request, entry); return true; } } return false; } + private void execute(Request request, Runnable task) + { + request.getComponents().getExecutor().execute(task); + } + private class Entry implements CyclicTimeouts.Expirable, Runnable { private final Request request; @@ -458,7 +463,7 @@ public class QoSHandler extends ConditionalHandler.Abstract } if (removed) - failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException()); + execute(request, () -> failSuspended(request, response, callback, HttpStatus.SERVICE_UNAVAILABLE_503, new TimeoutException())); } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index af8ca07824e..f3455d38095 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; @@ -63,6 +64,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.NanoTime; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; @@ -231,7 +233,18 @@ public class HttpChannelState implements HttpChannel, Components @Override public ThreadPool getThreadPool() { - return getServer().getThreadPool(); + Executor executor = getExecutor(); + if (executor instanceof ThreadPool threadPool) + return threadPool; + return new ThreadPoolWrapper(executor); + } + + @Override + public Executor getExecutor() + { + Executor executor = getServer().getThreadPool(); + Executor virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(executor); + return virtualExecutor != null ? virtualExecutor : executor; } @Override @@ -1948,4 +1961,43 @@ public class HttpChannelState implements HttpChannel, Components throw t; } } + + private static class ThreadPoolWrapper implements ThreadPool + { + private final Executor _executor; + + private ThreadPoolWrapper(Executor executor) + { + _executor = executor; + } + + @Override + public void execute(Runnable command) + { + _executor.execute(command); + } + + @Override + public void join() + { + } + + @Override + public int getThreads() + { + return 0; + } + + @Override + public int getIdleThreads() + { + return 0; + } + + @Override + public boolean isLowOnThreads() + { + return false; + } + } } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java index dcca3eb0375..f3b7724420d 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/ServerTest.java @@ -91,7 +91,7 @@ public class ServerTest { Runnable after = _afterHandle.getAndSet(null); if (after != null) - getThreadPool().execute(after); + getExecutor().execute(after); } }; } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java index 88544658be6..1083dbe6d91 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/QoSHandlerTest.java @@ -13,6 +13,9 @@ package org.eclipse.jetty.server.handler; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -28,10 +31,15 @@ import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -50,7 +58,8 @@ public class QoSHandlerTest private void start(QoSHandler qosHandler) throws Exception { - server = new Server(); + if (server == null) + server = new Server(); connector = new LocalConnector(server); server.addConnector(connector); server.setHandler(qosHandler); @@ -483,4 +492,70 @@ public class QoSHandlerTest } }); } + + @Test + @DisabledForJreRange(max = JRE.JAVA_20) + public void testRequestInVirtualThreadIsResumedInVirtualThread() throws Exception + { + QoSHandler qosHandler = new QoSHandler(); + qosHandler.setMaxRequestCount(1); + List callbacks = new ArrayList<>(); + qosHandler.setHandler(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + response.setStatus(VirtualThreads.isVirtualThread() ? HttpStatus.OK_200 : HttpStatus.NOT_ACCEPTABLE_406); + // Save the callback but do not succeed it yet. + callbacks.add(callback); + return true; + } + }); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("st"); + serverThreads.setVirtualThreadsExecutor(VirtualThreads.getNamedVirtualThreadsExecutor("vst")); + server = new Server(serverThreads); + ServerConnector networkConnector = new ServerConnector(server, 1, 1); + server.addConnector(networkConnector); + start(qosHandler); + + // Send the first request that will not be completed yet. + try (SocketChannel client1 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort()))) + { + client1.write(StandardCharsets.UTF_8.encode(""" + GET /first HTTP/1.1 + Host: localhost + + """)); + // Wait that the request arrives at the server. + await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1)); + + // Send the second request, it should be suspended by QoSHandler. + try (SocketChannel client2 = SocketChannel.open(new InetSocketAddress("localhost", networkConnector.getLocalPort()))) + { + client2.write(StandardCharsets.UTF_8.encode(""" + GET /second HTTP/1.1 + Host: localhost + + """)); + // Wait for the second request to be suspended. + await().atMost(5, TimeUnit.SECONDS).until(qosHandler::getSuspendedRequestCount, is(1)); + + // Finish the first request, so that the second can be resumed. + callbacks.remove(0).succeeded(); + client1.socket().setSoTimeout(5000); + HttpTester.Response response1 = HttpTester.parseResponse(client1); + assertEquals(HttpStatus.OK_200, response1.getStatus()); + + // Wait for the second request to arrive to the server. + await().atMost(5, TimeUnit.SECONDS).until(callbacks::size, is(1)); + + // Finish the second request. + callbacks.remove(0).succeeded(); + client2.socket().setSoTimeout(5000); + HttpTester.Response response2 = HttpTester.parseResponse(client2); + assertEquals(HttpStatus.OK_200, response2.getStatus()); + } + } + } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java index 08ffd1f000a..a3bddae9b41 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java @@ -101,7 +101,6 @@ public class VirtualThreads } catch (Throwable x) { - warn(); return false; } }