Fixes #9237 - Decouple QTP idleTimeout from pool shrink rate. (#9498)

Introduced `QueuedThreadPool.maxEvictCount` to be the number of idle threads that are evicted in one idle timeout.

When set to 1 (the default), the old behavior is reproduced: expiring 1 thread every idle timeout.
When set to larger values, allows to keep around the threads for the idle timeout (in case of further load spikes), but allows to quickly recover OS memory when they are truly idle.

For example, with 2000 threads, 30 seconds idle timeout and idleTimeoutMaxShrinkCount=1, it will take 995 minutes (about 16.5 hrs) to shrink the pool back to 10 threads.
By setting idleTimeoutMaxShrinkCount=100, the thread pool can be shrunk to 10 threads in about 10 minutes.

Note also that the new algorithm is more aggressive at shrinking the thread pool.
Previously, a small load might have been sufficient to never evict any thread, because all threads could take turns at executing jobs so that threads were mostly idle but would never really idle time out.
The new algorithm is more aggressive even in presence of a small load, so that if `minThreads` are sufficient to cope with the small load, then the other threads are evicted.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: gregw <gregw@webtide.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: gregw <gregw@webtide.com>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Simone Bordet 2023-03-31 15:36:34 +02:00 committed by GitHub
parent fe11b94da8
commit 278ec1be69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 489 additions and 83 deletions

View File

@ -188,6 +188,19 @@ This value represents the maximum number of threads that can be reserved and use
A negative value for `QueuedThreadPool.reservedThreads` means that the actual value will be heuristically derived from the number of CPU cores and `QueuedThreadPool.maxThreads`.
A value of zero for `QueuedThreadPool.reservedThreads` means that reserved threads are disabled, and therefore the xref:pg-arch-threads-execution-strategy-epc[`Execute-Produce-Consume` mode] is never used -- the xref:pg-arch-threads-execution-strategy-pec[`Produce-Execute-Consume` mode] is always used instead.
`QueuedThreadPool` always maintains the number of threads between `QueuedThreadPool.minThreads` and `QueuedThreadPool.maxThreads`; during load spikes the number of thread grows to meet the load demand, and when the load on the system diminishes or the system goes idle, the number of threads shrinks.
Shrinking `QueuedThreadPool` is important in particular in containerized environments, where typically you want to return the memory occupied by the threads to the operative system.
The shrinking of the `QueuedThreadPool` is controlled by two parameters: `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.maxEvictCount`.
`QueuedThreadPool.idleTimeout` indicates how long a thread should stay around when it is idle, waiting for tasks to execute.
The longer the threads stay around, the more ready they are in case of new load spikes on the system; however, they consume resources: a Java platform thread typically allocates 1 MiB of native memory.
`QueuedThreadPool.maxEvictCount` controls how many idle threads are evicted for one `QueuedThreadPool.idleTimeout` period.
The larger this value is, the quicker the threads are evicted when the `QueuedThreadPool` is idle or has less load, and their resources returned to the operative system; however, large values may result in too much thread thrashing: the `QueuedThreadPool` shrinks too fast and must re-create a lot of threads in case of a new load spike on the system.
A good balance between `QueuedThreadPool.idleTimeout` and `QueuedThreadPool.maxEvictCount` depends on the load profile of your system, and it is often tuned via trial and error.
[[pg-arch-threads-thread-pool-virtual-threads]]
===== Virtual Threads
Virtual threads have been introduced in Java 19 as a preview feature.

View File

@ -8,6 +8,7 @@
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="maxEvictCount" type="int"><Property name="jetty.threadPool.maxEvictCount" default="1"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
<Get id="namePrefix" name="name" />
<Call class="java.lang.Thread" name="ofVirtual">

View File

@ -26,6 +26,7 @@
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="useVirtualThreads" property="jetty.threadPool.useVirtualThreads" />
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="maxEvictCount" type="int"><Property name="jetty.threadPool.maxEvictCount" default="1"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</New>
</Configure>

View File

@ -30,6 +30,9 @@ etc/jetty-threadpool-virtual-preview.xml
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000
## The max number of idle threads that can be evicted in one idleTimeout period.
#jetty.threadPool.maxEvictCount=1
## Whether to output a detailed dump.
#jetty.threadPool.detailedDump=false

View File

@ -31,6 +31,9 @@ etc/jetty-threadpool.xml
## Thread idle timeout (in milliseconds).
#jetty.threadPool.idleTimeout=60000
## The max number of idle threads that are evicted in one idleTimeout period.
#jetty.threadPool.maxEvictCount=1
## Whether to output a detailed dump.
#jetty.threadPool.detailedDump=false
# end::documentation[]

View File

@ -251,6 +251,13 @@ public class AtomicBiInteger extends AtomicLong
}
}
@Override
public String toString()
{
long encoded = get();
return getHi(encoded) + "|" + getLo(encoded);
}
/**
* Encodes hi and lo values into a long.
*

View File

@ -90,11 +90,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
* this represents the effective number of idle threads, and if negative it represents the
* demand for more threads</dd>
* demand for more threads, which is equivalent to the job queue's size.</dd>
* </dl>
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
private final AtomicLong _lastShrink = new AtomicLong();
private final AtomicLong _evictThreshold = new AtomicLong();
private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition();
private final BlockingQueue<Runnable> _jobs;
@ -113,6 +113,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
private ThreadPoolBudget _budget;
private long _stopTimeout;
private Executor _virtualThreadsExecutor;
private int _maxEvictCount = 1;
public QueuedThreadPool()
{
@ -218,7 +219,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
addBean(_tryExecutor);
_lastShrink.set(NanoTime.now());
_evictThreshold.set(NanoTime.now());
super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
@ -317,7 +318,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
}
private void joinThreads(long stopByNanos) throws InterruptedException
private void joinThreads(long stopByNanos)
{
loop : while (true)
{
@ -536,6 +537,36 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
}
/**
* <p>Returns the maximum number of idle threads that are evicted for every idle timeout
* period, thus shrinking this thread pool towards its {@link #getMinThreads() minimum
* number of threads}.
* The default value is {@code 1}.</p>
* <p>For example, consider a thread pool with {@code minThread=2}, {@code maxThread=20},
* {@code idleTimeout=5000} and {@code maxEvictCount=3}.
* Let's assume all 20 threads are executing a task, and they all finish their own tasks
* at the same time and no more tasks are submitted; then, 3 threads will be evicted,
* while the other 17 will wait another idle timeout; then another 3 threads will be
* evicted, and so on until {@code minThreads=2} will be reached.</p>
*
* @param evictCount the maximum number of idle threads to evict in one idle timeout period
*/
public void setMaxEvictCount(int evictCount)
{
if (evictCount < 1)
throw new IllegalArgumentException("Invalid evict count " + evictCount);
_maxEvictCount = evictCount;
}
/**
* @return the maximum number of idle threads to evict in one idle timeout period
*/
@ManagedAttribute("maximum number of idle threads to evict in one idle timeout period")
public int getMaxEvictCount()
{
return _maxEvictCount;
}
/**
* @return the number of jobs in the queue waiting for a thread
*/
@ -747,7 +778,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
// and we are not at max threads.
startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;
// The job will be run by an idle thread when available
// Add 1|0 or 0|-1 to counts depending upon the decision to start a thread or not;
// idle can become negative which means there are queued tasks.
if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
continue;
@ -830,7 +862,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
_lastShrink.set(NanoTime.now());
// Update the evict threshold to prevent thrashing of newly started threads.
_evictThreshold.set(NanoTime.now() + TimeUnit.MILLISECONDS.toNanos(_idleTimeout));
thread.start();
started = true;
}
@ -936,6 +969,79 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
job.run();
}
/**
* <p>Determines whether to evict the current thread from the pool.</p>
*
* @return whether the current thread should be evicted
*/
protected boolean evict()
{
long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout());
// There is a chance that many threads enter this method concurrently,
// and if all of them are evicted the pool shrinks below minThreads.
// For example when minThreads=3, threads=8, maxEvictCount=10 we want
// to evict at most 5 threads (8-3), not 10.
// When a thread fails the CAS, it may assume that another thread has
// been evicted, so the CAS should be attempted only a number of times
// equal to the most threads we want to evict (5 in the example above).
int threads = getThreads();
int minThreads = getMinThreads();
int threadsToEvict = threads - minThreads;
while (true)
{
if (threadsToEvict > 0)
{
// We have excess threads, so check if we should evict the current thread.
long now = NanoTime.now();
long evictPeriod = idleTimeoutNanos / getMaxEvictCount();
if (LOG.isDebugEnabled())
LOG.debug("Evict check, period={}ms {}", TimeUnit.NANOSECONDS.toMillis(evictPeriod), this);
long evictThreshold = _evictThreshold.get();
long threshold = evictThreshold;
// If the threshold is too far in the past,
// advance it to be one idle timeout before now.
if (NanoTime.elapsed(threshold, now) > idleTimeoutNanos)
threshold = now - idleTimeoutNanos;
// Advance the threshold by one evict period.
threshold += evictPeriod;
// Is the new threshold in the future?
if (NanoTime.isBefore(now, threshold))
{
// Yes - we cannot evict yet, so continue looking for jobs.
if (LOG.isDebugEnabled())
LOG.debug("Evict skipped, threshold={}ms in the future {}", NanoTime.millisElapsed(now, threshold), this);
return false;
}
// We can evict if we can update the threshold.
if (_evictThreshold.compareAndSet(evictThreshold, threshold))
{
if (LOG.isDebugEnabled())
LOG.debug("Evicted, threshold={}ms in the past {}", NanoTime.millisElapsed(threshold, now), this);
return true;
}
else
{
// Some other thread was evicted.
--threadsToEvict;
}
}
else
{
// No more threads to evict, continue looking for jobs.
if (LOG.isDebugEnabled())
LOG.debug("Evict skipped, no excess threads {}", this);
return false;
}
}
}
/**
* @return the job queue
*/
@ -994,7 +1100,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,t=%dms,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
@ -1004,17 +1110,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
getMaxThreads(),
idle,
getReservedThreads(),
NanoTime.millisUntil(_evictThreshold.get()),
queue,
_tryExecutor);
}
private class Runner implements Runnable
{
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
private Runnable idleJobPoll(long idleTimeoutNanos) throws InterruptedException
{
if (idleTimeout <= 0)
if (idleTimeoutNanos <= 0)
return _jobs.take();
return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS);
return _jobs.poll(idleTimeoutNanos, TimeUnit.NANOSECONDS);
}
@Override
@ -1026,76 +1133,41 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
boolean idle = true;
try
{
Runnable job = null;
while (true)
while (_counts.getHi() != Integer.MIN_VALUE)
{
// If we had a job,
if (job != null)
{
idle = true;
// signal that we are idle again
if (!addCounts(0, 1))
break;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
{
break;
}
try
{
// Look for an immediately available job
job = _jobs.poll();
if (job == null)
long idleTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(getIdleTimeout());
Runnable job = idleJobPoll(idleTimeoutNanos);
while (job != null)
{
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
{
long last = _lastShrink.get();
long now = NanoTime.now();
if (NanoTime.millisElapsed(last, now) > idleTimeout && _lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
idle = false;
// Run the jobs.
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
doRunJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
// Wait for a job, only after we have checked if we should shrink
job = idleJobPoll(idleTimeout);
// Signal that we are idle again; since execute() subtracts
// 1 from idle each time a job is submitted, we have to add
// 1 for each executed job here to compensate.
if (!addCounts(0, 1))
break;
idle = true;
// If still no job?
if (job == null)
// continue to try again
continue;
// Look for another job
job = _jobs.poll();
}
idle = false;
// run job
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
if (evict())
break;
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
LOG.debug("interrupted {} in {}", job, QueuedThreadPool.this);
LOG.trace("IGNORED", e);
}
catch (Throwable e)
{
LOG.warn("Job failed", e);
}
finally
{
// Clear any interrupted status
Thread.interrupted();
}
}
}
finally
@ -1103,15 +1175,33 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
Thread thread = Thread.currentThread();
removeThread(thread);
// Decrement the total thread count and the idle count if we had no job
// Decrement the total thread count and the idle count if we had no job.
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
// There is a chance that we shrunk just as a job was queued for us, so
// check again if we have sufficient threads to meet demand
// There is a chance that we shrunk just as a job was queued,
// or multiple concurrent threads ran out of jobs,
// so check again if we have sufficient threads to meet demand.
ensureThreads();
}
}
private void doRunJob(Runnable job)
{
try
{
runJob(job);
}
catch (Throwable e)
{
LOG.warn("Job failed", e);
}
finally
{
// Clear any thread interrupted status.
Thread.interrupted();
}
}
}
}

View File

@ -19,8 +19,12 @@ import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.logging.StacklessLogging;
@ -32,11 +36,13 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
@ -235,6 +241,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
Thread.sleep(3L * tp.getIdleTimeout() / 2);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(2));
assertThat(tp.getQueueSize(), is(0));
// Run job0
RunningJob job0 = new RunningJob("JOB0");
@ -242,6 +249,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(1));
assertThat(tp.getQueueSize(), is(0));
// Run job1
RunningJob job1 = new RunningJob("JOB1");
@ -249,6 +257,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// Run job2
RunningJob job2 = new RunningJob("JOB2");
@ -256,6 +265,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(3));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// Run job3
RunningJob job3 = new RunningJob("JOB3");
@ -263,11 +273,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// Check no short term change
Thread.sleep(100);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// Run job4. will be queued
RunningJob job4 = new RunningJob("JOB4");
@ -287,11 +299,11 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// finish job 1, and its thread will become idle
// finish job 1, and its thread will become idle and then shrink
job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
waitForThreads(tp, 4);
waitForIdle(tp, 0);
waitForThreads(tp, 3);
// finish job 2,3,4
job2._stopping.countDown();
@ -500,7 +512,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
@Test
public void testShrink() throws Exception
public void testEvict() throws Exception
{
final AtomicInteger sleep = new AtomicInteger(100);
Runnable job = () ->
@ -548,7 +560,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
@Test
public void testSteadyShrink() throws Exception
public void testSteadyEvict() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
@ -815,13 +827,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
String dump = pool.dump();
// TODO use hamcrest 2.0 regex matcher
assertThat(dump, containsString("STOPPED"));
assertThat(dump, containsString(",3<=0<=4,i=0,r=-1,q=0"));
assertThat(dump, containsString(",3<=0<=4,i=0,r=-1,"));
assertThat(dump, containsString("[NO_TRY]"));
pool.setReservedThreads(2);
dump = pool.dump();
assertThat(dump, containsString("STOPPED"));
assertThat(dump, containsString(",3<=0<=4,i=0,r=2,q=0"));
assertThat(dump, containsString(",3<=0<=4,i=0,r=2,"));
assertThat(dump, containsString("[NO_TRY]"));
pool.start();
@ -829,7 +841,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=3,r=2,q=0"));
assertThat(dump, containsString(",3<=3<=4,i=3,r=2,"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(3));
assertThat(count(dump, " RESERVED"), is(0));
@ -852,7 +864,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(2));
assertThat(count(dump, " WAITING"), is(1));
@ -862,7 +874,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
pool.setDetailedDump(true);
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,"));
assertThat(dump, containsString("reserved=0/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(2));
@ -877,7 +889,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,"));
assertThat(dump, containsString("reserved=1/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(1));
@ -1022,6 +1034,189 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(stopping._completed.await(5, TimeUnit.SECONDS));
}
@Test
public void testInterruptFlagClearedBetweenDelayedExecutions() throws Exception
{
QueuedThreadPool tp = new QueuedThreadPool(1, 1);
tp.setReservedThreads(0);
tp.start();
AtomicInteger executions = new AtomicInteger();
tp.execute(() ->
{
Thread.currentThread().interrupt();
executions.incrementAndGet();
});
await().atMost(5, TimeUnit.SECONDS).until(() -> executions.get() == 1);
AtomicBoolean intr = new AtomicBoolean();
tp.execute(() ->
{
intr.set(Thread.currentThread().isInterrupted());
executions.incrementAndGet();
});
await().atMost(5, TimeUnit.SECONDS).until(() -> executions.get() == 2);
assertThat(intr.get(), is(false));
}
@Test
public void testInterruptFlagClearedBetweenQueuedJobsExecutions() throws Exception
{
QueuedThreadPool tp = new QueuedThreadPool(1, 1);
tp.setReservedThreads(0);
tp.start();
AtomicBoolean intr = new AtomicBoolean();
CyclicBarrier barrier = new CyclicBarrier(2);
tp.execute(() ->
{
try
{
barrier.await(); // wait until the main thread enqueued another job
}
catch (InterruptedException | BrokenBarrierException e)
{
e.printStackTrace();
intr.set(true);
}
Thread.currentThread().interrupt();
});
tp.execute(() ->
{
intr.set(Thread.interrupted());
try
{
barrier.await(); // notify that this thread is over
}
catch (InterruptedException | BrokenBarrierException e)
{
e.printStackTrace();
intr.set(true);
}
});
barrier.await(); // tell the 1st execute we enqueued the 2nd job
barrier.await(); // wait until 2nd execute is done
assertThat(intr.get(), is(false));
}
@Test
public void testEvictCount() throws Exception
{
QueuedThreadPool tp = new QueuedThreadPool();
int minThreads = 2;
tp.setMinThreads(minThreads);
int maxThreads = 10;
tp.setMaxThreads(maxThreads);
int idleTimeout = 1000;
tp.setIdleTimeout(idleTimeout);
int evictCount = 3;
tp.setMaxEvictCount(evictCount);
tp.start();
waitForThreads(tp, minThreads);
waitForIdle(tp, minThreads);
RunningJob[] jobs = new RunningJob[maxThreads];
for (int i = 0; i < jobs.length; i++)
{
RunningJob job = jobs[i] = new RunningJob("JOB" + i);
tp.execute(job);
assertTrue(job._run.await(1, TimeUnit.SECONDS));
}
for (int i = 0; i < jobs.length; i++)
{
jobs[i]._stopping.countDown();
}
assertEquals(maxThreads, tp.getThreads());
Thread.sleep(idleTimeout * 2 + idleTimeout / 2);
assertEquals(maxThreads - tp.getMaxEvictCount(), tp.getThreads());
Thread.sleep(idleTimeout);
assertEquals(maxThreads - 2 * tp.getMaxEvictCount(), tp.getThreads());
Thread.sleep(idleTimeout);
assertEquals(minThreads, tp.getThreads());
}
@Test
public void testRealistic() throws Exception
{
final int spikeThreads = 1000;
final int busyThreads = 200;
final int idleTimeout = 2000;
final int evictCount = 200;
final int jobDuration = 10;
final Random random = new Random();
QueuedThreadPool qtp = new QueuedThreadPool(2 * spikeThreads, busyThreads / 2);
qtp.setIdleTimeout(idleTimeout);
qtp.setMaxEvictCount(evictCount);
qtp.start();
CountDownLatch spike = new CountDownLatch(spikeThreads);
for (int i = 0; i < spikeThreads; i++)
qtp.execute(job(spike, 100 + random.nextInt(2 * jobDuration)));
spike.await();
System.err.printf("busy=%d %s\n", qtp.getBusyThreads(), qtp);
// keep threads busy
long last = System.nanoTime();
while (true)
{
if (NanoTime.secondsSince(last) > 1)
{
last = System.nanoTime();
System.err.printf("busy=%d %s\n", qtp.getBusyThreads(), qtp);
if (qtp.getThreads() < (busyThreads * 3 / 2))
break;
}
try
{
if (qtp.getBusyThreads() < busyThreads)
{
CountDownLatch start = new CountDownLatch(1);
qtp.execute(job(start, random.nextInt(jobDuration) + jobDuration / 2));
start.await();
continue;
}
Thread.sleep(random.nextInt(jobDuration / 4));
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
qtp.stop();
}
Runnable job(CountDownLatch started, int duration)
{
return new Runnable()
{
@Override
public void run()
{
try
{
started.countDown();
Thread.sleep(duration);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
};
}
private int count(String s, String p)
{
int c = 0;

View File

@ -159,7 +159,7 @@ public class ReservedThreadExecutorTest
}
@Test
public void testShrink() throws Exception
public void testEvict() throws Exception
{
final long IDLE = 1000;
@ -183,7 +183,7 @@ public class ReservedThreadExecutorTest
}
@Test
public void testBusyShrink() throws Exception
public void testBusyEvict() throws Exception
{
final long IDLE = 1000;

View File

@ -0,0 +1,93 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.util.thread.jmh;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
@Warmup(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
public class QueuedThreadPoolBenchmark
{
QueuedThreadPool pool;
private CountDownLatch[] latches;
@Setup // (Level.Iteration)
public void buildPool()
{
pool = new QueuedThreadPool(200, 200);
pool.setReservedThreads(0);
LifeCycle.start(pool);
latches = new CountDownLatch[50];
for (int i = 0; i < latches.length; i++)
{
latches[i] = new CountDownLatch(1);
}
}
@TearDown // (Level.Iteration)
public void shutdownPool()
{
System.err.println(pool);
LifeCycle.stop(pool);
pool = null;
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
@Threads(8)
public void test() throws Exception
{
for (CountDownLatch latch : latches)
{
pool.execute(latch::countDown);
}
for (CountDownLatch latch : latches)
{
latch.await();
}
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(QueuedThreadPoolBenchmark.class.getSimpleName())
.forks(1)
// .addProfiler(CompilerProfiler.class)
// .addProfiler(LinuxPerfProfiler.class)
// .addProfiler(LinuxPerfNormProfiler.class)
// .addProfiler(LinuxPerfAsmProfiler.class, "hotThreshold=0.05")
.build();
new Runner(opt).run();
}
}