diff --git a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/arch-threads.adoc b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/arch-threads.adoc index c4e453265f4..f39c7133c72 100644 --- a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/arch-threads.adoc +++ b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/arch-threads.adoc @@ -188,6 +188,19 @@ This value represents the maximum number of threads that can be reserved and use A negative value for `QueuedThreadPool.reservedThreads` means that the actual value will be heuristically derived from the number of CPU cores and `QueuedThreadPool.maxThreads`. A value of zero for `QueuedThreadPool.reservedThreads` means that reserved threads are disabled, and therefore the xref:pg-arch-threads-execution-strategy-epc[`Execute-Produce-Consume` mode] is never used -- the xref:pg-arch-threads-execution-strategy-pec[`Produce-Execute-Consume` mode] is always used instead. +`QueuedThreadPool` always maintains the number of threads between `QueuedThreadPool.minThreads` and `QueuedThreadPool.maxThreads`; during load spikes the number of thread grows to meet the load demand, and when the load on the system diminishes or the system goes idle, the number of threads shrinks. + +Shrinking `QueuedThreadPool` is important in particular in containerized environments, where typically you want to return the memory occupied by the threads to the operative system. +The shrinking of the `QueuedThreadPool` is controlled by two parameters: `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.maxEvictCount`. + +`QueuedThreadPool.idleTimeout` indicates how long a thread should stay around when it is idle, waiting for tasks to execute. +The longer the threads stay around, the more ready they are in case of new load spikes on the system; however, they consume resources: a Java platform thread typically allocates 1 MiB of native memory. + +`QueuedThreadPool.maxEvictCount` controls how many idle threads are evicted for one `QueuedThreadPool.idleTimeout` period. +The larger this value is, the quicker the threads are evicted when the `QueuedThreadPool` is idle or has less load, and their resources returned to the operative system; however, large values may result in too much thread thrashing: the `QueuedThreadPool` shrinks too fast and must re-create a lot of threads in case of a new load spike on the system. + +A good balance between `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.maxEvictCount` depends on the load profile of your system, and it is often tuned via trial and error. + [[pg-arch-threads-thread-pool-virtual-threads]] ===== Virtual Threads Virtual threads have been introduced in Java 19 as a preview feature. diff --git a/jetty-server/src/main/config/etc/jetty-threadpool-virtual-preview.xml b/jetty-server/src/main/config/etc/jetty-threadpool-virtual-preview.xml index ed7490a51ed..e5c454fb5de 100644 --- a/jetty-server/src/main/config/etc/jetty-threadpool-virtual-preview.xml +++ b/jetty-server/src/main/config/etc/jetty-threadpool-virtual-preview.xml @@ -8,6 +8,7 @@ + diff --git a/jetty-server/src/main/config/etc/jetty-threadpool.xml b/jetty-server/src/main/config/etc/jetty-threadpool.xml index 8c815ac33e9..2ef169a6c67 100644 --- a/jetty-server/src/main/config/etc/jetty-threadpool.xml +++ b/jetty-server/src/main/config/etc/jetty-threadpool.xml @@ -26,6 +26,7 @@ + diff --git a/jetty-server/src/main/config/modules/threadpool-virtual-preview.mod b/jetty-server/src/main/config/modules/threadpool-virtual-preview.mod index 30eb02debca..05ee57659f1 100644 --- a/jetty-server/src/main/config/modules/threadpool-virtual-preview.mod +++ b/jetty-server/src/main/config/modules/threadpool-virtual-preview.mod @@ -30,6 +30,9 @@ etc/jetty-threadpool-virtual-preview.xml ## Thread idle timeout (in milliseconds). #jetty.threadPool.idleTimeout=60000 +## The max number of idle threads that can be evicted in one idleTimeout period. +#jetty.threadPool.maxEvictCount=1 + ## Whether to output a detailed dump. #jetty.threadPool.detailedDump=false diff --git a/jetty-server/src/main/config/modules/threadpool.mod b/jetty-server/src/main/config/modules/threadpool.mod index 4f94692fc09..784c0449fee 100644 --- a/jetty-server/src/main/config/modules/threadpool.mod +++ b/jetty-server/src/main/config/modules/threadpool.mod @@ -31,6 +31,9 @@ etc/jetty-threadpool.xml ## Thread idle timeout (in milliseconds). #jetty.threadPool.idleTimeout=60000 +## The max number of idle threads that are evicted in one idleTimeout period. +#jetty.threadPool.maxEvictCount=1 + ## Whether to output a detailed dump. #jetty.threadPool.detailedDump=false # end::documentation[] diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java index 8eb325b08e3..4134b3118bc 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java @@ -251,6 +251,13 @@ public class AtomicBiInteger extends AtomicLong } } + @Override + public String toString() + { + long encoded = get(); + return getHi(encoded) + "|" + getLo(encoded); + } + /** * Encodes hi and lo values into a long. * diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 7c6743952d3..330b8d2aa62 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -90,11 +90,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor *
Hi
Total thread count or Integer.MIN_VALUE if the pool is stopping
*
Lo
Net idle threads == idle threads - job queue size. Essentially if positive, * this represents the effective number of idle threads, and if negative it represents the - * demand for more threads
+ * demand for more threads, which is equivalent to the job queue's size. * */ private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0); - private final AtomicLong _lastShrink = new AtomicLong(); + private final AtomicLong _evictThreshold = new AtomicLong(); private final Set _threads = ConcurrentHashMap.newKeySet(); private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition(); private final BlockingQueue _jobs; @@ -113,6 +113,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor private ThreadPoolBudget _budget; private long _stopTimeout; private Executor _virtualThreadsExecutor; + private int _maxEvictCount = 1; public QueuedThreadPool() { @@ -218,7 +219,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor } addBean(_tryExecutor); - _lastShrink.set(NanoTime.now()); + _evictThreshold.set(NanoTime.now()); super.doStart(); // The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped. @@ -317,7 +318,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor } } - private void joinThreads(long stopByNanos) throws InterruptedException + private void joinThreads(long stopByNanos) { loop : while (true) { @@ -536,6 +537,36 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor } } + /** + *

Returns the maximum number of idle threads that are evicted for every idle timeout + * period, thus shrinking this thread pool towards its {@link #getMinThreads() minimum + * number of threads}. + * The default value is {@code 1}.

+ *

For example, consider a thread pool with {@code minThread=2}, {@code maxThread=20}, + * {@code idleTimeout=5000} and {@code maxEvictCount=3}. + * Let's assume all 20 threads are executing a task, and they all finish their own tasks + * at the same time and no more tasks are submitted; then, 3 threads will be evicted, + * while the other 17 will wait another idle timeout; then another 3 threads will be + * evicted, and so on until {@code minThreads=2} will be reached.

+ * + * @param evictCount the maximum number of idle threads to evict in one idle timeout period + */ + public void setMaxEvictCount(int evictCount) + { + if (evictCount < 1) + throw new IllegalArgumentException("Invalid evict count " + evictCount); + _maxEvictCount = evictCount; + } + + /** + * @return the maximum number of idle threads to evict in one idle timeout period + */ + @ManagedAttribute("maximum number of idle threads to evict in one idle timeout period") + public int getMaxEvictCount() + { + return _maxEvictCount; + } + /** * @return the number of jobs in the queue waiting for a thread */ @@ -747,7 +778,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor // and we are not at max threads. startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0; - // The job will be run by an idle thread when available + // Add 1|0 or 0|-1 to counts depending upon the decision to start a thread or not; + // idle can become negative which means there are queued tasks. if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1)) continue; @@ -830,7 +862,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor if (LOG.isDebugEnabled()) LOG.debug("Starting {}", thread); _threads.add(thread); - _lastShrink.set(NanoTime.now()); + // Update the evict threshold to prevent thrashing of newly started threads. + _evictThreshold.set(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(_idleTimeout)); thread.start(); started = true; } @@ -936,6 +969,79 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor job.run(); } + /** + *

Determines whether to evict the current thread from the pool.

+ * + * @return whether the current thread should be evicted + */ + protected boolean evict() + { + long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout()); + + // There is a chance that many threads enter this method concurrently, + // and if all of them are evicted the pool shrinks below minThreads. + // For example when minThreads=3, threads=8, maxEvictCount=10 we want + // to evict at most 5 threads (8-3), not 10. + // When a thread fails the CAS, it may assume that another thread has + // been evicted, so the CAS should be attempted only a number of times + // equal to the most threads we want to evict (5 in the example above). + int threads = getThreads(); + int minThreads = getMinThreads(); + int threadsToEvict = threads - minThreads; + + while (true) + { + if (threadsToEvict > 0) + { + // We have excess threads, so check if we should evict the current thread. + long now = NanoTime.now(); + long evictPeriod = idleTimeoutNanos / getMaxEvictCount(); + if (LOG.isDebugEnabled()) + LOG.debug("Evict check, period={}ms {}", TimeUnit.NANOSECONDS.toMillis(evictPeriod), this); + + long evictThreshold = _evictThreshold.get(); + long threshold = evictThreshold; + + // If the threshold is too far in the past, + // advance it to be one idle timeout before now. + if (NanoTime.elapsed(threshold, now) > idleTimeoutNanos) + threshold = now - idleTimeoutNanos; + + // Advance the threshold by one evict period. + threshold += evictPeriod; + + // Is the new threshold in the future? + if (NanoTime.isBefore(now, threshold)) + { + // Yes - we cannot evict yet, so continue looking for jobs. + if (LOG.isDebugEnabled()) + LOG.debug("Evict skipped, threshold={}ms in the future {}", NanoTime.millisElapsed(now, threshold), this); + return false; + } + + // We can evict if we can update the threshold. + if (_evictThreshold.compareAndSet(evictThreshold, threshold)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Evicted, threshold={}ms in the past {}", NanoTime.millisElapsed(threshold, now), this); + return true; + } + else + { + // Some other thread was evicted. + --threadsToEvict; + } + } + else + { + // No more threads to evict, continue looking for jobs. + if (LOG.isDebugEnabled()) + LOG.debug("Evict skipped, no excess threads {}", this); + return false; + } + } + } + /** * @return the job queue */ @@ -994,7 +1100,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor int idle = Math.max(0, AtomicBiInteger.getLo(count)); int queue = getQueueSize(); - return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,t=%dms,q=%d}[%s]", getClass().getSimpleName(), _name, hashCode(), @@ -1004,17 +1110,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor getMaxThreads(), idle, getReservedThreads(), + NanoTime.millisUntil(_evictThreshold.get()), queue, _tryExecutor); } private class Runner implements Runnable { - private Runnable idleJobPoll(long idleTimeout) throws InterruptedException + private Runnable idleJobPoll(long idleTimeoutNanos) throws InterruptedException { - if (idleTimeout <= 0) + if (idleTimeoutNanos <= 0) return _jobs.take(); - return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS); + return _jobs.poll(idleTimeoutNanos, TimeUnit.NANOSECONDS); } @Override @@ -1026,76 +1133,41 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor boolean idle = true; try { - Runnable job = null; - while (true) + while (_counts.getHi() != Integer.MIN_VALUE) { - // If we had a job, - if (job != null) - { - idle = true; - // signal that we are idle again - if (!addCounts(0, 1)) - break; - } - // else check we are still running - else if (_counts.getHi() == Integer.MIN_VALUE) - { - break; - } - try { - // Look for an immediately available job - job = _jobs.poll(); - if (job == null) + long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout()); + Runnable job = idleJobPoll(idleTimeoutNanos); + + while (job != null) { - // No job immediately available maybe we should shrink? - long idleTimeout = getIdleTimeout(); - if (idleTimeout > 0 && getThreads() > _minThreads) - { - long last = _lastShrink.get(); - long now = NanoTime.now(); - if (NanoTime.millisElapsed(last, now) > idleTimeout && _lastShrink.compareAndSet(last, now)) - { - if (LOG.isDebugEnabled()) - LOG.debug("shrinking {}", QueuedThreadPool.this); - break; - } - } + idle = false; + // Run the jobs. + if (LOG.isDebugEnabled()) + LOG.debug("run {} in {}", job, QueuedThreadPool.this); + doRunJob(job); + if (LOG.isDebugEnabled()) + LOG.debug("ran {} in {}", job, QueuedThreadPool.this); - // Wait for a job, only after we have checked if we should shrink - job = idleJobPoll(idleTimeout); + // Signal that we are idle again; since execute() subtracts + // 1 from idle each time a job is submitted, we have to add + // 1 for each executed job here to compensate. + if (!addCounts(0, 1)) + break; + idle = true; - // If still no job? - if (job == null) - // continue to try again - continue; + // Look for another job + job = _jobs.poll(); } - idle = false; - - // run job - if (LOG.isDebugEnabled()) - LOG.debug("run {} in {}", job, QueuedThreadPool.this); - runJob(job); - if (LOG.isDebugEnabled()) - LOG.debug("ran {} in {}", job, QueuedThreadPool.this); + if (evict()) + break; } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) - LOG.debug("interrupted {} in {}", job, QueuedThreadPool.this); LOG.trace("IGNORED", e); } - catch (Throwable e) - { - LOG.warn("Job failed", e); - } - finally - { - // Clear any interrupted status - Thread.interrupted(); - } } } finally @@ -1103,15 +1175,33 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor Thread thread = Thread.currentThread(); removeThread(thread); - // Decrement the total thread count and the idle count if we had no job + // Decrement the total thread count and the idle count if we had no job. addCounts(-1, idle ? -1 : 0); if (LOG.isDebugEnabled()) LOG.debug("{} exited for {}", thread, QueuedThreadPool.this); - // There is a chance that we shrunk just as a job was queued for us, so - // check again if we have sufficient threads to meet demand + // There is a chance that we shrunk just as a job was queued, + // or multiple concurrent threads ran out of jobs, + // so check again if we have sufficient threads to meet demand. ensureThreads(); } } + + private void doRunJob(Runnable job) + { + try + { + runJob(job); + } + catch (Throwable e) + { + LOG.warn("Job failed", e); + } + finally + { + // Clear any thread interrupted status. + Thread.interrupted(); + } + } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index 9201c926501..d4fcc520d94 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -19,8 +19,12 @@ import java.net.URLClassLoader; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.logging.StacklessLogging; @@ -32,11 +36,13 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.StringContains.containsString; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; @@ -235,6 +241,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest Thread.sleep(3L * tp.getIdleTimeout() / 2); assertThat(tp.getThreads(), is(2)); assertThat(tp.getIdleThreads(), is(2)); + assertThat(tp.getQueueSize(), is(0)); // Run job0 RunningJob job0 = new RunningJob("JOB0"); @@ -242,6 +249,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job0._run.await(10, TimeUnit.SECONDS)); assertThat(tp.getThreads(), is(2)); assertThat(tp.getIdleThreads(), is(1)); + assertThat(tp.getQueueSize(), is(0)); // Run job1 RunningJob job1 = new RunningJob("JOB1"); @@ -249,6 +257,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job1._run.await(10, TimeUnit.SECONDS)); assertThat(tp.getThreads(), is(2)); assertThat(tp.getIdleThreads(), is(0)); + assertThat(tp.getQueueSize(), is(0)); // Run job2 RunningJob job2 = new RunningJob("JOB2"); @@ -256,6 +265,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job2._run.await(10, TimeUnit.SECONDS)); assertThat(tp.getThreads(), is(3)); assertThat(tp.getIdleThreads(), is(0)); + assertThat(tp.getQueueSize(), is(0)); // Run job3 RunningJob job3 = new RunningJob("JOB3"); @@ -263,11 +273,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job3._run.await(10, TimeUnit.SECONDS)); assertThat(tp.getThreads(), is(4)); assertThat(tp.getIdleThreads(), is(0)); + assertThat(tp.getQueueSize(), is(0)); // Check no short term change Thread.sleep(100); assertThat(tp.getThreads(), is(4)); assertThat(tp.getIdleThreads(), is(0)); + assertThat(tp.getQueueSize(), is(0)); // Run job4. will be queued RunningJob job4 = new RunningJob("JOB4"); @@ -287,11 +299,11 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertThat(tp.getIdleThreads(), is(0)); assertThat(tp.getQueueSize(), is(0)); - // finish job 1, and its thread will become idle + // finish job 1, and its thread will become idle and then shrink job1._stopping.countDown(); assertTrue(job1._stopped.await(10, TimeUnit.SECONDS)); - waitForIdle(tp, 1); - waitForThreads(tp, 4); + waitForIdle(tp, 0); + waitForThreads(tp, 3); // finish job 2,3,4 job2._stopping.countDown(); @@ -500,7 +512,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } @Test - public void testShrink() throws Exception + public void testEvict() throws Exception { final AtomicInteger sleep = new AtomicInteger(100); Runnable job = () -> @@ -548,7 +560,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } @Test - public void testSteadyShrink() throws Exception + public void testSteadyEvict() throws Exception { CountDownLatch latch = new CountDownLatch(1); Runnable job = () -> @@ -815,13 +827,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest String dump = pool.dump(); // TODO use hamcrest 2.0 regex matcher assertThat(dump, containsString("STOPPED")); - assertThat(dump, containsString(",3<=0<=4,i=0,r=-1,q=0")); + assertThat(dump, containsString(",3<=0<=4,i=0,r=-1,")); assertThat(dump, containsString("[NO_TRY]")); pool.setReservedThreads(2); dump = pool.dump(); assertThat(dump, containsString("STOPPED")); - assertThat(dump, containsString(",3<=0<=4,i=0,r=2,q=0")); + assertThat(dump, containsString(",3<=0<=4,i=0,r=2,")); assertThat(dump, containsString("[NO_TRY]")); pool.start(); @@ -829,7 +841,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); - assertThat(dump, containsString(",3<=3<=4,i=3,r=2,q=0")); + assertThat(dump, containsString(",3<=3<=4,i=3,r=2,")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(3)); assertThat(count(dump, " RESERVED"), is(0)); @@ -852,7 +864,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); - assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump, containsString(",3<=3<=4,i=2,r=2,")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(2)); assertThat(count(dump, " WAITING"), is(1)); @@ -862,7 +874,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest pool.setDetailedDump(true); dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); - assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump, containsString(",3<=3<=4,i=2,r=2,")); assertThat(dump, containsString("reserved=0/2")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(2)); @@ -877,7 +889,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); - assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0")); + assertThat(dump, containsString(",3<=3<=4,i=1,r=2,")); assertThat(dump, containsString("reserved=1/2")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(1)); @@ -1022,6 +1034,189 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(stopping._completed.await(5, TimeUnit.SECONDS)); } + @Test + public void testInterruptFlagClearedBetweenDelayedExecutions() throws Exception + { + QueuedThreadPool tp = new QueuedThreadPool(1, 1); + tp.setReservedThreads(0); + tp.start(); + + AtomicInteger executions = new AtomicInteger(); + + tp.execute(() -> + { + Thread.currentThread().interrupt(); + executions.incrementAndGet(); + }); + await().atMost(5, TimeUnit.SECONDS).until(() -> executions.get() == 1); + + AtomicBoolean intr = new AtomicBoolean(); + tp.execute(() -> + { + intr.set(Thread.currentThread().isInterrupted()); + executions.incrementAndGet(); + }); + await().atMost(5, TimeUnit.SECONDS).until(() -> executions.get() == 2); + + assertThat(intr.get(), is(false)); + } + + @Test + public void testInterruptFlagClearedBetweenQueuedJobsExecutions() throws Exception + { + QueuedThreadPool tp = new QueuedThreadPool(1, 1); + tp.setReservedThreads(0); + tp.start(); + + AtomicBoolean intr = new AtomicBoolean(); + CyclicBarrier barrier = new CyclicBarrier(2); + + tp.execute(() -> + { + try + { + barrier.await(); // wait until the main thread enqueued another job + } + catch (InterruptedException | BrokenBarrierException e) + { + e.printStackTrace(); + intr.set(true); + } + Thread.currentThread().interrupt(); + }); + + tp.execute(() -> + { + intr.set(Thread.interrupted()); + try + { + barrier.await(); // notify that this thread is over + } + catch (InterruptedException | BrokenBarrierException e) + { + e.printStackTrace(); + intr.set(true); + } + }); + + barrier.await(); // tell the 1st execute we enqueued the 2nd job + barrier.await(); // wait until 2nd execute is done + assertThat(intr.get(), is(false)); + } + + @Test + public void testEvictCount() throws Exception + { + QueuedThreadPool tp = new QueuedThreadPool(); + int minThreads = 2; + tp.setMinThreads(minThreads); + int maxThreads = 10; + tp.setMaxThreads(maxThreads); + int idleTimeout = 1000; + tp.setIdleTimeout(idleTimeout); + int evictCount = 3; + tp.setMaxEvictCount(evictCount); + tp.start(); + + waitForThreads(tp, minThreads); + waitForIdle(tp, minThreads); + + RunningJob[] jobs = new RunningJob[maxThreads]; + for (int i = 0; i < jobs.length; i++) + { + RunningJob job = jobs[i] = new RunningJob("JOB" + i); + tp.execute(job); + assertTrue(job._run.await(1, TimeUnit.SECONDS)); + } + + for (int i = 0; i < jobs.length; i++) + { + jobs[i]._stopping.countDown(); + } + + assertEquals(maxThreads, tp.getThreads()); + + Thread.sleep(idleTimeout * 2 + idleTimeout / 2); + assertEquals(maxThreads - tp.getMaxEvictCount(), tp.getThreads()); + + Thread.sleep(idleTimeout); + assertEquals(maxThreads - 2 * tp.getMaxEvictCount(), tp.getThreads()); + + Thread.sleep(idleTimeout); + assertEquals(minThreads, tp.getThreads()); + } + + @Test + public void testRealistic() throws Exception + { + final int spikeThreads = 1000; + final int busyThreads = 200; + final int idleTimeout = 2000; + final int evictCount = 200; + final int jobDuration = 10; + final Random random = new Random(); + + QueuedThreadPool qtp = new QueuedThreadPool(2 * spikeThreads, busyThreads / 2); + qtp.setIdleTimeout(idleTimeout); + qtp.setMaxEvictCount(evictCount); + qtp.start(); + + CountDownLatch spike = new CountDownLatch(spikeThreads); + for (int i = 0; i < spikeThreads; i++) + qtp.execute(job(spike, 100 + random.nextInt(2 * jobDuration))); + spike.await(); + System.err.printf("busy=%d %s\n", qtp.getBusyThreads(), qtp); + + // keep threads busy + long last = System.nanoTime(); + while (true) + { + if (NanoTime.secondsSince(last) > 1) + { + last = System.nanoTime(); + System.err.printf("busy=%d %s\n", qtp.getBusyThreads(), qtp); + if (qtp.getThreads() < (busyThreads * 3 / 2)) + break; + } + try + { + if (qtp.getBusyThreads() < busyThreads) + { + CountDownLatch start = new CountDownLatch(1); + qtp.execute(job(start, random.nextInt(jobDuration) + jobDuration / 2)); + start.await(); + continue; + } + Thread.sleep(random.nextInt(jobDuration / 4)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + qtp.stop(); + } + + Runnable job(CountDownLatch started, int duration) + { + return new Runnable() + { + @Override + public void run() + { + try + { + started.countDown(); + Thread.sleep(duration); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + }; + } + private int count(String s, String p) { int c = 0; diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index 5d7987205d3..4d07212307a 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -159,7 +159,7 @@ public class ReservedThreadExecutorTest } @Test - public void testShrink() throws Exception + public void testEvict() throws Exception { final long IDLE = 1000; @@ -183,7 +183,7 @@ public class ReservedThreadExecutorTest } @Test - public void testBusyShrink() throws Exception + public void testBusyEvict() throws Exception { final long IDLE = 1000; diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolBenchmark.java new file mode 100644 index 00000000000..3b08df4d5b3 --- /dev/null +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/QueuedThreadPoolBenchmark.java @@ -0,0 +1,93 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.jmh; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +@Warmup(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +public class QueuedThreadPoolBenchmark +{ + QueuedThreadPool pool; + private CountDownLatch[] latches; + + @Setup // (Level.Iteration) + public void buildPool() + { + pool = new QueuedThreadPool(200, 200); + pool.setReservedThreads(0); + LifeCycle.start(pool); + latches = new CountDownLatch[50]; + for (int i = 0; i < latches.length; i++) + { + latches[i] = new CountDownLatch(1); + } + } + + @TearDown // (Level.Iteration) + public void shutdownPool() + { + System.err.println(pool); + LifeCycle.stop(pool); + pool = null; + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Threads(8) + public void test() throws Exception + { + for (CountDownLatch latch : latches) + { + pool.execute(latch::countDown); + } + for (CountDownLatch latch : latches) + { + latch.await(); + } + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(QueuedThreadPoolBenchmark.class.getSimpleName()) + .forks(1) + // .addProfiler(CompilerProfiler.class) + // .addProfiler(LinuxPerfProfiler.class) + // .addProfiler(LinuxPerfNormProfiler.class) + // .addProfiler(LinuxPerfAsmProfiler.class, "hotThreshold=0.05") + .build(); + + new Runner(opt).run(); + } +}