Issue #5994 - QueuedThreadPool "free" threads

Updates after review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-02-23 18:12:55 +01:00
parent 64e01beeeb
commit 530c14e7b3
4 changed files with 397 additions and 224 deletions

View File

@ -77,12 +77,13 @@ public class MBeanContainerLifeCycleTest
String pkg = bean.getClass().getPackage().getName();
Set<ObjectName> objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
assertEquals(1, objectNames.size());
// QueuedThreadPool and ThreadPoolBudget.
assertEquals(2, objectNames.size());
container.stop();
objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
assertEquals(1, objectNames.size());
assertEquals(2, objectNames.size());
// Remove the MBeans to start clean on the next test.
objectNames.forEach(objectName ->
@ -105,7 +106,8 @@ public class MBeanContainerLifeCycleTest
String pkg = bean.getClass().getPackage().getName();
Set<ObjectName> objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
assertEquals(1, objectNames.size());
// QueuedThreadPool and ThreadPoolBudget.
assertEquals(2, objectNames.size());
container.stop();
container.destroy();

View File

@ -45,6 +45,40 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
/**
* <p>A thread pool with a queue of jobs to execute.</p>
* <p>Jetty components that need threads (such as network acceptors and selector) may lease threads
* from this thread pool using a {@link ThreadPoolBudget}; these threads are "active" from the point
* of view of the thread pool, but not available to run <em>transient</em> jobs such as processing
* an HTTP request or a WebSocket frame.</p>
* <p>QueuedThreadPool has a {@link ReservedThreadExecutor} which leases threads from this pool,
* but makes them available as if they are "idle" threads.</p>
* <p>QueuedThreadPool therefore has the following <em>fundamental</em> values:</p>
* <ul>
* <li>{@link #getThreads() threads}: the current number of threads. These threads may execute
* a job (either internal or transient), or may be ready to run (either idle or reserved).
* This number may grow or shrink as the thread pool grows or shrinks.</li>
* <li>{@link #getReadyThreads() readyThreads}: the current number of threads that are ready to
* run transient jobs.
* This number may grow or shrink as the thread pool grows or shrinks.</li>
* <li>{@link #getLeasedThreads() leasedThreads}: the number of threads that run internal jobs.
* This number is typically constant after this thread pool is {@link #start() started}.</li>
* </ul>
* <p>Given the definitions above, the most interesting definitions are:</p>
* <ul>
* <li>{@link #getThreads() threads} = {@link #getReadyThreads() readyThreads} + {@link #getLeasedThreads() leasedThreads} + {@link #getUtilizedThreads() utilizedThreads}</li>
* <li>readyThreads = {@link #getIdleThreads() idleThreads} + {@link #getAvailableReservedThreads() availableReservedThreads}</li>
* <li>{@link #getMaxAvailableThreads() maxAvailableThreads} = {@link #getMaxThreads() maxThreads} - leasedThreads</li>
* <li>{@link #getUtilizationRate() utilizationRate} = utilizedThreads / maxAvailableThreads</li>
* </ul>
* <p>Other definitions, typically less interesting because they take into account threads that
* execute internal jobs, or because they don't take into account available reserved threads
* (that are essentially ready to execute transient jobs), are:</p>
* <ul>
* <li>{@link #getBusyThreads() busyThreads} = utilizedThreads + leasedThreads</li>
* <li>{@link #getIdleThreads()} idleThreads} = readyThreads - availableReservedThreads</li>
* </ul>
*/
@ManagedObject("A thread pool")
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
{
@ -287,23 +321,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Thread Pool should use Daemon Threading.
*
* @param daemon true to enable delegation
* @see Thread#setDaemon(boolean)
* @return the maximum thread idle time in ms
*/
public void setDaemon(boolean daemon)
@ManagedAttribute("maximum time a thread may be idle in ms")
public int getIdleTimeout()
{
_daemon = daemon;
return _idleTimeout;
}
/**
* Set the maximum thread idle time.
* Threads that are idle for longer than this period may be
* stopped.
* <p>Set the maximum thread idle time in ms.</p>
* <p>Threads that are idle for longer than this period may be stopped.</p>
*
* @param idleTimeout Max idle time in ms.
* @see #getIdleTimeout
* @param idleTimeout the maximum thread idle time in ms
*/
public void setIdleTimeout(int idleTimeout)
{
@ -314,10 +344,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Set the maximum number of threads.
*
* @param maxThreads maximum number of threads.
* @see #getMaxThreads
* @return the maximum number of threads
*/
@Override
@ManagedAttribute("maximum number of threads in the pool")
public int getMaxThreads()
{
return _maxThreads;
}
/**
* @param maxThreads the maximum number of threads
*/
@Override
public void setMaxThreads(int maxThreads)
@ -330,10 +367,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Set the minimum number of threads.
*
* @return the minimum number of threads
*/
@Override
@ManagedAttribute("minimum number of threads in the pool")
public int getMinThreads()
{
return _minThreads;
}
/**
* @param minThreads minimum number of threads
* @see #getMinThreads
*/
@Override
public void setMinThreads(int minThreads)
@ -348,10 +392,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Set the number of reserved threads.
*
* @return number of reserved threads or -1 for heuristically determined
*/
@ManagedAttribute("number of configured reserved threads or -1 for heuristic")
public int getReservedThreads()
{
return _reservedThreads;
}
/**
* @param reservedThreads number of reserved threads or -1 for heuristically determined
* @see #getReservedThreads
*/
public void setReservedThreads(int reservedThreads)
{
@ -361,101 +411,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* @param name Name of this thread pool to use when naming threads.
*/
public void setName(String name)
{
if (isRunning())
throw new IllegalStateException("started");
_name = name;
}
/**
* Set the priority of the pool threads.
*
* @param priority the new thread priority.
*/
public void setThreadsPriority(int priority)
{
_priority = priority;
}
/**
* Get the maximum thread idle time.
*
* @return Max idle time in ms.
* @see #setIdleTimeout
*/
@ManagedAttribute("maximum time a thread may be idle in ms")
public int getIdleTimeout()
{
return _idleTimeout;
}
/**
* Get the maximum number of threads.
*
* @return maximum number of threads.
* @see #setMaxThreads
*/
@Override
@ManagedAttribute("maximum number of threads in the pool")
public int getMaxThreads()
{
return _maxThreads;
}
/**
* Get the minimum number of threads.
*
* @return minimum number of threads.
* @see #setMinThreads
*/
@Override
@ManagedAttribute("minimum number of threads in the pool")
public int getMinThreads()
{
return _minThreads;
}
/**
* Get the number of reserved threads.
*
* @return number of reserved threads or or -1 for heuristically determined
* @see #setReservedThreads
*/
@ManagedAttribute("number of configured reserved threads")
public int getReservedThreads()
{
return _reservedThreads;
}
@ManagedAttribute("maximum number of reserved threads")
public int getMaxReservedThreads()
{
TryExecutor tryExecutor = _tryExecutor;
if (tryExecutor instanceof ReservedThreadExecutor)
{
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
return reservedThreadExecutor.getCapacity();
}
return 0;
}
@ManagedAttribute("number of available reserved threads")
public int getAvailableReservedThreads()
{
TryExecutor tryExecutor = _tryExecutor;
if (tryExecutor instanceof ReservedThreadExecutor)
{
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
return reservedThreadExecutor.getAvailable();
}
return 0;
}
/**
* @return The name of the this thread pool
* @return the name of the this thread pool
*/
@ManagedAttribute("name of the thread pool")
public String getName()
@ -464,9 +420,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Get the priority of the pool threads.
* <p>Sets the name of this thread pool, used as a prefix for the thread names.</p>
*
* @return the priority of the pool threads.
* @param name the name of the this thread pool
*/
public void setName(String name)
{
if (isRunning())
throw new IllegalStateException(getState());
_name = name;
}
/**
* @return the priority of the pool threads
*/
@ManagedAttribute("priority of threads in the pool")
public int getThreadsPriority()
@ -475,21 +441,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
/**
* Get the size of the job queue.
*
* @return Number of jobs queued waiting for a thread
* @param priority the priority of the pool threads
*/
@ManagedAttribute("size of the job queue")
public int getQueueSize()
public void setThreadsPriority(int priority)
{
// The idle counter encodes demand, which is the effective queue size
int idle = _counts.getLo();
return Math.max(0, -idle);
_priority = priority;
}
/**
* @return whether this thread pool is using daemon threads
* @see Thread#setDaemon(boolean)
* @return whether to use daemon threads
* @see Thread#isDaemon()
*/
@ManagedAttribute("thread pool uses daemon threads")
public boolean isDaemon()
@ -497,6 +458,15 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
return _daemon;
}
/**
* @param daemon whether to use daemon threads
* @see Thread#setDaemon(boolean)
*/
public void setDaemon(boolean daemon)
{
_daemon = daemon;
}
@ManagedAttribute("reports additional details in the dump")
public boolean isDetailedDump()
{
@ -519,6 +489,193 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
_lowThreadsThreshold = lowThreadsThreshold;
}
/**
* @return the number of jobs in the queue waiting for a thread
*/
@ManagedAttribute("size of the job queue")
public int getQueueSize()
{
// The idle counter encodes demand, which is the effective queue size
int idle = _counts.getLo();
return Math.max(0, -idle);
}
/**
* @return the maximum number (capacity) of reserved threads
* @see ReservedThreadExecutor#getCapacity()
*/
@ManagedAttribute("maximum number (capacity) of reserved threads")
public int getMaxReservedThreads()
{
TryExecutor tryExecutor = _tryExecutor;
if (tryExecutor instanceof ReservedThreadExecutor)
{
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
return reservedThreadExecutor.getCapacity();
}
return 0;
}
/**
* @return the number of available reserved threads
* @see ReservedThreadExecutor#getAvailable()
*/
@ManagedAttribute("number of available reserved threads")
public int getAvailableReservedThreads()
{
TryExecutor tryExecutor = _tryExecutor;
if (tryExecutor instanceof ReservedThreadExecutor)
{
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
return reservedThreadExecutor.getAvailable();
}
return 0;
}
/**
* <p>The <em>fundamental</em> value that represents the number of threads currently known by this thread pool.</p>
* <p>This value includes threads that have been leased to internal components, idle threads, reserved threads
* and threads that are executing transient jobs.</p>
*
* @return the number of threads currently known to the pool
* @see #getReadyThreads()
* @see #getLeasedThreads()
*/
@Override
@ManagedAttribute("number of threads in the pool")
public int getThreads()
{
int threads = _counts.getHi();
return Math.max(0, threads);
}
/**
* <p>The <em>fundamental</em> value that represents the number of threads ready to execute transient jobs.</p>
*
* @return the number of threads ready to execute transient jobs
* @see #getThreads()
* @see #getLeasedThreads()
* @see #getUtilizedThreads()
*/
@ManagedAttribute("number of threads ready to execute transient jobs")
public int getReadyThreads()
{
return getIdleThreads() + getAvailableReservedThreads();
}
/**
* <p>The <em>fundamental</em> value that represents the number of threads that are leased
* to internal components, and therefore cannot be used to execute transient jobs.</p>
*
* @return the number of threads currently used by internal components
* @see #getThreads()
* @see #getReadyThreads()
*/
@ManagedAttribute("number of threads used by internal components")
public int getLeasedThreads()
{
return getMaxLeasedThreads() - getMaxReservedThreads();
}
/**
* <p>The maximum number of threads that are leased to internal components,
* as some component may allocate its threads lazily.</p>
*
* @return the maximum number of threads leased by internal components
* @see #getLeasedThreads()
*/
@ManagedAttribute("maximum number of threads leased to internal components")
public int getMaxLeasedThreads()
{
ThreadPoolBudget budget = _budget;
return budget == null ? 0 : budget.getLeasedThreads();
}
/**
* <p>The number of idle threads, but without including reserved threads.</p>
* <p>Prefer {@link #getReadyThreads()} for a better representation of
* "threads ready to execute transient jobs".</p>
*
* @return the number of idle threads but not reserved
* @see #getReadyThreads()
*/
@Override
@ManagedAttribute("number of idle threads but not reserved")
public int getIdleThreads()
{
int idle = _counts.getLo();
return Math.max(0, idle);
}
/**
* <p>The number of threads executing internal and transient jobs.</p>
* <p>Prefer {@link #getUtilizedThreads()} for a better representation of
* "threads executing transient jobs".</p>
*
* @return the number of threads executing internal and transient jobs
* @see #getUtilizedThreads()
*/
@ManagedAttribute("number of threads executing internal and transient jobs")
public int getBusyThreads()
{
return getThreads() - getReadyThreads();
}
/**
* <p>The number of threads executing transient jobs.</p>
*
* @return the number of threads executing transient jobs
* @see #getReadyThreads()
*/
@ManagedAttribute("number of threads executing transient jobs")
public int getUtilizedThreads()
{
return getThreads() - getLeasedThreads() - getReadyThreads();
}
/**
* <p>The maximum number of threads available to run transient jobs.</p>
*
* @return the maximum number of threads available to run transient jobs
*/
@ManagedAttribute("maximum number of threads available to run transient jobs")
public int getMaxAvailableThreads()
{
return getMaxThreads() - getLeasedThreads();
}
/**
* <p>The rate between the number of {@link #getUtilizedThreads() utilized threads}
* and the maximum number of {@link #getMaxAvailableThreads() utilizable threads}.</p>
* <p>A value of {@code 0.0D} means that the thread pool is not utilized, while a
* value of {@code 1.0D} means that the thread pool is fully utilized to execute
* transient jobs.</p>
*
* @return the utilization rate of threads executing transient jobs
*/
@ManagedAttribute("utilization rate of threads executing transient jobs")
public double getUtilizationRate()
{
return (double)getUtilizedThreads() / getMaxAvailableThreads();
}
/**
* <p>Returns whether this thread pool is low on threads.</p>
* <p>The current formula is:</p>
* <pre>
* maxThreads - threads + readyThreads - queueSize &lt;= lowThreadsThreshold
* </pre>
*
* @return whether the pool is low on threads
* @see #getLowThreadsThreshold()
*/
@Override
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
public boolean isLowOnThreads()
{
return getMaxThreads() - getThreads() + getReadyThreads() - getQueueSize() <= getLowThreadsThreshold();
}
@Override
public void execute(Runnable job)
{
@ -593,73 +750,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
}
}
/**
* @return the total number of threads currently in the pool
*/
@Override
@ManagedAttribute("number of threads in the pool")
public int getThreads()
{
int threads = _counts.getHi();
return Math.max(0, threads);
}
/**
* @return the number of idle threads in the pool
*/
@Override
@ManagedAttribute("number of idle threads in the pool")
public int getIdleThreads()
{
int idle = _counts.getLo();
return Math.max(0, idle);
}
/**
* @return the number of busy threads in the pool
*/
@ManagedAttribute("number of busy threads in the pool")
public int getBusyThreads()
{
return getThreads() - getIdleThreads() - getAvailableReservedThreads();
}
@ManagedAttribute("number of potentially available threads in the pool")
public int getAvailableThreads()
{
return getMaxThreads() - getBusyThreads();
}
@ManagedAttribute("number of currently available threads in the pool")
public int getReadyThreads()
{
return getThreads() - getBusyThreads();
}
@ManagedAttribute("number of threads leased to components")
public int getLeasedThreads()
{
ThreadPoolBudget budget = _budget;
return budget == null ? 0 : budget.getLeasedThreads();
}
/**
* <p>Returns whether this thread pool is low on threads.</p>
* <p>The current formula is:</p>
* <pre>
* maxThreads - threads + idleThreads - queueSize &lt;= lowThreadsThreshold
* </pre>
*
* @return whether the pool is low on threads
* @see #getLowThreadsThreshold()
*/
@Override
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
public boolean isLowOnThreads()
{
return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold();
}
private void ensureThreads()
{
while (true)
@ -779,28 +869,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
return "";
}
@Override
public String toString()
{
long count = _counts.get();
int threads = Math.max(0, AtomicBiInteger.getHi(count));
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
getState(),
getMinThreads(),
threads,
getMaxThreads(),
idle,
getReservedThreads(),
queue,
_tryExecutor);
}
private final Runnable _runnable = new Runner();
/**
@ -874,6 +942,28 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
return null;
}
@Override
public String toString()
{
long count = _counts.get();
int threads = Math.max(0, AtomicBiInteger.getHi(count));
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
getState(),
getMinThreads(),
threads,
getMaxThreads(),
idle,
getReservedThreads(),
queue,
_tryExecutor);
}
private class Runner implements Runnable
{
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException

View File

@ -111,12 +111,18 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
return _executor;
}
/**
* @return the maximum number of reserved threads
*/
@ManagedAttribute(value = "max number of reserved threads", readonly = true)
public int getCapacity()
{
return _capacity;
}
/**
* @return the number of threads available to {@link #tryExecute(Runnable)}
*/
@ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable()
{
@ -196,8 +202,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
}
/**
* @param task The task to run
* @return True iff a reserved thread was available and has been assigned the task to run.
* <p>Executes the given task if and only if a reserved thread is available.</p>
*
* @param task the task to run
* @return true if and only if a reserved thread was available and has been assigned the task to run.
*/
@Override
public boolean tryExecute(Runnable task)

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.util.thread;
import java.io.Closeable;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -194,7 +196,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
waitForIdle(tp, 2);
// Doesn't shrink to less than min threads
Thread.sleep(3 * tp.getIdleTimeout() / 2);
Thread.sleep(3L * tp.getIdleTimeout() / 2);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(2));
@ -296,7 +298,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
waitForIdle(tp, 2);
// Doesn't shrink to less than min threads
Thread.sleep(3 * tp.getIdleTimeout() / 2);
Thread.sleep(3L * tp.getIdleTimeout() / 2);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
@ -857,6 +859,77 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertThat(t.getContextClassLoader(), Matchers.equalTo(QueuedThreadPool.class.getClassLoader()));
}
}
@Test
public void testThreadCounts() throws Exception
{
int maxThreads = 100;
QueuedThreadPool tp = new QueuedThreadPool(maxThreads, 0);
// Long timeout so it does not expire threads during the test.
tp.setIdleTimeout(60000);
int reservedThreads = 7;
tp.setReservedThreads(reservedThreads);
tp.start();
int leasedThreads = 5;
tp.getThreadPoolBudget().leaseTo(new Object(), leasedThreads);
List<RunningJob> leasedJobs = new ArrayList<>();
for (int i = 0; i < leasedThreads; ++i)
{
RunningJob job = new RunningJob("JOB" + i);
leasedJobs.add(job);
tp.execute(job);
assertTrue(job._run.await(5, TimeUnit.SECONDS));
}
// Run some job to spawn threads.
for (int i = 0; i < 3; ++i)
{
tp.tryExecute(() -> {});
}
int spawned = 13;
List<RunningJob> jobs = new ArrayList<>();
for (int i = 0; i < spawned; ++i)
{
RunningJob job = new RunningJob("JOB" + i);
jobs.add(job);
tp.execute(job);
assertTrue(job._run.await(5, TimeUnit.SECONDS));
}
for (RunningJob job : jobs)
{
job._stopping.countDown();
}
// Wait for the threads to become idle again.
Thread.sleep(1000);
// Submit less jobs to the queue so we have active and idle threads.
jobs.clear();
int transientJobs = spawned / 2;
for (int i = 0; i < transientJobs; ++i)
{
RunningJob job = new RunningJob("JOB" + i);
jobs.add(job);
tp.execute(job);
assertTrue(job._run.await(5, TimeUnit.SECONDS));
}
try
{
assertThat(tp.getMaxReservedThreads(), Matchers.equalTo(reservedThreads));
assertThat(tp.getLeasedThreads(), Matchers.equalTo(leasedThreads));
assertThat(tp.getReadyThreads(), Matchers.equalTo(tp.getIdleThreads() + tp.getAvailableReservedThreads()));
assertThat(tp.getUtilizedThreads(), Matchers.equalTo(transientJobs));
assertThat(tp.getThreads(), Matchers.equalTo(tp.getReadyThreads() + tp.getLeasedThreads() + tp.getUtilizedThreads()));
assertThat(tp.getBusyThreads(), Matchers.equalTo(tp.getUtilizedThreads() + tp.getLeasedThreads()));
}
finally
{
jobs.forEach(job -> job._stopping.countDown());
leasedJobs.forEach(job -> job._stopping.countDown());
tp.stop();
}
}
private int count(String s, String p)
{