Merge pull request #5995 from eclipse/jetty-9.4.x-5994-qtp_free_threads
Fixes #5994 - QueuedThreadPool "free" threads
This commit is contained in:
commit
c9cd1e4758
|
@ -77,12 +77,13 @@ public class MBeanContainerLifeCycleTest
|
|||
|
||||
String pkg = bean.getClass().getPackage().getName();
|
||||
Set<ObjectName> objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
|
||||
assertEquals(1, objectNames.size());
|
||||
// QueuedThreadPool and ThreadPoolBudget.
|
||||
assertEquals(2, objectNames.size());
|
||||
|
||||
container.stop();
|
||||
|
||||
objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
|
||||
assertEquals(1, objectNames.size());
|
||||
assertEquals(2, objectNames.size());
|
||||
|
||||
// Remove the MBeans to start clean on the next test.
|
||||
objectNames.forEach(objectName ->
|
||||
|
@ -105,7 +106,8 @@ public class MBeanContainerLifeCycleTest
|
|||
|
||||
String pkg = bean.getClass().getPackage().getName();
|
||||
Set<ObjectName> objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null);
|
||||
assertEquals(1, objectNames.size());
|
||||
// QueuedThreadPool and ThreadPoolBudget.
|
||||
assertEquals(2, objectNames.size());
|
||||
|
||||
container.stop();
|
||||
container.destroy();
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.eclipse.jetty.util.thread;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -47,11 +45,45 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||
|
||||
/**
|
||||
* <p>A thread pool with a queue of jobs to execute.</p>
|
||||
* <p>Jetty components that need threads (such as network acceptors and selector) may lease threads
|
||||
* from this thread pool using a {@link ThreadPoolBudget}; these threads are "active" from the point
|
||||
* of view of the thread pool, but not available to run <em>transient</em> jobs such as processing
|
||||
* an HTTP request or a WebSocket frame.</p>
|
||||
* <p>QueuedThreadPool has a {@link ReservedThreadExecutor} which leases threads from this pool,
|
||||
* but makes them available as if they are "idle" threads.</p>
|
||||
* <p>QueuedThreadPool therefore has the following <em>fundamental</em> values:</p>
|
||||
* <ul>
|
||||
* <li>{@link #getThreads() threads}: the current number of threads. These threads may execute
|
||||
* a job (either internal or transient), or may be ready to run (either idle or reserved).
|
||||
* This number may grow or shrink as the thread pool grows or shrinks.</li>
|
||||
* <li>{@link #getReadyThreads() readyThreads}: the current number of threads that are ready to
|
||||
* run transient jobs.
|
||||
* This number may grow or shrink as the thread pool grows or shrinks.</li>
|
||||
* <li>{@link #getLeasedThreads() leasedThreads}: the number of threads that run internal jobs.
|
||||
* This number is typically constant after this thread pool is {@link #start() started}.</li>
|
||||
* </ul>
|
||||
* <p>Given the definitions above, the most interesting definitions are:</p>
|
||||
* <ul>
|
||||
* <li>{@link #getThreads() threads} = {@link #getReadyThreads() readyThreads} + {@link #getLeasedThreads() leasedThreads} + {@link #getUtilizedThreads() utilizedThreads}</li>
|
||||
* <li>readyThreads = {@link #getIdleThreads() idleThreads} + {@link #getAvailableReservedThreads() availableReservedThreads}</li>
|
||||
* <li>{@link #getMaxAvailableThreads() maxAvailableThreads} = {@link #getMaxThreads() maxThreads} - leasedThreads</li>
|
||||
* <li>{@link #getUtilizationRate() utilizationRate} = utilizedThreads / maxAvailableThreads</li>
|
||||
* </ul>
|
||||
* <p>Other definitions, typically less interesting because they take into account threads that
|
||||
* execute internal jobs, or because they don't take into account available reserved threads
|
||||
* (that are essentially ready to execute transient jobs), are:</p>
|
||||
* <ul>
|
||||
* <li>{@link #getBusyThreads() busyThreads} = utilizedThreads + leasedThreads</li>
|
||||
* <li>{@link #getIdleThreads()} idleThreads} = readyThreads - availableReservedThreads</li>
|
||||
* </ul>
|
||||
*/
|
||||
@ManagedObject("A thread pool")
|
||||
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
||||
private static Runnable NOOP = () ->
|
||||
private static final Runnable NOOP = () ->
|
||||
{
|
||||
};
|
||||
|
||||
|
@ -158,6 +190,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
{
|
||||
if (budget != null && budget.getSizedThreadPool() != this)
|
||||
throw new IllegalArgumentException();
|
||||
updateBean(_budget, budget);
|
||||
_budget = budget;
|
||||
}
|
||||
|
||||
|
@ -288,23 +321,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Thread Pool should use Daemon Threading.
|
||||
*
|
||||
* @param daemon true to enable delegation
|
||||
* @see Thread#setDaemon(boolean)
|
||||
* @return the maximum thread idle time in ms
|
||||
*/
|
||||
public void setDaemon(boolean daemon)
|
||||
@ManagedAttribute("maximum time a thread may be idle in ms")
|
||||
public int getIdleTimeout()
|
||||
{
|
||||
_daemon = daemon;
|
||||
return _idleTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the maximum thread idle time.
|
||||
* Threads that are idle for longer than this period may be
|
||||
* stopped.
|
||||
* <p>Set the maximum thread idle time in ms.</p>
|
||||
* <p>Threads that are idle for longer than this period may be stopped.</p>
|
||||
*
|
||||
* @param idleTimeout Max idle time in ms.
|
||||
* @see #getIdleTimeout
|
||||
* @param idleTimeout the maximum thread idle time in ms
|
||||
*/
|
||||
public void setIdleTimeout(int idleTimeout)
|
||||
{
|
||||
|
@ -315,10 +344,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the maximum number of threads.
|
||||
*
|
||||
* @param maxThreads maximum number of threads.
|
||||
* @see #getMaxThreads
|
||||
* @return the maximum number of threads
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("maximum number of threads in the pool")
|
||||
public int getMaxThreads()
|
||||
{
|
||||
return _maxThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxThreads the maximum number of threads
|
||||
*/
|
||||
@Override
|
||||
public void setMaxThreads(int maxThreads)
|
||||
|
@ -331,10 +367,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the minimum number of threads.
|
||||
*
|
||||
* @return the minimum number of threads
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("minimum number of threads in the pool")
|
||||
public int getMinThreads()
|
||||
{
|
||||
return _minThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param minThreads minimum number of threads
|
||||
* @see #getMinThreads
|
||||
*/
|
||||
@Override
|
||||
public void setMinThreads(int minThreads)
|
||||
|
@ -349,10 +392,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the number of reserved threads.
|
||||
*
|
||||
* @return number of reserved threads or -1 for heuristically determined
|
||||
*/
|
||||
@ManagedAttribute("number of configured reserved threads or -1 for heuristic")
|
||||
public int getReservedThreads()
|
||||
{
|
||||
return _reservedThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reservedThreads number of reserved threads or -1 for heuristically determined
|
||||
* @see #getReservedThreads
|
||||
*/
|
||||
public void setReservedThreads(int reservedThreads)
|
||||
{
|
||||
|
@ -362,83 +411,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* @param name Name of this thread pool to use when naming threads.
|
||||
*/
|
||||
public void setName(String name)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException("started");
|
||||
_name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the priority of the pool threads.
|
||||
*
|
||||
* @param priority the new thread priority.
|
||||
*/
|
||||
public void setThreadsPriority(int priority)
|
||||
{
|
||||
_priority = priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum thread idle time.
|
||||
*
|
||||
* @return Max idle time in ms.
|
||||
* @see #setIdleTimeout
|
||||
*/
|
||||
@ManagedAttribute("maximum time a thread may be idle in ms")
|
||||
public int getIdleTimeout()
|
||||
{
|
||||
return _idleTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum number of threads.
|
||||
*
|
||||
* @return maximum number of threads.
|
||||
* @see #setMaxThreads
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("maximum number of threads in the pool")
|
||||
public int getMaxThreads()
|
||||
{
|
||||
return _maxThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum number of threads.
|
||||
*
|
||||
* @return minimum number of threads.
|
||||
* @see #setMinThreads
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("minimum number of threads in the pool")
|
||||
public int getMinThreads()
|
||||
{
|
||||
return _minThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of reserved threads.
|
||||
*
|
||||
* @return number of reserved threads or or -1 for heuristically determined
|
||||
* @see #setReservedThreads
|
||||
*/
|
||||
@ManagedAttribute("the number of reserved threads in the pool")
|
||||
public int getReservedThreads()
|
||||
{
|
||||
if (isStarted())
|
||||
{
|
||||
ReservedThreadExecutor reservedThreadExecutor = getBean(ReservedThreadExecutor.class);
|
||||
if (reservedThreadExecutor != null)
|
||||
return reservedThreadExecutor.getCapacity();
|
||||
}
|
||||
return _reservedThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The name of the this thread pool
|
||||
* @return the name of the this thread pool
|
||||
*/
|
||||
@ManagedAttribute("name of the thread pool")
|
||||
public String getName()
|
||||
|
@ -447,9 +420,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the priority of the pool threads.
|
||||
* <p>Sets the name of this thread pool, used as a prefix for the thread names.</p>
|
||||
*
|
||||
* @return the priority of the pool threads.
|
||||
* @param name the name of the this thread pool
|
||||
*/
|
||||
public void setName(String name)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
_name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the priority of the pool threads
|
||||
*/
|
||||
@ManagedAttribute("priority of threads in the pool")
|
||||
public int getThreadsPriority()
|
||||
|
@ -458,21 +441,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the size of the job queue.
|
||||
*
|
||||
* @return Number of jobs queued waiting for a thread
|
||||
* @param priority the priority of the pool threads
|
||||
*/
|
||||
@ManagedAttribute("size of the job queue")
|
||||
public int getQueueSize()
|
||||
public void setThreadsPriority(int priority)
|
||||
{
|
||||
// The idle counter encodes demand, which is the effective queue size
|
||||
int idle = _counts.getLo();
|
||||
return Math.max(0, -idle);
|
||||
_priority = priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this thread pool is using daemon threads
|
||||
* @see Thread#setDaemon(boolean)
|
||||
* @return whether to use daemon threads
|
||||
* @see Thread#isDaemon()
|
||||
*/
|
||||
@ManagedAttribute("thread pool uses daemon threads")
|
||||
public boolean isDaemon()
|
||||
|
@ -480,6 +458,15 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
return _daemon;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param daemon whether to use daemon threads
|
||||
* @see Thread#setDaemon(boolean)
|
||||
*/
|
||||
public void setDaemon(boolean daemon)
|
||||
{
|
||||
_daemon = daemon;
|
||||
}
|
||||
|
||||
@ManagedAttribute("reports additional details in the dump")
|
||||
public boolean isDetailedDump()
|
||||
{
|
||||
|
@ -502,6 +489,193 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
_lowThreadsThreshold = lowThreadsThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of jobs in the queue waiting for a thread
|
||||
*/
|
||||
@ManagedAttribute("size of the job queue")
|
||||
public int getQueueSize()
|
||||
{
|
||||
// The idle counter encodes demand, which is the effective queue size
|
||||
int idle = _counts.getLo();
|
||||
return Math.max(0, -idle);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number (capacity) of reserved threads
|
||||
* @see ReservedThreadExecutor#getCapacity()
|
||||
*/
|
||||
@ManagedAttribute("maximum number (capacity) of reserved threads")
|
||||
public int getMaxReservedThreads()
|
||||
{
|
||||
TryExecutor tryExecutor = _tryExecutor;
|
||||
if (tryExecutor instanceof ReservedThreadExecutor)
|
||||
{
|
||||
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
|
||||
return reservedThreadExecutor.getCapacity();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of available reserved threads
|
||||
* @see ReservedThreadExecutor#getAvailable()
|
||||
*/
|
||||
@ManagedAttribute("number of available reserved threads")
|
||||
public int getAvailableReservedThreads()
|
||||
{
|
||||
TryExecutor tryExecutor = _tryExecutor;
|
||||
if (tryExecutor instanceof ReservedThreadExecutor)
|
||||
{
|
||||
ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor;
|
||||
return reservedThreadExecutor.getAvailable();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The <em>fundamental</em> value that represents the number of threads currently known by this thread pool.</p>
|
||||
* <p>This value includes threads that have been leased to internal components, idle threads, reserved threads
|
||||
* and threads that are executing transient jobs.</p>
|
||||
*
|
||||
* @return the number of threads currently known to the pool
|
||||
* @see #getReadyThreads()
|
||||
* @see #getLeasedThreads()
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("number of threads in the pool")
|
||||
public int getThreads()
|
||||
{
|
||||
int threads = _counts.getHi();
|
||||
return Math.max(0, threads);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The <em>fundamental</em> value that represents the number of threads ready to execute transient jobs.</p>
|
||||
*
|
||||
* @return the number of threads ready to execute transient jobs
|
||||
* @see #getThreads()
|
||||
* @see #getLeasedThreads()
|
||||
* @see #getUtilizedThreads()
|
||||
*/
|
||||
@ManagedAttribute("number of threads ready to execute transient jobs")
|
||||
public int getReadyThreads()
|
||||
{
|
||||
return getIdleThreads() + getAvailableReservedThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The <em>fundamental</em> value that represents the number of threads that are leased
|
||||
* to internal components, and therefore cannot be used to execute transient jobs.</p>
|
||||
*
|
||||
* @return the number of threads currently used by internal components
|
||||
* @see #getThreads()
|
||||
* @see #getReadyThreads()
|
||||
*/
|
||||
@ManagedAttribute("number of threads used by internal components")
|
||||
public int getLeasedThreads()
|
||||
{
|
||||
return getMaxLeasedThreads() - getMaxReservedThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The maximum number of threads that are leased to internal components,
|
||||
* as some component may allocate its threads lazily.</p>
|
||||
*
|
||||
* @return the maximum number of threads leased by internal components
|
||||
* @see #getLeasedThreads()
|
||||
*/
|
||||
@ManagedAttribute("maximum number of threads leased to internal components")
|
||||
public int getMaxLeasedThreads()
|
||||
{
|
||||
ThreadPoolBudget budget = _budget;
|
||||
return budget == null ? 0 : budget.getLeasedThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The number of idle threads, but without including reserved threads.</p>
|
||||
* <p>Prefer {@link #getReadyThreads()} for a better representation of
|
||||
* "threads ready to execute transient jobs".</p>
|
||||
*
|
||||
* @return the number of idle threads but not reserved
|
||||
* @see #getReadyThreads()
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("number of idle threads but not reserved")
|
||||
public int getIdleThreads()
|
||||
{
|
||||
int idle = _counts.getLo();
|
||||
return Math.max(0, idle);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The number of threads executing internal and transient jobs.</p>
|
||||
* <p>Prefer {@link #getUtilizedThreads()} for a better representation of
|
||||
* "threads executing transient jobs".</p>
|
||||
*
|
||||
* @return the number of threads executing internal and transient jobs
|
||||
* @see #getUtilizedThreads()
|
||||
*/
|
||||
@ManagedAttribute("number of threads executing internal and transient jobs")
|
||||
public int getBusyThreads()
|
||||
{
|
||||
return getThreads() - getReadyThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The number of threads executing transient jobs.</p>
|
||||
*
|
||||
* @return the number of threads executing transient jobs
|
||||
* @see #getReadyThreads()
|
||||
*/
|
||||
@ManagedAttribute("number of threads executing transient jobs")
|
||||
public int getUtilizedThreads()
|
||||
{
|
||||
return getThreads() - getLeasedThreads() - getReadyThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The maximum number of threads available to run transient jobs.</p>
|
||||
*
|
||||
* @return the maximum number of threads available to run transient jobs
|
||||
*/
|
||||
@ManagedAttribute("maximum number of threads available to run transient jobs")
|
||||
public int getMaxAvailableThreads()
|
||||
{
|
||||
return getMaxThreads() - getLeasedThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The rate between the number of {@link #getUtilizedThreads() utilized threads}
|
||||
* and the maximum number of {@link #getMaxAvailableThreads() utilizable threads}.</p>
|
||||
* <p>A value of {@code 0.0D} means that the thread pool is not utilized, while a
|
||||
* value of {@code 1.0D} means that the thread pool is fully utilized to execute
|
||||
* transient jobs.</p>
|
||||
*
|
||||
* @return the utilization rate of threads executing transient jobs
|
||||
*/
|
||||
@ManagedAttribute("utilization rate of threads executing transient jobs")
|
||||
public double getUtilizationRate()
|
||||
{
|
||||
return (double)getUtilizedThreads() / getMaxAvailableThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Returns whether this thread pool is low on threads.</p>
|
||||
* <p>The current formula is:</p>
|
||||
* <pre>
|
||||
* maxThreads - threads + readyThreads - queueSize <= lowThreadsThreshold
|
||||
* </pre>
|
||||
*
|
||||
* @return whether the pool is low on threads
|
||||
* @see #getLowThreadsThreshold()
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
|
||||
public boolean isLowOnThreads()
|
||||
{
|
||||
return getMaxThreads() - getThreads() + getReadyThreads() - getQueueSize() <= getLowThreadsThreshold();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable job)
|
||||
{
|
||||
|
@ -576,55 +750,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the total number of threads currently in the pool
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("number of threads in the pool")
|
||||
public int getThreads()
|
||||
{
|
||||
int threads = _counts.getHi();
|
||||
return Math.max(0, threads);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of idle threads in the pool
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute("number of idle threads in the pool")
|
||||
public int getIdleThreads()
|
||||
{
|
||||
int idle = _counts.getLo();
|
||||
return Math.max(0, idle);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of busy threads in the pool
|
||||
*/
|
||||
@ManagedAttribute("number of busy threads in the pool")
|
||||
public int getBusyThreads()
|
||||
{
|
||||
int reserved = _tryExecutor instanceof ReservedThreadExecutor ? ((ReservedThreadExecutor)_tryExecutor).getAvailable() : 0;
|
||||
return getThreads() - getIdleThreads() - reserved;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Returns whether this thread pool is low on threads.</p>
|
||||
* <p>The current formula is:</p>
|
||||
* <pre>
|
||||
* maxThreads - threads + idleThreads - queueSize <= lowThreadsThreshold
|
||||
* </pre>
|
||||
*
|
||||
* @return whether the pool is low on threads
|
||||
* @see #getLowThreadsThreshold()
|
||||
*/
|
||||
@Override
|
||||
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
|
||||
public boolean isLowOnThreads()
|
||||
{
|
||||
return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold();
|
||||
}
|
||||
|
||||
private void ensureThreads()
|
||||
{
|
||||
while (true)
|
||||
|
@ -744,28 +869,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
long count = _counts.get();
|
||||
int threads = Math.max(0, AtomicBiInteger.getHi(count));
|
||||
int idle = Math.max(0, AtomicBiInteger.getLo(count));
|
||||
int queue = getQueueSize();
|
||||
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
|
||||
getClass().getSimpleName(),
|
||||
_name,
|
||||
hashCode(),
|
||||
getState(),
|
||||
getMinThreads(),
|
||||
threads,
|
||||
getMaxThreads(),
|
||||
idle,
|
||||
getReservedThreads(),
|
||||
queue,
|
||||
_tryExecutor);
|
||||
}
|
||||
|
||||
private final Runnable _runnable = new Runner();
|
||||
|
||||
/**
|
||||
|
@ -839,6 +942,28 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
long count = _counts.get();
|
||||
int threads = Math.max(0, AtomicBiInteger.getHi(count));
|
||||
int idle = Math.max(0, AtomicBiInteger.getLo(count));
|
||||
int queue = getQueueSize();
|
||||
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
|
||||
getClass().getSimpleName(),
|
||||
_name,
|
||||
hashCode(),
|
||||
getState(),
|
||||
getMinThreads(),
|
||||
threads,
|
||||
getMaxThreads(),
|
||||
idle,
|
||||
getReservedThreads(),
|
||||
queue,
|
||||
_tryExecutor);
|
||||
}
|
||||
|
||||
private class Runner implements Runnable
|
||||
{
|
||||
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
|
||||
|
|
|
@ -111,12 +111,18 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
return _executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number of reserved threads
|
||||
*/
|
||||
@ManagedAttribute(value = "max number of reserved threads", readonly = true)
|
||||
public int getCapacity()
|
||||
{
|
||||
return _capacity;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of threads available to {@link #tryExecute(Runnable)}
|
||||
*/
|
||||
@ManagedAttribute(value = "available reserved threads", readonly = true)
|
||||
public int getAvailable()
|
||||
{
|
||||
|
@ -196,8 +202,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
}
|
||||
|
||||
/**
|
||||
* @param task The task to run
|
||||
* @return True iff a reserved thread was available and has been assigned the task to run.
|
||||
* <p>Executes the given task if and only if a reserved thread is available.</p>
|
||||
*
|
||||
* @param task the task to run
|
||||
* @return true if and only if a reserved thread was available and has been assigned the task to run.
|
||||
*/
|
||||
@Override
|
||||
public boolean tryExecute(Runnable task)
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
|
@ -32,9 +34,10 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
*
|
||||
* @see ThreadPool.SizedThreadPool#getThreadPoolBudget()
|
||||
*/
|
||||
@ManagedObject
|
||||
public class ThreadPoolBudget
|
||||
{
|
||||
static final Logger LOG = Log.getLogger(ThreadPoolBudget.class);
|
||||
private static final Logger LOG = Log.getLogger(ThreadPoolBudget.class);
|
||||
|
||||
public interface Lease extends Closeable
|
||||
{
|
||||
|
@ -115,6 +118,14 @@ public class ThreadPoolBudget
|
|||
return pool;
|
||||
}
|
||||
|
||||
@ManagedAttribute("the number of threads leased to components")
|
||||
public int getLeasedThreads()
|
||||
{
|
||||
return leases.stream()
|
||||
.mapToInt(Lease::getThreads)
|
||||
.sum();
|
||||
}
|
||||
|
||||
public void reset()
|
||||
{
|
||||
leases.clear();
|
||||
|
@ -146,9 +157,7 @@ public class ThreadPoolBudget
|
|||
*/
|
||||
public boolean check(int maxThreads) throws IllegalStateException
|
||||
{
|
||||
int required = leases.stream()
|
||||
.mapToInt(Lease::getThreads)
|
||||
.sum();
|
||||
int required = getLeasedThreads();
|
||||
int left = maxThreads - required;
|
||||
if (left <= 0)
|
||||
{
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.eclipse.jetty.util.thread;
|
|||
import java.io.Closeable;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -194,7 +196,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
waitForIdle(tp, 2);
|
||||
|
||||
// Doesn't shrink to less than min threads
|
||||
Thread.sleep(3 * tp.getIdleTimeout() / 2);
|
||||
Thread.sleep(3L * tp.getIdleTimeout() / 2);
|
||||
assertThat(tp.getThreads(), is(2));
|
||||
assertThat(tp.getIdleThreads(), is(2));
|
||||
|
||||
|
@ -296,7 +298,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
waitForIdle(tp, 2);
|
||||
|
||||
// Doesn't shrink to less than min threads
|
||||
Thread.sleep(3 * tp.getIdleTimeout() / 2);
|
||||
Thread.sleep(3L * tp.getIdleTimeout() / 2);
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 2);
|
||||
|
||||
|
@ -857,6 +859,77 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
assertThat(t.getContextClassLoader(), Matchers.equalTo(QueuedThreadPool.class.getClassLoader()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadCounts() throws Exception
|
||||
{
|
||||
int maxThreads = 100;
|
||||
QueuedThreadPool tp = new QueuedThreadPool(maxThreads, 0);
|
||||
// Long timeout so it does not expire threads during the test.
|
||||
tp.setIdleTimeout(60000);
|
||||
int reservedThreads = 7;
|
||||
tp.setReservedThreads(reservedThreads);
|
||||
tp.start();
|
||||
int leasedThreads = 5;
|
||||
tp.getThreadPoolBudget().leaseTo(new Object(), leasedThreads);
|
||||
List<RunningJob> leasedJobs = new ArrayList<>();
|
||||
for (int i = 0; i < leasedThreads; ++i)
|
||||
{
|
||||
RunningJob job = new RunningJob("JOB" + i);
|
||||
leasedJobs.add(job);
|
||||
tp.execute(job);
|
||||
assertTrue(job._run.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
// Run some job to spawn threads.
|
||||
for (int i = 0; i < 3; ++i)
|
||||
{
|
||||
tp.tryExecute(() -> {});
|
||||
}
|
||||
int spawned = 13;
|
||||
List<RunningJob> jobs = new ArrayList<>();
|
||||
for (int i = 0; i < spawned; ++i)
|
||||
{
|
||||
RunningJob job = new RunningJob("JOB" + i);
|
||||
jobs.add(job);
|
||||
tp.execute(job);
|
||||
assertTrue(job._run.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
for (RunningJob job : jobs)
|
||||
{
|
||||
job._stopping.countDown();
|
||||
}
|
||||
|
||||
// Wait for the threads to become idle again.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Submit less jobs to the queue so we have active and idle threads.
|
||||
jobs.clear();
|
||||
int transientJobs = spawned / 2;
|
||||
for (int i = 0; i < transientJobs; ++i)
|
||||
{
|
||||
RunningJob job = new RunningJob("JOB" + i);
|
||||
jobs.add(job);
|
||||
tp.execute(job);
|
||||
assertTrue(job._run.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
assertThat(tp.getMaxReservedThreads(), Matchers.equalTo(reservedThreads));
|
||||
assertThat(tp.getLeasedThreads(), Matchers.equalTo(leasedThreads));
|
||||
assertThat(tp.getReadyThreads(), Matchers.equalTo(tp.getIdleThreads() + tp.getAvailableReservedThreads()));
|
||||
assertThat(tp.getUtilizedThreads(), Matchers.equalTo(transientJobs));
|
||||
assertThat(tp.getThreads(), Matchers.equalTo(tp.getReadyThreads() + tp.getLeasedThreads() + tp.getUtilizedThreads()));
|
||||
assertThat(tp.getBusyThreads(), Matchers.equalTo(tp.getUtilizedThreads() + tp.getLeasedThreads()));
|
||||
}
|
||||
finally
|
||||
{
|
||||
jobs.forEach(job -> job._stopping.countDown());
|
||||
leasedJobs.forEach(job -> job._stopping.countDown());
|
||||
tp.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private int count(String s, String p)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue