Merged branch 'jetty-10.0.x' into 'jetty-11.0.x'.
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
commit
2c3230a26a
|
@ -63,16 +63,16 @@ public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPo
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isUseVirtualThreads()
|
public Executor getVirtualThreadsExecutor()
|
||||||
{
|
{
|
||||||
return VirtualThreads.isUseVirtualThreads(_executor);
|
return VirtualThreads.getVirtualThreadsExecutor(_executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
public void setVirtualThreadsExecutor(Executor executor)
|
||||||
{
|
{
|
||||||
if (_executor instanceof VirtualThreads.Configurable)
|
if (_executor instanceof VirtualThreads.Configurable)
|
||||||
((VirtualThreads.Configurable)_executor).setUseVirtualThreads(useVirtualThreads);
|
((VirtualThreads.Configurable)_executor).setVirtualThreadsExecutor(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "https://www.eclipse.org/jetty/configure_10_0.dtd">
|
||||||
|
|
||||||
|
<Configure>
|
||||||
|
<New id="threadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
||||||
|
<Set name="name" property="jetty.threadPool.namePrefix" />
|
||||||
|
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
|
||||||
|
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
|
||||||
|
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
|
||||||
|
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
|
||||||
|
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
|
||||||
|
<Get id="namePrefix" name="name" />
|
||||||
|
<Call class="java.lang.Thread" name="ofVirtual">
|
||||||
|
<Call class="java.lang.Thread$Builder" name="name">
|
||||||
|
<Arg>
|
||||||
|
<Property name="jetty.threadPool.virtual.namePrefix">
|
||||||
|
<Default><Ref refid="namePrefix" />-virtual-</Default>
|
||||||
|
</Property>
|
||||||
|
</Arg>
|
||||||
|
<Arg type="long">0</Arg>
|
||||||
|
<Call class="java.lang.Thread$Builder" name="allowSetThreadLocals">
|
||||||
|
<Arg type="boolean"><Property name="jetty.threadPool.virtual.allowSetThreadLocals" default="true" /></Arg>
|
||||||
|
<Call class="java.lang.Thread$Builder" name="inheritInheritableThreadLocals">
|
||||||
|
<Arg type="boolean"><Property name="jetty.threadPool.virtual.inheritInheritableThreadLocals" default="false" /></Arg>
|
||||||
|
<Call id="virtualThreadFactory" class="java.lang.Thread$Builder" name="factory" />
|
||||||
|
</Call>
|
||||||
|
</Call>
|
||||||
|
</Call>
|
||||||
|
</Call>
|
||||||
|
<Call name="setVirtualThreadsExecutor">
|
||||||
|
<Arg>
|
||||||
|
<Call class="java.util.concurrent.Executors" name="newThreadPerTaskExecutor">
|
||||||
|
<Arg><Ref refid="virtualThreadFactory" /></Arg>
|
||||||
|
</Call>
|
||||||
|
</Arg>
|
||||||
|
</Call>
|
||||||
|
</New>
|
||||||
|
|
||||||
|
<Call class="org.slf4j.LoggerFactory" name="getLogger">
|
||||||
|
<Arg>org.eclipse.jetty</Arg>
|
||||||
|
<Call name="warn">
|
||||||
|
<Arg>Virtual threads are a Java Preview Feature, support may be limited.</Arg>
|
||||||
|
</Call>
|
||||||
|
</Call>
|
||||||
|
</Configure>
|
|
@ -20,10 +20,11 @@
|
||||||
<!-- for all configuration that may be set here. -->
|
<!-- for all configuration that may be set here. -->
|
||||||
<!-- =========================================================== -->
|
<!-- =========================================================== -->
|
||||||
<New id="threadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
<New id="threadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
|
||||||
|
<Set name="name" property="jetty.threadPool.namePrefix" />
|
||||||
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
|
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
|
||||||
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
|
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
|
||||||
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
|
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
|
||||||
<Set name="useVirtualThreads" type="boolean"><Property name="jetty.threadPool.useVirtualThreads" default="false"/></Set>
|
<Set name="useVirtualThreads" type="boolean"><Property deprecated="jetty.threadPool.useVirtualThreads" default="false"/></Set>
|
||||||
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
|
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
|
||||||
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
|
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
|
||||||
</New>
|
</New>
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
[description]
|
||||||
|
Enables and configures the Server ThreadPool with support for virtual threads.
|
||||||
|
|
||||||
|
[exec]
|
||||||
|
--enable-preview
|
||||||
|
|
||||||
|
[depends]
|
||||||
|
logging
|
||||||
|
|
||||||
|
[provides]
|
||||||
|
threadpool
|
||||||
|
|
||||||
|
[xml]
|
||||||
|
etc/jetty-threadpool-virtual-preview.xml
|
||||||
|
|
||||||
|
[ini-template]
|
||||||
|
## Platform threads name prefix.
|
||||||
|
#jetty.threadPool.namePrefix=qtp<hashCode>
|
||||||
|
|
||||||
|
## Minimum number of pooled threads.
|
||||||
|
#jetty.threadPool.minThreads=10
|
||||||
|
|
||||||
|
## Maximum number of pooled threads.
|
||||||
|
#jetty.threadPool.maxThreads=200
|
||||||
|
|
||||||
|
## Number of reserved threads (-1 for heuristic).
|
||||||
|
#jetty.threadPool.reservedThreads=-1
|
||||||
|
|
||||||
|
## Thread idle timeout (in milliseconds).
|
||||||
|
#jetty.threadPool.idleTimeout=60000
|
||||||
|
|
||||||
|
## Whether to output a detailed dump.
|
||||||
|
#jetty.threadPool.detailedDump=false
|
||||||
|
|
||||||
|
## Virtual threads name prefix.
|
||||||
|
#jetty.threadPool.virtual.namePrefix=qtp<hashCode>-virtual-
|
||||||
|
|
||||||
|
## Whether virtual threads are allowed to set thread locals.
|
||||||
|
#jetty.threadPool.virtual.allowSetThreadLocals=true
|
||||||
|
|
||||||
|
## Whether virtual threads inherits the values of inheritable thread locals.
|
||||||
|
#jetty.threadPool.virtual.allowSetThreadLocals=true
|
|
@ -4,10 +4,16 @@ Enables and configures the Server ThreadPool.
|
||||||
[depends]
|
[depends]
|
||||||
logging
|
logging
|
||||||
|
|
||||||
|
[provides]
|
||||||
|
threadpool|default
|
||||||
|
|
||||||
[xml]
|
[xml]
|
||||||
etc/jetty-threadpool.xml
|
etc/jetty-threadpool.xml
|
||||||
|
|
||||||
[ini-template]
|
[ini-template]
|
||||||
|
## Thread name prefix.
|
||||||
|
#jetty.threadPool.namePrefix=qtp<hashCode>
|
||||||
|
|
||||||
## Minimum number of pooled threads.
|
## Minimum number of pooled threads.
|
||||||
#jetty.threadPool.minThreads=10
|
#jetty.threadPool.minThreads=10
|
||||||
|
|
||||||
|
@ -18,6 +24,7 @@ etc/jetty-threadpool.xml
|
||||||
#jetty.threadPool.reservedThreads=-1
|
#jetty.threadPool.reservedThreads=-1
|
||||||
|
|
||||||
## Whether to use virtual threads, if the runtime supports them.
|
## Whether to use virtual threads, if the runtime supports them.
|
||||||
|
## Deprecated, use Jetty module 'threadpool-virtual-preview' instead.
|
||||||
#jetty.threadPool.useVirtualThreads=false
|
#jetty.threadPool.useVirtualThreads=false
|
||||||
|
|
||||||
## Thread idle timeout (in milliseconds).
|
## Thread idle timeout (in milliseconds).
|
||||||
|
|
|
@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* and, if virtual threads are supported, to start virtual threads.</p>
|
* and, if virtual threads are supported, to start virtual threads.</p>
|
||||||
*
|
*
|
||||||
* @see #areSupported()
|
* @see #areSupported()
|
||||||
* @see #executeOnVirtualThread(Runnable)
|
* @see #getVirtualThreadsExecutor(Executor)
|
||||||
* @see #isVirtualThread()
|
* @see #isVirtualThread()
|
||||||
*/
|
*/
|
||||||
public class VirtualThreads
|
public class VirtualThreads
|
||||||
|
@ -58,6 +58,11 @@ public class VirtualThreads
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Method getIsVirtualThreadMethod()
|
||||||
|
{
|
||||||
|
return isVirtualThread;
|
||||||
|
}
|
||||||
|
|
||||||
private static void warn()
|
private static void warn()
|
||||||
{
|
{
|
||||||
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
|
LOG.warn("Virtual thread support is not available (or not enabled via --enable-preview) in the current Java runtime ({})", System.getProperty("java.version"));
|
||||||
|
@ -78,14 +83,16 @@ public class VirtualThreads
|
||||||
*
|
*
|
||||||
* @param task the task to execute in a virtual thread
|
* @param task the task to execute in a virtual thread
|
||||||
* @see #areSupported()
|
* @see #areSupported()
|
||||||
|
* @deprecated use {@link #getVirtualThreadsExecutor(Executor)} instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
public static void executeOnVirtualThread(Runnable task)
|
public static void executeOnVirtualThread(Runnable task)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Starting in virtual thread: {}", task);
|
LOG.debug("Starting in virtual thread: {}", task);
|
||||||
executor.execute(task);
|
getDefaultVirtualThreadsExecutor().execute(task);
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
|
@ -101,7 +108,7 @@ public class VirtualThreads
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
return (Boolean)isVirtualThread.invoke(Thread.currentThread());
|
return (Boolean)getIsVirtualThreadMethod().invoke(Thread.currentThread());
|
||||||
}
|
}
|
||||||
catch (Throwable x)
|
catch (Throwable x)
|
||||||
{
|
{
|
||||||
|
@ -110,6 +117,25 @@ public class VirtualThreads
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a default virtual thread per task {@code Executor}
|
||||||
|
*/
|
||||||
|
public static Executor getDefaultVirtualThreadsExecutor()
|
||||||
|
{
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param executor the {@code Executor} to obtain a virtual threads {@code Executor} from
|
||||||
|
* @return a virtual threads {@code Executor} obtained from the given {@code Executor}
|
||||||
|
*/
|
||||||
|
public static Executor getVirtualThreadsExecutor(Executor executor)
|
||||||
|
{
|
||||||
|
if (executor instanceof Configurable)
|
||||||
|
return ((Configurable)executor).getVirtualThreadsExecutor();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Tests whether the given executor implements {@link Configurable} and
|
* <p>Tests whether the given executor implements {@link Configurable} and
|
||||||
* it has been configured to use virtual threads.</p>
|
* it has been configured to use virtual threads.</p>
|
||||||
|
@ -121,7 +147,7 @@ public class VirtualThreads
|
||||||
public static boolean isUseVirtualThreads(Executor executor)
|
public static boolean isUseVirtualThreads(Executor executor)
|
||||||
{
|
{
|
||||||
if (executor instanceof Configurable)
|
if (executor instanceof Configurable)
|
||||||
return ((Configurable)executor).isUseVirtualThreads();
|
return ((Configurable)executor).getVirtualThreadsExecutor() != null;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,30 +155,53 @@ public class VirtualThreads
|
||||||
* <p>Implementations of this interface can be configured to use virtual threads.</p>
|
* <p>Implementations of this interface can be configured to use virtual threads.</p>
|
||||||
* <p>Whether virtual threads are actually used depends on whether the runtime
|
* <p>Whether virtual threads are actually used depends on whether the runtime
|
||||||
* supports virtual threads and, if the runtime supports them, whether they are
|
* supports virtual threads and, if the runtime supports them, whether they are
|
||||||
* configured to be used via {@link #setUseVirtualThreads(boolean)}.</p>
|
* configured to be used via {@link #setVirtualThreadsExecutor(Executor)}.</p>
|
||||||
*/
|
*/
|
||||||
public interface Configurable
|
public interface Configurable
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @return whether to use virtual threads
|
* @return the {@code Executor} to use to execute tasks in virtual threads
|
||||||
*/
|
*/
|
||||||
|
default Executor getVirtualThreadsExecutor()
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param executor the {@code Executor} to use to execute tasks in virtual threads
|
||||||
|
* @throws UnsupportedOperationException if the runtime does not support virtual threads
|
||||||
|
* @see #areSupported()
|
||||||
|
*/
|
||||||
|
default void setVirtualThreadsExecutor(Executor executor)
|
||||||
|
{
|
||||||
|
if (executor != null && !VirtualThreads.areSupported())
|
||||||
|
{
|
||||||
|
warn();
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether to use virtual threads
|
||||||
|
* @deprecated use {@link #getVirtualThreadsExecutor()} instead
|
||||||
|
*/
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
default boolean isUseVirtualThreads()
|
default boolean isUseVirtualThreads()
|
||||||
{
|
{
|
||||||
return false;
|
return getVirtualThreadsExecutor() != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param useVirtualThreads whether to use virtual threads
|
* @param useVirtualThreads whether to use virtual threads
|
||||||
* @throws UnsupportedOperationException if the runtime does not support virtual threads
|
* @throws UnsupportedOperationException if the runtime does not support virtual threads
|
||||||
* @see #areSupported()
|
* @see #areSupported()
|
||||||
|
* @deprecated use {@link #setVirtualThreadsExecutor(Executor)} instead
|
||||||
*/
|
*/
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
default void setUseVirtualThreads(boolean useVirtualThreads)
|
default void setUseVirtualThreads(boolean useVirtualThreads)
|
||||||
{
|
{
|
||||||
if (useVirtualThreads && !VirtualThreads.areSupported())
|
setVirtualThreadsExecutor(useVirtualThreads ? executor : null);
|
||||||
{
|
|
||||||
warn();
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -47,7 +48,7 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
|
||||||
private int _priority = Thread.NORM_PRIORITY;
|
private int _priority = Thread.NORM_PRIORITY;
|
||||||
private boolean _daemon;
|
private boolean _daemon;
|
||||||
private boolean _detailedDump;
|
private boolean _detailedDump;
|
||||||
private boolean _useVirtualThreads;
|
private Executor _virtualThreadsExecutor;
|
||||||
|
|
||||||
public ExecutorThreadPool()
|
public ExecutorThreadPool()
|
||||||
{
|
{
|
||||||
|
@ -271,18 +272,18 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isUseVirtualThreads()
|
public Executor getVirtualThreadsExecutor()
|
||||||
{
|
{
|
||||||
return _useVirtualThreads;
|
return _virtualThreadsExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
public void setVirtualThreadsExecutor(Executor executor)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
|
VirtualThreads.Configurable.super.setVirtualThreadsExecutor(executor);
|
||||||
_useVirtualThreads = useVirtualThreads;
|
_virtualThreadsExecutor = executor;
|
||||||
}
|
}
|
||||||
catch (UnsupportedOperationException ignored)
|
catch (UnsupportedOperationException ignored)
|
||||||
{
|
{
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -111,7 +112,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
||||||
private int _lowThreadsThreshold = 1;
|
private int _lowThreadsThreshold = 1;
|
||||||
private ThreadPoolBudget _budget;
|
private ThreadPoolBudget _budget;
|
||||||
private long _stopTimeout;
|
private long _stopTimeout;
|
||||||
private boolean _useVirtualThreads;
|
private Executor _virtualThreadsExecutor;
|
||||||
|
|
||||||
public QueuedThreadPool()
|
public QueuedThreadPool()
|
||||||
{
|
{
|
||||||
|
@ -515,18 +516,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isUseVirtualThreads()
|
public Executor getVirtualThreadsExecutor()
|
||||||
{
|
{
|
||||||
return _useVirtualThreads;
|
return _virtualThreadsExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUseVirtualThreads(boolean useVirtualThreads)
|
public void setVirtualThreadsExecutor(Executor executor)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
VirtualThreads.Configurable.super.setUseVirtualThreads(useVirtualThreads);
|
VirtualThreads.Configurable.super.setVirtualThreadsExecutor(executor);
|
||||||
_useVirtualThreads = useVirtualThreads;
|
_virtualThreadsExecutor = executor;
|
||||||
}
|
}
|
||||||
catch (UnsupportedOperationException ignored)
|
catch (UnsupportedOperationException ignored)
|
||||||
{
|
{
|
||||||
|
|
|
@ -136,8 +136,8 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
||||||
private final Producer _producer;
|
private final Producer _producer;
|
||||||
private final Executor _executor;
|
private final Executor _executor;
|
||||||
private final TryExecutor _tryExecutor;
|
private final TryExecutor _tryExecutor;
|
||||||
|
private final Executor _virtualExecutor;
|
||||||
private final Runnable _runPendingProducer = () -> tryProduce(true);
|
private final Runnable _runPendingProducer = () -> tryProduce(true);
|
||||||
private boolean _useVirtualThreads;
|
|
||||||
private State _state = State.IDLE;
|
private State _state = State.IDLE;
|
||||||
private boolean _pending;
|
private boolean _pending;
|
||||||
|
|
||||||
|
@ -150,19 +150,14 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
||||||
_producer = producer;
|
_producer = producer;
|
||||||
_executor = executor;
|
_executor = executor;
|
||||||
_tryExecutor = TryExecutor.asTryExecutor(executor);
|
_tryExecutor = TryExecutor.asTryExecutor(executor);
|
||||||
|
_virtualExecutor = VirtualThreads.getVirtualThreadsExecutor(_executor);
|
||||||
addBean(_producer);
|
addBean(_producer);
|
||||||
addBean(_tryExecutor);
|
addBean(_tryExecutor);
|
||||||
|
addBean(_virtualExecutor);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} created", this);
|
LOG.debug("{} created", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void doStart() throws Exception
|
|
||||||
{
|
|
||||||
super.doStart();
|
|
||||||
_useVirtualThreads = VirtualThreads.isUseVirtualThreads(_executor);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispatch()
|
public void dispatch()
|
||||||
{
|
{
|
||||||
|
@ -471,10 +466,10 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (isUseVirtualThreads())
|
Executor executor = _virtualExecutor;
|
||||||
VirtualThreads.executeOnVirtualThread(task);
|
if (executor == null)
|
||||||
else
|
executor = _executor;
|
||||||
_executor.execute(task);
|
executor.execute(task);
|
||||||
}
|
}
|
||||||
catch (RejectedExecutionException e)
|
catch (RejectedExecutionException e)
|
||||||
{
|
{
|
||||||
|
@ -491,7 +486,7 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
|
||||||
@ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true)
|
@ManagedAttribute(value = "whether this execution strategy uses virtual threads", readonly = true)
|
||||||
public boolean isUseVirtualThreads()
|
public boolean isUseVirtualThreads()
|
||||||
{
|
{
|
||||||
return _useVirtualThreads;
|
return _virtualExecutor != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
|
@ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true)
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||||
import org.junit.jupiter.api.Tag;
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.condition.DisabledForJreRange;
|
||||||
import org.junit.jupiter.api.condition.DisabledOnJre;
|
import org.junit.jupiter.api.condition.DisabledOnJre;
|
||||||
import org.junit.jupiter.api.condition.DisabledOnOs;
|
import org.junit.jupiter.api.condition.DisabledOnOs;
|
||||||
import org.junit.jupiter.api.condition.EnabledForJreRange;
|
import org.junit.jupiter.api.condition.EnabledForJreRange;
|
||||||
|
@ -1259,4 +1260,33 @@ public class DistributionTests extends AbstractJettyHomeTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisabledForJreRange(max = JRE.JAVA_18)
|
||||||
|
public void testVirtualThreadPool() throws Exception
|
||||||
|
{
|
||||||
|
String jettyVersion = System.getProperty("jettyVersion");
|
||||||
|
JettyHomeTester distribution = JettyHomeTester.Builder.newInstance()
|
||||||
|
.jettyVersion(jettyVersion)
|
||||||
|
.mavenLocalRepository(System.getProperty("mavenRepoPath"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
try (JettyHomeTester.Run run1 = distribution.start("--add-modules=threadpool-virtual-preview,http"))
|
||||||
|
{
|
||||||
|
assertTrue(run1.awaitFor(10, TimeUnit.SECONDS));
|
||||||
|
assertEquals(0, run1.getExitValue());
|
||||||
|
|
||||||
|
int httpPort = distribution.freePort();
|
||||||
|
try (JettyHomeTester.Run run2 = distribution.start(List.of("jetty.http.selectors=1", "jetty.http.port=" + httpPort)))
|
||||||
|
{
|
||||||
|
assertTrue(run2.awaitConsoleLogsFor("Started Server@", 10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
startHttpClient();
|
||||||
|
ContentResponse response = client.newRequest("localhost", httpPort)
|
||||||
|
.timeout(15, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
assertEquals(HttpStatus.NOT_FOUND_404, response.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,11 @@
|
||||||
package org.eclipse.jetty.http.client;
|
package org.eclipse.jetty.http.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -58,6 +62,7 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
|
||||||
// No virtual thread support in FCGI server-side.
|
// No virtual thread support in FCGI server-side.
|
||||||
Assumptions.assumeTrue(transport != Transport.FCGI);
|
Assumptions.assumeTrue(transport != Transport.FCGI);
|
||||||
|
|
||||||
|
String virtualThreadsName = "green-";
|
||||||
init(transport);
|
init(transport);
|
||||||
scenario.prepareServer(new EmptyServerHandler()
|
scenario.prepareServer(new EmptyServerHandler()
|
||||||
{
|
{
|
||||||
|
@ -66,11 +71,21 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
|
||||||
{
|
{
|
||||||
if (!VirtualThreads.isVirtualThread())
|
if (!VirtualThreads.isVirtualThread())
|
||||||
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
|
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
|
||||||
|
if (!Thread.currentThread().getName().startsWith(virtualThreadsName))
|
||||||
|
response.setStatus(HttpStatus.NOT_IMPLEMENTED_501);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
ThreadPool threadPool = scenario.server.getThreadPool();
|
ThreadPool threadPool = scenario.server.getThreadPool();
|
||||||
if (threadPool instanceof VirtualThreads.Configurable)
|
if (threadPool instanceof VirtualThreads.Configurable)
|
||||||
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
|
{
|
||||||
|
// CAUTION: Java 19 specific reflection code, might change in future Java versions.
|
||||||
|
Object builder = Thread.class.getMethod("ofVirtual").invoke(null);
|
||||||
|
Class<?> builderClass = Arrays.stream(Thread.class.getClasses()).filter(klass -> klass.getName().endsWith("$Builder")).findFirst().orElseThrow();
|
||||||
|
builder = builderClass.getMethod("name", String.class, long.class).invoke(builder, virtualThreadsName, 0L);
|
||||||
|
ThreadFactory factory = (ThreadFactory)builderClass.getMethod("factory").invoke(builder);
|
||||||
|
Executor virtualThreadsExecutor = (Executor)Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class).invoke(null, factory);
|
||||||
|
((VirtualThreads.Configurable)threadPool).setVirtualThreadsExecutor(virtualThreadsExecutor);
|
||||||
|
}
|
||||||
scenario.server.start();
|
scenario.server.start();
|
||||||
scenario.startClient();
|
scenario.startClient();
|
||||||
|
|
||||||
|
@ -151,7 +166,7 @@ public class VirtualThreadsTest extends AbstractTest<TransportScenario>
|
||||||
});
|
});
|
||||||
ThreadPool threadPool = scenario.server.getThreadPool();
|
ThreadPool threadPool = scenario.server.getThreadPool();
|
||||||
if (threadPool instanceof VirtualThreads.Configurable)
|
if (threadPool instanceof VirtualThreads.Configurable)
|
||||||
((VirtualThreads.Configurable)threadPool).setUseVirtualThreads(true);
|
((VirtualThreads.Configurable)threadPool).setVirtualThreadsExecutor(VirtualThreads.getDefaultVirtualThreadsExecutor());
|
||||||
scenario.server.start();
|
scenario.server.start();
|
||||||
scenario.startClient();
|
scenario.startClient();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue