Improve executorSizedThreadPool (#2253)
Improve executorSizedThreadPool (#2253) * Improve executorSizedThreadPool Signed-off-by: Greg Wilkins <gregw@webtide.com> * Improved implementation. Implemented name, thread priorities, thread group and daemon properties. Implemented toString(), dump() and using a thread factory. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> * added threadpool benchmark; Signed-off-by: Greg Wilkins <gregw@webtide.com> * Renamed ExecutorSizedThreadPool to ExecutorThreadPool Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
daeaad7624
commit
a272fb6e39
|
@ -44,6 +44,7 @@ import org.eclipse.jetty.server.Handler;
|
|||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
|
@ -58,7 +59,7 @@ public class HttpClientTLSTest
|
|||
|
||||
private void startServer(SslContextFactory sslContextFactory, Handler handler) throws Exception
|
||||
{
|
||||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
ExecutorThreadPool serverThreads = new ExecutorThreadPool();
|
||||
serverThreads.setName("server");
|
||||
server = new Server(serverThreads);
|
||||
|
||||
|
|
|
@ -18,16 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import org.eclipse.jetty.util.Loader;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
|
||||
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
|
||||
|
||||
/**
|
||||
* <p>An {@link ExecutionStrategy} executes {@link Runnable} tasks produced by a {@link Producer}.
|
||||
* The strategy to execute the task may vary depending on the implementation; the task may be
|
||||
|
|
|
@ -18,92 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
|
||||
/**
|
||||
* A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}.
|
||||
* @deprecated Use {@link ExecutorThreadPool}
|
||||
*/
|
||||
public class ExecutorSizedThreadPool extends AbstractLifeCycle implements ThreadPool.SizedThreadPool
|
||||
@Deprecated
|
||||
public class ExecutorSizedThreadPool extends ExecutorThreadPool
|
||||
{
|
||||
private final ThreadPoolExecutor executor;
|
||||
|
||||
public ExecutorSizedThreadPool()
|
||||
{
|
||||
this(new ThreadPoolExecutor(8, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()));
|
||||
}
|
||||
|
||||
public ExecutorSizedThreadPool(ThreadPoolExecutor executor)
|
||||
{
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMinThreads()
|
||||
{
|
||||
return executor.getCorePoolSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxThreads()
|
||||
{
|
||||
return executor.getMaximumPoolSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMinThreads(int threads)
|
||||
{
|
||||
executor.setCorePoolSize(threads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxThreads(int threads)
|
||||
{
|
||||
executor.setMaximumPoolSize(threads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getThreads()
|
||||
{
|
||||
return executor.getPoolSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getIdleThreads()
|
||||
{
|
||||
return getThreads() - executor.getActiveCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command)
|
||||
{
|
||||
executor.execute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLowOnThreads()
|
||||
{
|
||||
return getThreads() == getMaxThreads() && executor.getQueue().size() >= getIdleThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void join() throws InterruptedException
|
||||
{
|
||||
executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ThreadPoolBudget getThreadPoolBudget()
|
||||
{
|
||||
return new ThreadPoolBudget(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,170 +18,385 @@
|
|||
|
||||
package org.eclipse.jetty.util.thread;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
|
||||
/**
|
||||
* Jetty ThreadPool using java 5 ThreadPoolExecutor
|
||||
* This class wraps a {@link ExecutorService} as a {@link ThreadPool} and
|
||||
* {@link LifeCycle} interfaces so that it may be used by the Jetty {@code org.eclipse.jetty.server.Server}
|
||||
*
|
||||
* @deprecated use {@link ExecutorSizedThreadPool} instead
|
||||
* A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}.
|
||||
*/
|
||||
@Deprecated
|
||||
public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle
|
||||
@ManagedObject("A thread pool")
|
||||
public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class);
|
||||
private final ExecutorService _executor;
|
||||
private final ThreadPoolExecutor _executor;
|
||||
private final ThreadPoolBudget _budget;
|
||||
private final ThreadGroup _group;
|
||||
private String _name = "etp" + hashCode();
|
||||
private int _minThreads;
|
||||
private int _reservedThreads = -1;
|
||||
private TryExecutor _tryExecutor = TryExecutor.NO_TRY;
|
||||
private int _priority = Thread.NORM_PRIORITY;
|
||||
private boolean _daemon;
|
||||
private boolean _detailedDump;
|
||||
|
||||
public ExecutorThreadPool(ExecutorService executor)
|
||||
{
|
||||
_executor = executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link ThreadPoolExecutor}.
|
||||
* Max pool size is 256, pool thread timeout after 60 seconds and
|
||||
* an unbounded {@link LinkedBlockingQueue} is used for the job queue;
|
||||
*/
|
||||
public ExecutorThreadPool()
|
||||
{
|
||||
// Using an unbounded queue makes the maxThreads parameter useless
|
||||
// Refer to ThreadPoolExecutor javadocs for details
|
||||
this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()));
|
||||
this(200, 8);
|
||||
}
|
||||
|
||||
public ExecutorThreadPool(int maxThreads)
|
||||
{
|
||||
this(maxThreads, Math.min(8, maxThreads));
|
||||
}
|
||||
|
||||
public ExecutorThreadPool(int maxThreads, int minThreads)
|
||||
{
|
||||
this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()), minThreads, -1, null);
|
||||
}
|
||||
|
||||
public ExecutorThreadPool(ThreadPoolExecutor executor)
|
||||
{
|
||||
this(executor, -1);
|
||||
}
|
||||
|
||||
public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads)
|
||||
{
|
||||
this(executor, reservedThreads, null);
|
||||
}
|
||||
|
||||
public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads, ThreadGroup group)
|
||||
{
|
||||
this(executor, Math.min(Runtime.getRuntime().availableProcessors(), executor.getCorePoolSize()), reservedThreads, group);
|
||||
}
|
||||
|
||||
private ExecutorThreadPool(ThreadPoolExecutor executor, int minThreads, int reservedThreads, ThreadGroup group)
|
||||
{
|
||||
int maxThreads = executor.getMaximumPoolSize();
|
||||
if (maxThreads < minThreads)
|
||||
{
|
||||
executor.shutdownNow();
|
||||
throw new IllegalArgumentException("max threads (" + maxThreads + ") cannot be less than min threads (" + minThreads + ")");
|
||||
}
|
||||
_executor = executor;
|
||||
_executor.setThreadFactory(this::newThread);
|
||||
_budget = new ThreadPoolBudget(this);
|
||||
_group = group;
|
||||
_minThreads = minThreads;
|
||||
_reservedThreads = reservedThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link ThreadPoolExecutor}.
|
||||
* Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0.
|
||||
*
|
||||
* @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a
|
||||
* {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size.
|
||||
* @return the name of the this thread pool
|
||||
*/
|
||||
public ExecutorThreadPool(int queueSize)
|
||||
@ManagedAttribute("name of this thread pool")
|
||||
public String getName()
|
||||
{
|
||||
this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) :
|
||||
queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<>()) :
|
||||
new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize)));
|
||||
return _name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link ThreadPoolExecutor} using
|
||||
* an unbounded {@link LinkedBlockingQueue} is used for the jobs queue;
|
||||
*
|
||||
* @param corePoolSize must be equal to maximumPoolSize
|
||||
* @param maximumPoolSize the maximum number of threads to allow in the pool
|
||||
* @param keepAliveTime the max time a thread can remain idle, in milliseconds
|
||||
* @param name the name of this thread pool, used to name threads
|
||||
*/
|
||||
public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime)
|
||||
public void setName(String name)
|
||||
{
|
||||
this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link ThreadPoolExecutor} using
|
||||
* an unbounded {@link LinkedBlockingQueue} is used for the jobs queue.
|
||||
*
|
||||
* @param corePoolSize must be equal to maximumPoolSize
|
||||
* @param maximumPoolSize the maximum number of threads to allow in the pool
|
||||
* @param keepAliveTime the max time a thread can remain idle
|
||||
* @param unit the unit for the keepAliveTime
|
||||
*/
|
||||
public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit)
|
||||
{
|
||||
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an {@link ThreadPoolExecutor}
|
||||
*
|
||||
* @param corePoolSize the number of threads to keep in the pool, even if they are idle
|
||||
* @param maximumPoolSize the maximum number of threads to allow in the pool
|
||||
* @param keepAliveTime the max time a thread can remain idle
|
||||
* @param unit the unit for the keepAliveTime
|
||||
* @param workQueue the queue to use for holding tasks before they are executed
|
||||
*/
|
||||
public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
|
||||
{
|
||||
this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue));
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
_name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable job)
|
||||
@ManagedAttribute("minimum number of threads in the pool")
|
||||
public int getMinThreads()
|
||||
{
|
||||
_executor.execute(job);
|
||||
}
|
||||
|
||||
public boolean dispatch(Runnable job)
|
||||
{
|
||||
try
|
||||
{
|
||||
_executor.execute(job);
|
||||
return true;
|
||||
}
|
||||
catch (RejectedExecutionException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
return false;
|
||||
}
|
||||
return _minThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getIdleThreads()
|
||||
public void setMinThreads(int threads)
|
||||
{
|
||||
if (_executor instanceof ThreadPoolExecutor)
|
||||
{
|
||||
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
|
||||
return tpe.getPoolSize() - tpe.getActiveCount();
|
||||
}
|
||||
return -1;
|
||||
_minThreads = threads;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ManagedAttribute("maximum number of threads in the pool")
|
||||
public int getMaxThreads()
|
||||
{
|
||||
return _executor.getMaximumPoolSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxThreads(int threads)
|
||||
{
|
||||
_executor.setCorePoolSize(threads);
|
||||
_executor.setMaximumPoolSize(threads);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum thread idle time in ms.
|
||||
* @see #setIdleTimeout(int)
|
||||
*/
|
||||
@ManagedAttribute("maximum time a thread may be idle in ms")
|
||||
public int getIdleTimeout()
|
||||
{
|
||||
return (int)_executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Sets the maximum thread idle time in ms.</p>
|
||||
* <p>Threads that are idle for longer than this
|
||||
* period may be stopped.</p>
|
||||
*
|
||||
* @param idleTimeout the maximum thread idle time in ms.
|
||||
* @see #getIdleTimeout()
|
||||
*/
|
||||
public void setIdleTimeout(int idleTimeout)
|
||||
{
|
||||
_executor.setKeepAliveTime(idleTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of reserved threads or -1 to indicate that the number is heuristically determined
|
||||
* @see #setReservedThreads(int)
|
||||
*/
|
||||
@ManagedAttribute("the number of reserved threads in the pool")
|
||||
public int getReservedThreads()
|
||||
{
|
||||
if (isStarted())
|
||||
return getBean(ReservedThreadExecutor.class).getCapacity();
|
||||
return _reservedThreads;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the number of reserved threads.
|
||||
*
|
||||
* @param reservedThreads number of reserved threads or -1 to determine the number heuristically
|
||||
* @see #getReservedThreads()
|
||||
*/
|
||||
public void setReservedThreads(int reservedThreads)
|
||||
{
|
||||
if (isRunning())
|
||||
throw new IllegalStateException(getState());
|
||||
_reservedThreads = reservedThreads;
|
||||
}
|
||||
|
||||
public void setThreadsPriority(int priority)
|
||||
{
|
||||
_priority = priority;
|
||||
}
|
||||
|
||||
public int getThreadsPriority()
|
||||
{
|
||||
return _priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this thread pool uses daemon threads
|
||||
* @see #setDaemon(boolean)
|
||||
*/
|
||||
@ManagedAttribute("whether this thread pool uses daemon threads")
|
||||
public boolean isDaemon()
|
||||
{
|
||||
return _daemon;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param daemon whether this thread pool uses daemon threads
|
||||
* @see Thread#setDaemon(boolean)
|
||||
*/
|
||||
public void setDaemon(boolean daemon)
|
||||
{
|
||||
_daemon = daemon;
|
||||
}
|
||||
|
||||
@ManagedAttribute("reports additional details in the dump")
|
||||
public boolean isDetailedDump()
|
||||
{
|
||||
return _detailedDump;
|
||||
}
|
||||
|
||||
public void setDetailedDump(boolean detailedDump)
|
||||
{
|
||||
_detailedDump = detailedDump;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ManagedAttribute("number of threads in the pool")
|
||||
public int getThreads()
|
||||
{
|
||||
if (_executor instanceof ThreadPoolExecutor)
|
||||
{
|
||||
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
|
||||
return tpe.getPoolSize();
|
||||
}
|
||||
return -1;
|
||||
return _executor.getPoolSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ManagedAttribute("number of idle threads in the pool")
|
||||
public int getIdleThreads()
|
||||
{
|
||||
return _executor.getPoolSize() - _executor.getActiveCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command)
|
||||
{
|
||||
_executor.execute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryExecute(Runnable task)
|
||||
{
|
||||
TryExecutor tryExecutor = _tryExecutor;
|
||||
return tryExecutor != null && tryExecutor.tryExecute(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ManagedAttribute(value = "thread pool is low on threads", readonly = true)
|
||||
public boolean isLowOnThreads()
|
||||
{
|
||||
if (_executor instanceof ThreadPoolExecutor)
|
||||
{
|
||||
final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor;
|
||||
// getActiveCount() locks the thread pool, so execute it last
|
||||
return tpe.getPoolSize() == tpe.getMaximumPoolSize() &&
|
||||
tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount();
|
||||
}
|
||||
return false;
|
||||
return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void join() throws InterruptedException
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
_executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
if (_executor.isShutdown())
|
||||
throw new IllegalStateException("This thread pool is not restartable");
|
||||
for (int i = 0; i < _minThreads; ++i)
|
||||
_executor.prestartCoreThread();
|
||||
|
||||
_tryExecutor = new ReservedThreadExecutor(this, _reservedThreads);
|
||||
addBean(_tryExecutor);
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
super.doStop();
|
||||
removeBean(_tryExecutor);
|
||||
_tryExecutor = TryExecutor.NO_TRY;
|
||||
_executor.shutdownNow();
|
||||
_budget.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void join() throws InterruptedException
|
||||
{
|
||||
_executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ThreadPoolBudget getThreadPoolBudget()
|
||||
{
|
||||
return _budget;
|
||||
}
|
||||
|
||||
protected Thread newThread(Runnable job)
|
||||
{
|
||||
Thread thread = new Thread(_group, job);
|
||||
thread.setDaemon(isDaemon());
|
||||
thread.setPriority(getThreadsPriority());
|
||||
thread.setName(getName() + "-" + thread.getId());
|
||||
return thread;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
String prefix = getName() + "-";
|
||||
List<Dumpable> threads = Thread.getAllStackTraces().entrySet().stream()
|
||||
.filter(entry -> entry.getKey().getName().startsWith(prefix))
|
||||
.map(entry ->
|
||||
{
|
||||
Thread thread = entry.getKey();
|
||||
StackTraceElement[] frames = entry.getValue();
|
||||
String knownMethod = null;
|
||||
for (StackTraceElement frame : frames)
|
||||
{
|
||||
if ("getTask".equals(frame.getMethodName()) && frame.getClassName().endsWith("ThreadPoolExecutor"))
|
||||
{
|
||||
knownMethod = "IDLE ";
|
||||
break;
|
||||
}
|
||||
else if ("reservedWait".equals(frame.getMethodName()) && frame.getClassName().endsWith("ReservedThread"))
|
||||
{
|
||||
knownMethod = "RESERVED ";
|
||||
break;
|
||||
}
|
||||
else if ("select".equals(frame.getMethodName()) && frame.getClassName().endsWith("SelectorProducer"))
|
||||
{
|
||||
knownMethod = "SELECTING ";
|
||||
break;
|
||||
}
|
||||
else if ("accept".equals(frame.getMethodName()) && frame.getClassName().contains("ServerConnector"))
|
||||
{
|
||||
knownMethod = "ACCEPTING ";
|
||||
break;
|
||||
}
|
||||
}
|
||||
String known = knownMethod == null ? "" : knownMethod;
|
||||
return new Dumpable()
|
||||
{
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
out.append(String.valueOf(thread.getId()))
|
||||
.append(" ")
|
||||
.append(thread.getName())
|
||||
.append(" p=").append(String.valueOf(thread.getPriority()))
|
||||
.append(" ")
|
||||
.append(known)
|
||||
.append(thread.getState().toString());
|
||||
if (isDetailedDump())
|
||||
{
|
||||
out.append(System.lineSeparator());
|
||||
if (known.isEmpty())
|
||||
ContainerLifeCycle.dump(out, indent, Arrays.asList(frames));
|
||||
}
|
||||
else
|
||||
{
|
||||
out.append(" @ ").append(frames.length > 0 ? String.valueOf(frames[0]) : "<no_stack_frames>")
|
||||
.append(System.lineSeparator());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dump()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
};
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<Runnable> jobs = Collections.emptyList();
|
||||
if (isDetailedDump())
|
||||
jobs = new ArrayList<>(_executor.getQueue());
|
||||
dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs - size=" + jobs.size(), jobs)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}",
|
||||
getClass().getSimpleName(),
|
||||
getName(),
|
||||
hashCode(),
|
||||
getState(),
|
||||
getMinThreads(),
|
||||
getThreads(),
|
||||
getMaxThreads(),
|
||||
getIdleThreads(),
|
||||
_executor.getQueue().size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -623,21 +623,22 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (isDetailedDump())
|
||||
jobs = new ArrayList<>(getQueue());
|
||||
|
||||
dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs", jobs)));
|
||||
dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs - size=" + jobs.size(), jobs)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d,r=%s}",
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}",
|
||||
getClass().getSimpleName(),
|
||||
_name,
|
||||
hashCode(),
|
||||
getState(),
|
||||
getMinThreads(),
|
||||
getThreads(),
|
||||
getMaxThreads(),
|
||||
getIdleThreads(),
|
||||
_jobs.size(),
|
||||
_tryExecutor);
|
||||
_jobs.size());
|
||||
}
|
||||
|
||||
private Runnable idleJobPoll() throws InterruptedException
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util.thread.jmh;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.annotations.Threads;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.RunnerException;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
import org.openjdk.jmh.runner.options.TimeValue;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@Threads(4)
|
||||
@Warmup(iterations = 7, time = 500, timeUnit = TimeUnit.MILLISECONDS)
|
||||
@Measurement(iterations = 7, time = 500, timeUnit = TimeUnit.MILLISECONDS)
|
||||
public class ThreadPoolBenchmark
|
||||
{
|
||||
public enum Type
|
||||
{
|
||||
QTP, ETP;
|
||||
}
|
||||
|
||||
@Param({ "QTP", "ETP"})
|
||||
Type type;
|
||||
|
||||
@Param({ "50"})
|
||||
int tasks;
|
||||
|
||||
@Param({ "200", "2000"})
|
||||
int size;
|
||||
|
||||
ThreadPool pool;
|
||||
|
||||
@Setup // (Level.Iteration)
|
||||
public void buildPool()
|
||||
{
|
||||
switch(type)
|
||||
{
|
||||
case QTP:
|
||||
pool = new QueuedThreadPool(size);
|
||||
break;
|
||||
|
||||
case ETP:
|
||||
pool = new ExecutorThreadPool(size);
|
||||
break;
|
||||
}
|
||||
LifeCycle.start(pool);
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.Throughput)
|
||||
public void testPool()
|
||||
{
|
||||
doWork().join();
|
||||
}
|
||||
|
||||
@TearDown // (Level.Iteration)
|
||||
public void shutdownPool()
|
||||
{
|
||||
LifeCycle.stop(pool);
|
||||
pool = null;
|
||||
}
|
||||
|
||||
CompletableFuture<Void> doWork()
|
||||
{
|
||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < tasks; i++)
|
||||
{
|
||||
final CompletableFuture<Void> f = new CompletableFuture<Void>();
|
||||
futures.add(f);
|
||||
pool.execute(() -> {
|
||||
Blackhole.consumeCPU(64);
|
||||
f.complete(null);
|
||||
});
|
||||
}
|
||||
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws RunnerException
|
||||
{
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(ThreadPoolBenchmark.class.getSimpleName())
|
||||
.warmupIterations(2)
|
||||
.measurementIterations(3)
|
||||
.forks(1)
|
||||
.threads(400)
|
||||
// .syncIterations(true) // Don't start all threads at same time
|
||||
.warmupTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
|
||||
.measurementTime(new TimeValue(10000,TimeUnit.MILLISECONDS))
|
||||
// .addProfiler(CompilerProfiler.class)
|
||||
// .addProfiler(LinuxPerfProfiler.class)
|
||||
// .addProfiler(LinuxPerfNormProfiler.class)
|
||||
// .addProfiler(LinuxPerfAsmProfiler.class)
|
||||
// .resultFormat(ResultFormatType.CSV)
|
||||
.build();
|
||||
|
||||
new Runner(opt).run();
|
||||
}
|
||||
}
|
|
@ -166,7 +166,8 @@ public class EWYKBenchmark
|
|||
return hash;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws RunnerException {
|
||||
public static void main(String[] args) throws RunnerException
|
||||
{
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(EWYKBenchmark.class.getSimpleName())
|
||||
.warmupIterations(2)
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.util.concurrent.Executor;
|
|||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ExecutorSizedThreadPool;
|
||||
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
|
@ -57,7 +57,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
{
|
||||
this.id = id;
|
||||
this.bufferPool = bufferPool;
|
||||
this.executor = new ExecutorSizedThreadPool();
|
||||
this.executor = new ExecutorThreadPool();
|
||||
this.ioState.addListener(this);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue