Fixes #8863 - Provide a possibility to name virtual threads (#8903)

* Fixes #8863 - Provide a possibility to name virtual threads

Reworked the VirtualThreads APIs to be based on `Executor` rather than just `boolean`.
Introduced Jetty module `threadpool-virtual-preview`.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-11-21 15:39:54 +01:00 committed by GitHub
parent 88ea1e81c4
commit 83154b4ffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 230 additions and 44 deletions

View File

@ -63,16 +63,16 @@ public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPo
}
@Override
public boolean isUseVirtualThreads()
public Executor getVirtualThreadsExecutor()
{
return VirtualThreads.isUseVirtualThreads(_executor);
return VirtualThreads.getVirtualThreadsExecutor(_executor);
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
public void setVirtualThreadsExecutor(Executor executor)
{
if (_executor instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads);
((VirtualThreads.Configurable)_executor).setVirtualThreadsExecutor(executor);
}
@Override

View File

@ -0,0 +1,45 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">
<Configure>
<New id="threadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
<Set name="name" property="jetty.threadPool.namePrefix" />
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
<Get id="namePrefix" name="name" />
<Call class="java.lang.Thread" name="ofVirtual">
<Call class="java.lang.Thread$Builder" name="name">
<Arg>
<Property name="jetty.threadPool.virtual.namePrefix">
<Default><Ref refid="namePrefix" />-virtual-</Default>
</Property>
</Arg>
<Arg type="long">0</Arg>
<Call class="java.lang.Thread$Builder" name="allowSetThreadLocals">
<Arg type="boolean"><Property name="jetty.threadPool.virtual.allowSetThreadLocals" default="true" /></Arg>
<Call class="java.lang.Thread$Builder" name="inheritInheritableThreadLocals">
<Arg type="boolean"><Property name="jetty.threadPool.virtual.inheritInheritableThreadLocals" default="false" /></Arg>
<Call id="virtualThreadFactory" class="java.lang.Thread$Builder" name="factory" />
</Call>
</Call>
</Call>
</Call>
<Call name="setVirtualThreadsExecutor">
<Arg>
<Call class="java.util.concurrent.Executors" name="newThreadPerTaskExecutor">
<Arg><Ref refid="virtualThreadFactory" /></Arg>
</Call>
</Arg>
</Call>
</New>
<Call class="org.slf4j.LoggerFactory" name="getLogger">
<Arg>org.eclipse.jetty</Arg>
<Call name="warn">
<Arg>Virtual threads are a Java Preview Feature, support may be limited.</Arg>
</Call>
</Call>
</Configure>

View File

@ -20,10 +20,11 @@
<!-- for all configuration that may be set here. -->
<!-- =========================================================== -->
<New id="threadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
<Set name="name" property="jetty.threadPool.namePrefix" />
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="useVirtualThreads" type="boolean"><Property name="jetty.threadPool.useVirtualThreads" default="false"/></Set>
<Set name="useVirtualThreads" type="boolean"><Property deprecated="jetty.threadPool.useVirtualThreads" default="false"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</New>

View File

@ -0,0 +1,42 @@
[description]
Enables and configures the Server ThreadPool with support for virtual threads.
[exec]
--enable-preview
[depends]
logging
[provides]
threadpool
[xml]
etc/jetty-threadpool-virtual-preview.xml
[ini-template]
## Platform threads name prefix.
#jetty.threadPool.namePrefix=qtp<hashCode>
## Minimum number of pooled threads.
#jetty.threadPool.minThreads=10
## Maximum number of pooled threads.
#jetty.threadPool.maxThreads=200
## Number of reserved threads (-1 for heuristic).
#jetty.threadPool.reservedThreads=-1
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000
## Whether to output a detailed dump.
#jetty.threadPool.detailedDump=false
## Virtual threads name prefix.
#jetty.threadPool.virtual.namePrefix=qtp<hashCode>-virtual-
## Whether virtual threads are allowed to set thread locals.
#jetty.threadPool.virtual.allowSetThreadLocals=true
## Whether virtual threads inherits the values of inheritable thread locals.
#jetty.threadPool.virtual.allowSetThreadLocals=true

View File

@ -4,10 +4,16 @@ Enables and configures the Server ThreadPool.
[depends]
logging
[provides]
threadpool|default
[xml]
etc/jetty-threadpool.xml
[ini-template]
## Thread name prefix.
#jetty.threadPool.namePrefix=qtp<hashCode>
## Minimum number of pooled threads.
#jetty.threadPool.minThreads=10
@ -18,6 +24,7 @@ etc/jetty-threadpool.xml
#jetty.threadPool.reservedThreads=-1
## Whether to use virtual threads, if the runtime supports them.
## Deprecated, use Jetty module 'threadpool-virtual-preview' instead.
#jetty.threadPool.useVirtualThreads=false
## Thread idle timeout (in milliseconds).

View File

@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
* and, if virtual threads are supported, to start virtual threads.</p>
*
* @see #areSupported()
* @see #executeOnVirtualThread(Runnable)
* @see #getVirtualThreadsExecutor(Executor)
* @see #isVirtualThread()
*/
public class VirtualThreads
@ -58,6 +58,11 @@ public class VirtualThreads
}
}
private static Method getIsVirtualThreadMethod()
{
return isVirtualThread;
}
private static void warn()
{
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
@ -78,14 +83,16 @@ public class VirtualThreads
*
* @param task the task to execute in a virtual thread
* @see #areSupported()
* @deprecated use {@link #getVirtualThreadsExecutor(Executor)} instead
*/
@Deprecated(forRemoval = true)
public static void executeOnVirtualThread(Runnable task)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Starting in virtual thread: {}", task);
executor.execute(task);
getDefaultVirtualThreadsExecutor().execute(task);
}
catch (Throwable x)
{
@ -101,7 +108,7 @@ public class VirtualThreads
{
try
{
return (Boolean)isVirtualThread.invoke(Thread.currentThread());
return (Boolean)getIsVirtualThreadMethod().invoke(Thread.currentThread());
}
catch (Throwable x)
{
@ -110,6 +117,25 @@ public class VirtualThreads
}
}
/**
* @return a default virtual thread per task {@code Executor}
*/
public static Executor getDefaultVirtualThreadsExecutor()
{
return executor;
}
/**
* @param executor the {@code Executor} to obtain a virtual threads {@code Executor} from
* @return a virtual threads {@code Executor} obtained from the given {@code Executor}
*/
public static Executor getVirtualThreadsExecutor(Executor executor)
{
if (executor instanceof Configurable)
return ((Configurable)executor).getVirtualThreadsExecutor();
return null;
}
/**
* <p>Tests whether the given executor implements {@link Configurable} and
* it has been configured to use virtual threads.</p>
@ -121,7 +147,7 @@ public class VirtualThreads
public static boolean isUseVirtualThreads(Executor executor)
{
if (executor instanceof Configurable)
return ((Configurable)executor).isUseVirtualThreads();
return ((Configurable)executor).getVirtualThreadsExecutor() != null;
return false;
}
@ -129,30 +155,53 @@ public class VirtualThreads
* <p>Implementations of this interface can be configured to use virtual threads.</p>
* <p>Whether virtual threads are actually used depends on whether the runtime
* supports virtual threads and, if the runtime supports them, whether they are
* configured to be used via {@link #setUseVirtualThreads(boolean)}.</p>
* configured to be used via {@link #setVirtualThreadsExecutor(Executor)}.</p>
*/
public interface Configurable
{
/**
* @return whether to use virtual threads
* @return the {@code Executor} to use to execute tasks in virtual threads
*/
default Executor getVirtualThreadsExecutor()
{
return null;
}
/**
*
* @param executor the {@code Executor} to use to execute tasks in virtual threads
* @throws UnsupportedOperationException if the runtime does not support virtual threads
* @see #areSupported()
*/
default void setVirtualThreadsExecutor(Executor executor)
{
if (executor != null && !VirtualThreads.areSupported())
{
warn();
throw new UnsupportedOperationException();
}
}
/**
* @return whether to use virtual threads
* @deprecated use {@link #getVirtualThreadsExecutor()} instead
*/
@Deprecated(forRemoval = true)
default boolean isUseVirtualThreads()
{
return false;
return getVirtualThreadsExecutor() != null;
}
/**
* @param useVirtualThreads whether to use virtual threads
* @throws UnsupportedOperationException if the runtime does not support virtual threads
* @see #areSupported()
* @deprecated use {@link #setVirtualThreadsExecutor(Executor)} instead
*/
@Deprecated(forRemoval = true)
default void setUseVirtualThreads(boolean useVirtualThreads)
{
if (useVirtualThreads && !VirtualThreads.areSupported())
{
warn();
throw new UnsupportedOperationException();
}
setVirtualThreadsExecutor(useVirtualThreads ? executor : null);
}
}

View File

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -47,7 +48,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon;
private boolean _detailedDump;
private boolean _useVirtualThreads;
private Executor _virtualThreadsExecutor;
public ExecutorThreadPool()
{
@ -271,18 +272,18 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
}
@Override
public boolean isUseVirtualThreads()
public Executor getVirtualThreadsExecutor()
{
return _useVirtualThreads;
return _virtualThreadsExecutor;
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
public void setVirtualThreadsExecutor(Executor executor)
{
try
{
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
_useVirtualThreads = useVirtualThreads;
VirtualThreads.Configurable.super.setVirtualThreadsExecutor(executor);
_virtualThreadsExecutor = executor;
}
catch (UnsupportedOperationException ignored)
{

View File

@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -111,7 +112,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private int _lowThreadsThreshold = 1;
private ThreadPoolBudget _budget;
private long _stopTimeout;
private boolean _useVirtualThreads;
private Executor _virtualThreadsExecutor;
public QueuedThreadPool()
{
@ -515,18 +516,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
@Override
public boolean isUseVirtualThreads()
public Executor getVirtualThreadsExecutor()
{
return _useVirtualThreads;
return _virtualThreadsExecutor;
}
@Override
public void setUseVirtualThreads(boolean useVirtualThreads)
public void setVirtualThreadsExecutor(Executor executor)
{
try
{
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
_useVirtualThreads = useVirtualThreads;
VirtualThreads.Configurable.super.setVirtualThreadsExecutor(executor);
_virtualThreadsExecutor = executor;
}
catch (UnsupportedOperationException ignored)
{

View File

@ -136,8 +136,8 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
private final Producer _producer;
private final Executor _executor;
private final TryExecutor _tryExecutor;
private final Executor _virtualExecutor;
private final Runnable _runPendingProducer = () -> tryProduce(true);
private boolean _useVirtualThreads;
private State _state = State.IDLE;
private boolean _pending;
@ -150,19 +150,14 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
_producer = producer;
_executor = executor;
_tryExecutor = TryExecutor.asTryExecutor(executor);
_virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(_executor);
addBean(_producer);
addBean(_tryExecutor);
addBean(_virtualExecutor);
if (LOG.isDebugEnabled())
LOG.debug("{} created", this);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor);
}
@Override
public void dispatch()
{
@ -471,10 +466,10 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
{
try
{
if (isUseVirtualThreads())
VirtualThreads.executeOnVirtualThread(task);
else
_executor.execute(task);
Executor executor = _virtualExecutor;
if (executor == null)
executor = _executor;
executor.execute(task);
}
catch (RejectedExecutionException e)
{
@ -491,7 +486,7 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
@ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true)
public boolean isUseVirtualThreads()
{
return _useVirtualThreads;
return _virtualExecutor != null;
}
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)

View File

@ -55,6 +55,7 @@ import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledForJreRange;
import org.junit.jupiter.api.condition.DisabledOnJre;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.EnabledForJreRange;
@ -1259,4 +1260,33 @@ public class DistributionTests extends AbstractJettyHomeTest
}
}
}
@Test
@DisabledForJreRange(max = JRE.JAVA_18)
public void testVirtualThreadPool() throws Exception
{
String jettyVersion = System.getProperty("jettyVersion");
JettyHomeTester distribution = JettyHomeTester.Builder.newInstance()
.jettyVersion(jettyVersion)
.mavenLocalRepository(System.getProperty("mavenRepoPath"))
.build();
try (JettyHomeTester.Run run1 = distribution.start("--add-modules=threadpool-virtual-preview,http"))
{
assertTrue(run1.awaitFor(10, TimeUnit.SECONDS));
assertEquals(0, run1.getExitValue());
int httpPort = distribution.freePort();
try (JettyHomeTester.Run run2 = distribution.start(List.of("jetty.http.selectors=1", "jetty.http.port=" + httpPort)))
{
assertTrue(run2.awaitConsoleLogsFor("Started Server@", 10, TimeUnit.SECONDS));
startHttpClient();
ContentResponse response = client.newRequest("localhost", httpPort)
.timeout(15, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
}
}
}
}

View File

@ -14,7 +14,11 @@
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
@ -58,6 +62,7 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
// No virtual thread support in FCGI server-side.
Assumptions.assumeTrue(transport != Transport.FCGI);
String virtualThreadsName = "green-";
init(transport);
scenario.prepareServer(new EmptyServerHandler()
{
@ -66,11 +71,21 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
{
if (!VirtualThreads.isVirtualThread())
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
if (!Thread.currentThread().getName().startsWith(virtualThreadsName))
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
}
});
ThreadPool threadPool = scenario.server.getThreadPool();
if (threadPool instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
{
// CAUTION: Java 19 specific reflection code, might change in future Java versions.
Object builder = Thread.class.getMethod("ofVirtual").invoke(null);
Class<?> builderClass = Arrays.stream(Thread.class.getClasses()).filter(klass -> klass.getName().endsWith("$Builder")).findFirst().orElseThrow();
builder = builderClass.getMethod("name", String.class, long.class).invoke(builder, virtualThreadsName, 0L);
ThreadFactory factory = (ThreadFactory)builderClass.getMethod("factory").invoke(builder);
Executor virtualThreadsExecutor = (Executor)Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class).invoke(null, factory);
((VirtualThreads.Configurable)threadPool).setVirtualThreadsExecutor(virtualThreadsExecutor);
}
scenario.server.start();
scenario.startClient();
@ -151,7 +166,7 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
});
ThreadPool threadPool = scenario.server.getThreadPool();
if (threadPool instanceof VirtualThreads.Configurable)
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
((VirtualThreads.Configurable)threadPool).setVirtualThreadsExecutor(VirtualThreads.getDefaultVirtualThreadsExecutor());
scenario.server.start();
scenario.startClient();