Updated VirtualThreadPool to limit the number of concurrent virtual threads using a Semaphore. Updated modules and documentation. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
2ca9be96ae
commit
0644aaf88c
|
@ -17,17 +17,29 @@ import java.util.concurrent.Executors;
|
|||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.VirtualThreadPool;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public class ArchitectureDocs
|
||||
{
|
||||
public void configureVirtualThreads()
|
||||
public void queuedVirtualThreads()
|
||||
{
|
||||
// tag::virtual[]
|
||||
// tag::queuedVirtual[]
|
||||
QueuedThreadPool threadPool = new QueuedThreadPool();
|
||||
threadPool.setVirtualThreadsExecutor(Executors.newVirtualThreadPerTaskExecutor());
|
||||
|
||||
Server server = new Server(threadPool);
|
||||
// end::virtual[]
|
||||
// end::queuedVirtual[]
|
||||
}
|
||||
|
||||
public void virtualVirtualThreads()
|
||||
{
|
||||
// tag::virtualVirtual[]
|
||||
VirtualThreadPool threadPool = new VirtualThreadPool();
|
||||
// Limit the max number of current virtual threads.
|
||||
threadPool.setMaxThreads(200);
|
||||
|
||||
Server server = new Server(threadPool);
|
||||
// end::virtualVirtual[]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -708,6 +708,34 @@ If you want to use virtual threads, introduced as a preview feature in Java 19 a
|
|||
|
||||
See also the xref:server/index.adoc#threadpool[section about configuring the thread pool].
|
||||
|
||||
[[threadpool-all-virtual]]
|
||||
== Module `threadpool-all-virtual`
|
||||
|
||||
The `threadpool-all-virtual` module allows you to configure the server-wide thread pool, similarly to what you can do with the <<threadpool,`threadpool`>> Jetty module, so that all threads are virtual threads, introduced as an official feature since Java 21.
|
||||
|
||||
CAUTION: Only use this module if you are using Java 21 or later.
|
||||
If you are using Java 19 or Java 20, use the <<threadpool-virtual-preview,`threadpool-virtual-preview`>> Jetty module instead.
|
||||
|
||||
The module properties to configure the thread pool are:
|
||||
|
||||
----
|
||||
include::{jetty-home}/modules/threadpool-all-virtual.mod[tags=documentation]
|
||||
----
|
||||
|
||||
The property `jetty.threadpool.maxThreads` limits, using a `Semaphore`, the number of current virtual threads in use.
|
||||
|
||||
Limiting the number of current virtual threads helps to limit resource usage in applications, especially in case of load spikes.
|
||||
When an unlimited number of virtual threads is allowed, the server might be brought down due to resource (typically memory) exhaustion.
|
||||
|
||||
[CAUTION]
|
||||
====
|
||||
Even when using virtual threads, Jetty uses non-blocking I/O, and dedicates a thread to each `java.nio.channels.Selector` to perform the `Selector.select()` operation.
|
||||
|
||||
Currently (up to Java 22), calling `Selector.select()` from a virtual thread pins the carrier thread.
|
||||
|
||||
When using the `threadpool-all-virtual` Jetty module, if you have `N` selectors, then `N` carrier threads will be pinned by the virtual threads calling `Selector.select()`, possibly making your system less efficient, and at worst locking up the entire system if there are no carrier threads available to run virtual threads.
|
||||
====
|
||||
|
||||
[[threadpool-virtual]]
|
||||
== Module `threadpool-virtual`
|
||||
|
||||
|
|
|
@ -328,32 +328,30 @@ Virtual threads have been introduced as a preview feature in Java 19 and Java 20
|
|||
|
||||
The xref:modules/standard.adoc#threadpool-virtual-preview[`threadpool-virtual-preview`] Jetty module provides support for virtual threads in Java 19 and Java 20, and it is mutually exclusive with the `threadpool` Jetty module.
|
||||
|
||||
The xref:modules/standard.adoc#threadpool-virtual[`threadpool-virtual`] Jetty module provides support for virtual threads in Java 21 or later, and it is mutually exclusive with the `threadpool` Jetty module.
|
||||
When using Java 21, there are two Jetty modules available:
|
||||
|
||||
* xref:modules/standard.adoc#threadpool-virtual[`threadpool-virtual`]
|
||||
* xref:modules/standard.adoc#threadpool-all-virtual[`threadpool-all-virtual`]
|
||||
|
||||
Both are mutually exclusive with the `threadpool` Jetty module.
|
||||
|
||||
If you have already enabled the `threadpool` Jetty module, it is sufficient to remove it by removing the `$JETTY_BASE/start.d/threadpool.ini` file.
|
||||
|
||||
When using Java 21 or later, you can enable the xref:modules/standard.adoc#threadpool-virtual[`threadpool-virtual`] module:
|
||||
The xref:modules/standard.adoc#threadpool-virtual[`threadpool-virtual`] Jetty module provides a mixed thread mode, where platform threads are used to run internal Jetty tasks, but application code is invoked using virtual threads.
|
||||
|
||||
The xref:modules/standard.adoc#threadpool-all-virtual[`threadpool-all-virtual`] Jetty module provides a thread mode where all threads are virtual threads, including those used internally by Jetty.
|
||||
|
||||
You can enable either module using:
|
||||
|
||||
----
|
||||
$ java -jar $JETTY_HOME/start.jar --add-modules=threadpool-virtual,http
|
||||
----
|
||||
|
||||
After the command above, the `$JETTY_BASE` directory looks like this:
|
||||
or
|
||||
|
||||
[source]
|
||||
----
|
||||
$JETTY_BASE
|
||||
├── resources
|
||||
│ └── jetty-logging.properties
|
||||
└── start.d
|
||||
├── http.ini
|
||||
└── threadpool-virtual.ini
|
||||
$ java -jar $JETTY_HOME/start.jar --add-modules=threadpool-all-virtual,http
|
||||
----
|
||||
|
||||
Now you can customize the `threadpool-virtual.ini` file to explicitly configure the thread pool and the virtual threads and then start Jetty:
|
||||
|
||||
[jetty%nowrap]
|
||||
....
|
||||
[jetty]
|
||||
setupArgs=--add-modules=threadpool-virtual,http
|
||||
....
|
||||
After the command above, the `$JETTY_BASE/start.d/` directory will contain the corresponding `threadpool-virtual.ini` or `threadpool-all-virtual.ini` file.
|
||||
You can now explicitly configure the thread pool module properties inside the `+*.ini+` file and then start Jetty.
|
||||
|
|
|
@ -235,11 +235,14 @@ Virtual threads have been introduced in Java 19 and Java 20 as a preview feature
|
|||
|
||||
NOTE: In Java versions where virtual threads are a preview feature, remember to add `+--enable-preview+` to the JVM command line options to use virtual threads.
|
||||
|
||||
[[thread-pool-virtual-threads-queued]]
|
||||
==== Virtual Threads Support with `QueuedThreadPool`
|
||||
|
||||
`QueuedThreadPool` can be configured to use virtual threads by specifying the virtual threads `Executor`:
|
||||
|
||||
[,java,indent=0]
|
||||
----
|
||||
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ArchitectureDocs.java[tags=virtual]
|
||||
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ArchitectureDocs.java[tags=queuedVirtual]
|
||||
----
|
||||
|
||||
[CAUTION]
|
||||
|
@ -255,3 +258,17 @@ Enabling virtual threads in `QueuedThreadPool` will default the number of reserv
|
|||
|
||||
Defaulting the number of reserved threads to zero ensures that the <<execution-strategy-pec,Produce-Execute-Consume mode>> is always used, which means that virtual threads will always be used for blocking tasks.
|
||||
====
|
||||
|
||||
[[thread-pool-virtual-threads-virtual]]
|
||||
==== Virtual Threads Support with `VirtualThreadPool`
|
||||
|
||||
`VirtualThreadPool` is an alternative to `QueuedThreadPool` that creates only virtual threads (no platform threads).
|
||||
|
||||
[,java,indent=0]
|
||||
----
|
||||
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ArchitectureDocs.java[tags=virtualVirtual]
|
||||
----
|
||||
|
||||
Despite the name, `VirtualThreadPool` does not pool virtual threads, but allows you to impose a limit on the maximum number of current virtual threads, in order to limit resource consumption.
|
||||
|
||||
Furthermore, you can configure it to track virtual threads so that a xref:troubleshooting/component-dump.adoc[Jetty component dump] will show all virtual threads, including those that are unmounted.
|
||||
|
|
|
@ -3,7 +3,10 @@
|
|||
|
||||
<Configure>
|
||||
<New id="threadPool" class="org.eclipse.jetty.util.thread.VirtualThreadPool">
|
||||
<Arg type="int"><Property name="jetty.threadPool.maxThreads" default="200" /></Arg>
|
||||
<Set name="name" property="jetty.threadPool.namePrefix" />
|
||||
<Set name="tracking" property="jetty.threadPool.tracking" />
|
||||
<Set name="detailedDump" property="jetty.threadPool.detailedDump" />
|
||||
</New>
|
||||
|
||||
<Call class="org.slf4j.LoggerFactory" name="getLogger">
|
||||
|
|
|
@ -11,25 +11,18 @@
|
|||
<Set name="maxEvictCount" type="int"><Property name="jetty.threadPool.maxEvictCount" default="1"/></Set>
|
||||
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
|
||||
<Get id="namePrefix" name="name" />
|
||||
<Call class="java.lang.Thread" name="ofVirtual">
|
||||
<Call class="java.lang.Thread$Builder" name="name">
|
||||
<Arg>
|
||||
<Property name="jetty.threadPool.virtual.namePrefix">
|
||||
<Default><Ref refid="namePrefix" />-virtual-</Default>
|
||||
</Property>
|
||||
</Arg>
|
||||
<Arg type="long">0</Arg>
|
||||
<Call class="java.lang.Thread$Builder" name="inheritInheritableThreadLocals">
|
||||
<Arg type="boolean"><Property name="jetty.threadPool.virtual.inheritInheritableThreadLocals" default="false" /></Arg>
|
||||
<Call id="virtualThreadFactory" class="java.lang.Thread$Builder" name="factory" />
|
||||
</Call>
|
||||
</Call>
|
||||
</Call>
|
||||
<Call name="setVirtualThreadsExecutor">
|
||||
<Arg>
|
||||
<Call class="java.util.concurrent.Executors" name="newThreadPerTaskExecutor">
|
||||
<Arg><Ref refid="virtualThreadFactory" /></Arg>
|
||||
</Call>
|
||||
<New class="org.eclipse.jetty.util.thread.VirtualThreadPool">
|
||||
<Set name="name">
|
||||
<Property name="jetty.threadPool.virtual.namePrefix">
|
||||
<Default><Ref refid="namePrefix" />-virtual-</Default>
|
||||
</Property>
|
||||
</Set>
|
||||
<Set name="maxThreads" property="jetty.threadPool.virtual.maxThreads" />
|
||||
<Set name="tracking" property="jetty.threadPool.virtual.tracking" />
|
||||
<Set name="detailedDump" property="jetty.threadPool.detailedDump" />
|
||||
</New>
|
||||
</Arg>
|
||||
</Call>
|
||||
</New>
|
||||
|
|
|
@ -13,7 +13,16 @@ etc/jetty-threadpool-all-virtual.xml
|
|||
|
||||
[ini-template]
|
||||
# tag::documentation[]
|
||||
## Platform threads name prefix.
|
||||
## Virtual threads name prefix.
|
||||
#jetty.threadPool.namePrefix=vtp<hashCode>
|
||||
|
||||
## Maximum number of current virtual threads.
|
||||
#jetty.threadPool.maxThreads=200
|
||||
|
||||
## Whether to track virtual threads so they appear
|
||||
## in the dump even if they are unmounted.
|
||||
#jetty.threadPool.tracking=false
|
||||
|
||||
## Whether to output virtual thread's stack traces in the dump.
|
||||
#jetty.threadPool.detailedDump=false
|
||||
# end::documentation[]
|
||||
|
|
|
@ -37,6 +37,10 @@ etc/jetty-threadpool-virtual.xml
|
|||
## Virtual threads name prefix.
|
||||
#jetty.threadPool.virtual.namePrefix=qtp<hashCode>-virtual-
|
||||
|
||||
## Whether virtual threads inherits the values of inheritable thread locals.
|
||||
#jetty.threadPool.virtual.inheritInheritableThreadLocals=true
|
||||
## Max number of current virtual threads.
|
||||
#jetty.threadPool.virtual.maxThreads=200
|
||||
|
||||
## Whether to track virtual threads so they appear
|
||||
## in the dump even if they are unmounted.
|
||||
#jetty.threadPool.virtual.tracking=false
|
||||
# end::documentation[]
|
||||
|
|
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.util.thread;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.VirtualThreads;
|
||||
|
@ -27,7 +28,10 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* An implementation of {@link ThreadPool} interface that does not pool, but instead uses {@link VirtualThreads}.
|
||||
* <p>An implementation of {@link ThreadPool} interface that does not pool, but instead uses {@link VirtualThreads}.</p>
|
||||
* <p>It is possible to specify the max number of concurrent virtual threads that can be spawned, to help limiting
|
||||
* resource usage in applications, especially in case of load spikes, where an unlimited number of virtual threads
|
||||
* may be spawned, compete for resources, and eventually bring the system down due to memory exhaustion.</p>
|
||||
*/
|
||||
@ManagedObject("A thread non-pool for virtual threads")
|
||||
public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable
|
||||
|
@ -35,12 +39,14 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadPool.class);
|
||||
|
||||
private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition();
|
||||
private String _name = null;
|
||||
private Executor _virtualExecutor;
|
||||
private Thread _main;
|
||||
private boolean _externalExecutor;
|
||||
private String _name;
|
||||
private int _maxThreads = 200;
|
||||
private boolean _tracking;
|
||||
private boolean _detailedDump;
|
||||
private Thread _keepAlive;
|
||||
private Executor _virtualExecutor;
|
||||
private boolean _externalExecutor;
|
||||
private Semaphore _semaphore;
|
||||
|
||||
public VirtualThreadPool()
|
||||
{
|
||||
|
@ -71,12 +77,32 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
_name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number of concurrent virtual threads
|
||||
*/
|
||||
@ManagedAttribute("The max number of concurrent virtual threads")
|
||||
public int getMaxThreads()
|
||||
{
|
||||
return _maxThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxThreads the maximum number of concurrent virtual threads
|
||||
*/
|
||||
public void setMaxThreads(int maxThreads)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
_maxThreads = maxThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get if this pool is tracking virtual threads.
|
||||
*
|
||||
* @return {@code true} if the virtual threads will be tracked.
|
||||
* @see TrackingExecutor
|
||||
*/
|
||||
@ManagedAttribute("virtual threads are tracked")
|
||||
@ManagedAttribute("Whether virtual threads are tracked")
|
||||
public boolean isTracking()
|
||||
{
|
||||
return _tracking;
|
||||
|
@ -89,7 +115,7 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
_tracking = tracking;
|
||||
}
|
||||
|
||||
@ManagedAttribute("reports additional details in the dump")
|
||||
@ManagedAttribute("Whether to report additional details in the dump")
|
||||
public boolean isDetailedDump()
|
||||
{
|
||||
return _detailedDump;
|
||||
|
@ -101,11 +127,11 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
if (_virtualExecutor instanceof TrackingExecutor trackingExecutor)
|
||||
trackingExecutor.setDetailedDump(detailedDump);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
_main = new Thread("jetty-virtual-thread-pool-keepalive")
|
||||
_keepAlive = new Thread("jetty-virtual-thread-pool-keepalive")
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
|
@ -123,18 +149,24 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
}
|
||||
}
|
||||
};
|
||||
_main.start();
|
||||
_keepAlive.start();
|
||||
|
||||
if (_virtualExecutor == null)
|
||||
{
|
||||
_externalExecutor = false;
|
||||
_virtualExecutor = Objects.requireNonNull(StringUtil.isBlank(_name)
|
||||
? VirtualThreads.getDefaultVirtualThreadsExecutor()
|
||||
: VirtualThreads.getNamedVirtualThreadsExecutor(_name));
|
||||
}
|
||||
if (_tracking && !(_virtualExecutor instanceof TrackingExecutor))
|
||||
_virtualExecutor = new TrackingExecutor(_virtualExecutor, _detailedDump);
|
||||
_virtualExecutor = new TrackingExecutor(_virtualExecutor, isDetailedDump());
|
||||
addBean(_virtualExecutor);
|
||||
|
||||
if (_maxThreads > 0)
|
||||
{
|
||||
_semaphore = new Semaphore(_maxThreads);
|
||||
addBean(_semaphore);
|
||||
}
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
|
@ -142,11 +174,12 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
protected void doStop() throws Exception
|
||||
{
|
||||
super.doStop();
|
||||
removeBean(_semaphore);
|
||||
_semaphore = null;
|
||||
removeBean(_virtualExecutor);
|
||||
if (!_externalExecutor)
|
||||
_virtualExecutor = null;
|
||||
_main = null;
|
||||
|
||||
_keepAlive = null;
|
||||
try (AutoLock.WithCondition l = _joinLock.lock())
|
||||
{
|
||||
l.signalAll();
|
||||
|
@ -208,7 +241,7 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
{
|
||||
try
|
||||
{
|
||||
_virtualExecutor.execute(task);
|
||||
execute(task);
|
||||
return true;
|
||||
}
|
||||
catch (RejectedExecutionException e)
|
||||
|
@ -221,6 +254,32 @@ public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool,
|
|||
@Override
|
||||
public void execute(Runnable task)
|
||||
{
|
||||
_virtualExecutor.execute(task);
|
||||
Runnable job = task;
|
||||
if (_semaphore != null)
|
||||
{
|
||||
job = () ->
|
||||
{
|
||||
try
|
||||
{
|
||||
// The caller of execute(Runnable) cannot be blocked,
|
||||
// as it is unknown whether it is a virtual thread.
|
||||
// But this is a virtual thread, so acquiring a permit here
|
||||
// blocks the virtual thread, but does not pin the carrier.
|
||||
_semaphore.acquire();
|
||||
task.run();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
// Likely stopping this component, exit.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("interrupted while waiting for permit {}", task, x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_semaphore.release();
|
||||
}
|
||||
};
|
||||
}
|
||||
_virtualExecutor.execute(job);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,16 +16,19 @@ package org.eclipse.jetty.util.thread;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.DisabledForJreRange;
|
||||
import org.junit.jupiter.api.condition.JRE;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@DisabledForJreRange(max = JRE.JAVA_20)
|
||||
|
@ -146,6 +149,46 @@ public class VirtualThreadPoolTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxThreads() throws Exception
|
||||
{
|
||||
VirtualThreadPool vtp = new VirtualThreadPool();
|
||||
vtp.setMaxThreads(1);
|
||||
vtp.start();
|
||||
|
||||
AtomicBoolean run1 = new AtomicBoolean();
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
vtp.execute(() ->
|
||||
{
|
||||
try
|
||||
{
|
||||
// Simulate a blocking call.
|
||||
run1.set(true);
|
||||
latch1.await();
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for the first task to acquire the only permit.
|
||||
await().atMost(1, TimeUnit.SECONDS).until(run1::get);
|
||||
|
||||
// Try to submit another task, it should not
|
||||
// be executed, and the caller must not block.
|
||||
CountDownLatch latch2 = new CountDownLatch(1);
|
||||
vtp.execute(latch2::countDown);
|
||||
assertFalse(latch2.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// Unblocking the first task allows the execution of the second task.
|
||||
latch1.countDown();
|
||||
|
||||
assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||
|
||||
vtp.stop();
|
||||
}
|
||||
|
||||
public static int count(String str, String subStr)
|
||||
{
|
||||
if (StringUtil.isEmpty(str))
|
||||
|
|
Loading…
Reference in New Issue