Utility class to use to query the runtime for virtual thread support, + * and, if virtual threads are supported, to start virtual threads.
+ * + * @see #areSupported() + * @see #startVirtualThread(Runnable) + * @see #isVirtualThread() + */ +public class VirtualThreads +{ + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class); + private static final Method startVirtualThread = probeStartVirtualThread(); + private static final Method isVirtualThread = probeIsVirtualThread(); + + private static Method probeStartVirtualThread() + { + try + { + return Thread.class.getMethod("startVirtualThread", Runnable.class); + } + catch (Throwable x) + { + return null; + } + } + + private static Method probeIsVirtualThread() + { + try + { + return Thread.class.getMethod("isVirtual"); + } + catch (Throwable x) + { + return null; + } + } + + private static void warn() + { + LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version")); + } + + /** + * @return whether the runtime supports virtual threads + */ + public static boolean areSupported() + { + return startVirtualThread != null; + } + + /** + *Starts a virtual thread to execute the given task, or throws + * {@link UnsupportedOperationException} if virtual threads are not + * supported.
+ * + * @param task the task to execute in a virtual thread + * @see #areSupported() + */ + public static void startVirtualThread(Runnable task) + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("Starting in virtual thread: {}", task); + startVirtualThread.invoke(null, task); + } + catch (Throwable x) + { + warn(); + throw new UnsupportedOperationException(x); + } + } + + /** + * @return whether the current thread is a virtual thread + */ + public static boolean isVirtualThread() + { + try + { + return (Boolean)isVirtualThread.invoke(Thread.currentThread()); + } + catch (Throwable x) + { + warn(); + return false; + } + } + + /** + *Tests whether the given executor implements {@link Configurable} and + * it has been configured to use virtual threads.
+ * + * @param executor the Executor to test + * @return whether the given executor implements {@link Configurable} + * and it has been configured to use virtual threads + */ + public static boolean isUseVirtualThreads(Executor executor) + { + if (executor instanceof Configurable) + return ((Configurable)executor).isUseVirtualThreads(); + return false; + } + + /** + *Implementations of this interface can be configured to use virtual threads.
+ *Whether virtual threads are actually used depends on whether the runtime + * supports virtual threads and, if the runtime supports them, whether they are + * configured to be used via {@link #setUseVirtualThreads(boolean)}.
+ */ + public interface Configurable + { + /** + * @return whether to use virtual threads + */ + default boolean isUseVirtualThreads() + { + return false; + } + + /** + * @param useVirtualThreads whether to use virtual threads + * @throws UnsupportedOperationException if the runtime does not support virtual threads + * @see #areSupported() + */ + default void setUseVirtualThreads(boolean useVirtualThreads) + { + if (useVirtualThreads && !VirtualThreads.areSupported()) + { + warn(); + throw new UnsupportedOperationException(); + } + } + } + + private VirtualThreads() + { + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index 0d8b3c31eea..0019c7c7359 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -34,7 +35,7 @@ import org.eclipse.jetty.util.component.DumpableCollection; * A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}. */ @ManagedObject("A thread pool") -public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor +public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor, VirtualThreads.Configurable { private final ThreadPoolExecutor _executor; private final ThreadPoolBudget _budget; @@ -46,6 +47,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool private int _priority = Thread.NORM_PRIORITY; private boolean _daemon; private boolean _detailedDump; + private boolean _useVirtualThreads; public ExecutorThreadPool() { @@ -268,6 +270,25 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads(); } + @Override + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + + @Override + public void setUseVirtualThreads(boolean useVirtualThreads) + { + try + { + VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads); + _useVirtualThreads = useVirtualThreads; + } + catch (UnsupportedOperationException ignored) + { + } + } + @Override protected void doStart() throws Exception { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 8033d6cd035..a1e9f221f15 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -74,7 +75,7 @@ import org.slf4j.LoggerFactory; * */ @ManagedObject("A thread pool") -public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor +public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable { private static final Logger LOG = LoggerFactory.getLogger(QueuedThreadPool.class); private static final Runnable NOOP = () -> @@ -109,6 +110,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor private int _lowThreadsThreshold = 1; private ThreadPoolBudget _budget; private long _stopTimeout; + private boolean _useVirtualThreads; public QueuedThreadPool() { @@ -511,6 +513,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor _lowThreadsThreshold = lowThreadsThreshold; } + @Override + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + + @Override + public void setUseVirtualThreads(boolean useVirtualThreads) + { + try + { + VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads); + _useVirtualThreads = useVirtualThreads; + } + catch (UnsupportedOperationException ignored) + { + } + } + /** * @return the number of jobs in the queue waiting for a thread */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 1e0e8377c43..21f21a32e20 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -103,6 +104,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec { if (capacity >= 0) return capacity; + if (VirtualThreads.isUseVirtualThreads(executor)) + return 0; int cpus = ProcessorUtils.availableProcessors(); if (executor instanceof ThreadPool.SizedThreadPool) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java index 02db9f14005..ecf261f675f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -21,6 +21,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -136,11 +137,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe private final Executor _executor; private final TryExecutor _tryExecutor; private final Runnable _runPendingProducer = () -> tryProduce(true); + private boolean _useVirtualThreads; private State _state = State.IDLE; private boolean _pending; /** - * @param producer The produce of tasks to be consumed. + * @param producer The producer of tasks to be consumed. * @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy. */ public AdaptiveExecutionStrategy(Producer producer, Executor executor) @@ -154,6 +156,13 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe LOG.debug("{} created", this); } + @Override + protected void doStart() throws Exception + { + super.doStart(); + _useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor); + } + @Override public void dispatch() { @@ -462,7 +471,10 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe { try { - _executor.execute(task); + if (isUseVirtualThreads()) + VirtualThreads.startVirtualThread(task); + else + _executor.execute(task); } catch (RejectedExecutionException e) { @@ -476,6 +488,12 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe } } + @ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true) + public boolean isUseVirtualThreads() + { + return _useVirtualThreads; + } + @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) public long getPCTasksConsumed() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java index 3d5580be467..649458f7bdc 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; */ public class ProduceConsume implements ExecutionStrategy, Runnable { - private static final Logger LOG = LoggerFactory.getLogger(ExecuteProduceConsume.class); + private static final Logger LOG = LoggerFactory.getLogger(ProduceConsume.class); private final AutoLock _lock = new AutoLock(); private final Producer _producer; diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index a95887aaa2f..66032985893 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -35,9 +35,9 @@