From a272fb6e391e126bbdd851bccfc6bde25870570e Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 6 Mar 2018 19:07:41 +1100 Subject: [PATCH] Improve executorSizedThreadPool (#2253) Improve executorSizedThreadPool (#2253) * Improve executorSizedThreadPool Signed-off-by: Greg Wilkins * Improved implementation. Implemented name, thread priorities, thread group and daemon properties. Implemented toString(), dump() and using a thread factory. Signed-off-by: Simone Bordet * added threadpool benchmark; Signed-off-by: Greg Wilkins * Renamed ExecutorSizedThreadPool to ExecutorThreadPool Signed-off-by: Greg Wilkins --- .../jetty/client/HttpClientTLSTest.java | 3 +- .../jetty/util/thread/ExecutionStrategy.java | 10 - .../util/thread/ExecutorSizedThreadPool.java | 88 +--- .../jetty/util/thread/ExecutorThreadPool.java | 445 +++++++++++++----- .../jetty/util/thread/QueuedThreadPool.java | 9 +- .../util/thread/jmh/ThreadPoolBenchmark.java | 136 ++++++ .../thread/strategy/jmh/EWYKBenchmark.java | 3 +- .../common/io/LocalWebSocketConnection.java | 4 +- 8 files changed, 480 insertions(+), 218 deletions(-) create mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index bb3e3ac1151..cf2e355d50e 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -44,6 +44,7 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.After; @@ -58,7 +59,7 @@ public class HttpClientTLSTest private void startServer(SslContextFactory sslContextFactory, Handler handler) throws Exception { - QueuedThreadPool serverThreads = new QueuedThreadPool(); + ExecutorThreadPool serverThreads = new ExecutorThreadPool(); serverThreads.setName("server"); server = new Server(serverThreads); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java index 797589d5b78..cc67e581dcf 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.util.thread; -import java.lang.reflect.Constructor; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; - -import org.eclipse.jetty.util.Loader; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; - /** *

An {@link ExecutionStrategy} executes {@link Runnable} tasks produced by a {@link Producer}. * The strategy to execute the task may vary depending on the implementation; the task may be diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java index 6f85d8f69fb..69cdef92ad9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java @@ -18,92 +18,10 @@ package org.eclipse.jetty.util.thread; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.util.component.AbstractLifeCycle; - /** - * A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}. + * @deprecated Use {@link ExecutorThreadPool} */ -public class ExecutorSizedThreadPool extends AbstractLifeCycle implements ThreadPool.SizedThreadPool +@Deprecated +public class ExecutorSizedThreadPool extends ExecutorThreadPool { - private final ThreadPoolExecutor executor; - - public ExecutorSizedThreadPool() - { - this(new ThreadPoolExecutor(8, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>())); - } - - public ExecutorSizedThreadPool(ThreadPoolExecutor executor) - { - this.executor = executor; - } - - @Override - public int getMinThreads() - { - return executor.getCorePoolSize(); - } - - @Override - public int getMaxThreads() - { - return executor.getMaximumPoolSize(); - } - - @Override - public void setMinThreads(int threads) - { - executor.setCorePoolSize(threads); - } - - @Override - public void setMaxThreads(int threads) - { - executor.setMaximumPoolSize(threads); - } - - @Override - public int getThreads() - { - return executor.getPoolSize(); - } - - @Override - public int getIdleThreads() - { - return getThreads() - executor.getActiveCount(); - } - - @Override - public void execute(Runnable command) - { - executor.execute(command); - } - - @Override - public boolean isLowOnThreads() - { - return getThreads() == getMaxThreads() && executor.getQueue().size() >= getIdleThreads(); - } - - @Override - protected void doStop() throws Exception - { - executor.shutdownNow(); - } - - @Override - public void join() throws InterruptedException - { - executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS); - } - - @Override - public ThreadPoolBudget getThreadPoolBudget() - { - return new ThreadPoolBudget(this); - } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index 8ee437e91a5..b37202a8de9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -18,170 +18,385 @@ package org.eclipse.jetty.util.thread; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -import org.eclipse.jetty.util.component.AbstractLifeCycle; -import org.eclipse.jetty.util.component.LifeCycle; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; /** - * Jetty ThreadPool using java 5 ThreadPoolExecutor - * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and - * {@link LifeCycle} interfaces so that it may be used by the Jetty {@code org.eclipse.jetty.server.Server} - * - * @deprecated use {@link ExecutorSizedThreadPool} instead + * A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}. */ -@Deprecated -public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle +@ManagedObject("A thread pool") +public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool.SizedThreadPool, TryExecutor { - private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class); - private final ExecutorService _executor; + private final ThreadPoolExecutor _executor; + private final ThreadPoolBudget _budget; + private final ThreadGroup _group; + private String _name = "etp" + hashCode(); + private int _minThreads; + private int _reservedThreads = -1; + private TryExecutor _tryExecutor = TryExecutor.NO_TRY; + private int _priority = Thread.NORM_PRIORITY; + private boolean _daemon; + private boolean _detailedDump; - public ExecutorThreadPool(ExecutorService executor) - { - _executor = executor; - } - - /** - * Wraps an {@link ThreadPoolExecutor}. - * Max pool size is 256, pool thread timeout after 60 seconds and - * an unbounded {@link LinkedBlockingQueue} is used for the job queue; - */ public ExecutorThreadPool() { - // Using an unbounded queue makes the maxThreads parameter useless - // Refer to ThreadPoolExecutor javadocs for details - this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>())); + this(200, 8); + } + + public ExecutorThreadPool(int maxThreads) + { + this(maxThreads, Math.min(8, maxThreads)); + } + + public ExecutorThreadPool(int maxThreads, int minThreads) + { + this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()), minThreads, -1, null); + } + + public ExecutorThreadPool(ThreadPoolExecutor executor) + { + this(executor, -1); + } + + public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads) + { + this(executor, reservedThreads, null); + } + + public ExecutorThreadPool(ThreadPoolExecutor executor, int reservedThreads, ThreadGroup group) + { + this(executor, Math.min(Runtime.getRuntime().availableProcessors(), executor.getCorePoolSize()), reservedThreads, group); + } + + private ExecutorThreadPool(ThreadPoolExecutor executor, int minThreads, int reservedThreads, ThreadGroup group) + { + int maxThreads = executor.getMaximumPoolSize(); + if (maxThreads < minThreads) + { + executor.shutdownNow(); + throw new IllegalArgumentException("max threads (" + maxThreads + ") cannot be less than min threads (" + minThreads + ")"); + } + _executor = executor; + _executor.setThreadFactory(this::newThread); + _budget = new ThreadPoolBudget(this); + _group = group; + _minThreads = minThreads; + _reservedThreads = reservedThreads; } /** - * Wraps an {@link ThreadPoolExecutor}. - * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0. - * - * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a - * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. + * @return the name of the this thread pool */ - public ExecutorThreadPool(int queueSize) + @ManagedAttribute("name of this thread pool") + public String getName() { - this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) : - queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<>()) : - new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize))); + return _name; } /** - * Wraps an {@link ThreadPoolExecutor} using - * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue; - * - * @param corePoolSize must be equal to maximumPoolSize - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle, in milliseconds + * @param name the name of this thread pool, used to name threads */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) + public void setName(String name) { - this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS); - } - - /** - * Wraps an {@link ThreadPoolExecutor} using - * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue. - * - * @param corePoolSize must be equal to maximumPoolSize - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime - */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) - { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>()); - } - - /** - * Wraps an {@link ThreadPoolExecutor} - * - * @param corePoolSize the number of threads to keep in the pool, even if they are idle - * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime - * @param workQueue the queue to use for holding tasks before they are executed - */ - public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) - { - this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)); + if (isRunning()) + throw new IllegalStateException(getState()); + _name = name; } @Override - public void execute(Runnable job) + @ManagedAttribute("minimum number of threads in the pool") + public int getMinThreads() { - _executor.execute(job); - } - - public boolean dispatch(Runnable job) - { - try - { - _executor.execute(job); - return true; - } - catch (RejectedExecutionException e) - { - LOG.warn(e); - return false; - } + return _minThreads; } @Override - public int getIdleThreads() + public void setMinThreads(int threads) { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - return tpe.getPoolSize() - tpe.getActiveCount(); - } - return -1; + _minThreads = threads; } @Override + @ManagedAttribute("maximum number of threads in the pool") + public int getMaxThreads() + { + return _executor.getMaximumPoolSize(); + } + + @Override + public void setMaxThreads(int threads) + { + _executor.setCorePoolSize(threads); + _executor.setMaximumPoolSize(threads); + } + + /** + * @return the maximum thread idle time in ms. + * @see #setIdleTimeout(int) + */ + @ManagedAttribute("maximum time a thread may be idle in ms") + public int getIdleTimeout() + { + return (int)_executor.getKeepAliveTime(TimeUnit.MILLISECONDS); + } + + /** + *

Sets the maximum thread idle time in ms.

+ *

Threads that are idle for longer than this + * period may be stopped.

+ * + * @param idleTimeout the maximum thread idle time in ms. + * @see #getIdleTimeout() + */ + public void setIdleTimeout(int idleTimeout) + { + _executor.setKeepAliveTime(idleTimeout, TimeUnit.MILLISECONDS); + } + + /** + * @return number of reserved threads or -1 to indicate that the number is heuristically determined + * @see #setReservedThreads(int) + */ + @ManagedAttribute("the number of reserved threads in the pool") + public int getReservedThreads() + { + if (isStarted()) + return getBean(ReservedThreadExecutor.class).getCapacity(); + return _reservedThreads; + } + + /** + * Sets the number of reserved threads. + * + * @param reservedThreads number of reserved threads or -1 to determine the number heuristically + * @see #getReservedThreads() + */ + public void setReservedThreads(int reservedThreads) + { + if (isRunning()) + throw new IllegalStateException(getState()); + _reservedThreads = reservedThreads; + } + + public void setThreadsPriority(int priority) + { + _priority = priority; + } + + public int getThreadsPriority() + { + return _priority; + } + + /** + * @return whether this thread pool uses daemon threads + * @see #setDaemon(boolean) + */ + @ManagedAttribute("whether this thread pool uses daemon threads") + public boolean isDaemon() + { + return _daemon; + } + + /** + * @param daemon whether this thread pool uses daemon threads + * @see Thread#setDaemon(boolean) + */ + public void setDaemon(boolean daemon) + { + _daemon = daemon; + } + + @ManagedAttribute("reports additional details in the dump") + public boolean isDetailedDump() + { + return _detailedDump; + } + + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } + + @Override + @ManagedAttribute("number of threads in the pool") public int getThreads() { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - return tpe.getPoolSize(); - } - return -1; + return _executor.getPoolSize(); } @Override + @ManagedAttribute("number of idle threads in the pool") + public int getIdleThreads() + { + return _executor.getPoolSize() - _executor.getActiveCount(); + } + + @Override + public void execute(Runnable command) + { + _executor.execute(command); + } + + @Override + public boolean tryExecute(Runnable task) + { + TryExecutor tryExecutor = _tryExecutor; + return tryExecutor != null && tryExecutor.tryExecute(task); + } + + @Override + @ManagedAttribute(value = "thread pool is low on threads", readonly = true) public boolean isLowOnThreads() { - if (_executor instanceof ThreadPoolExecutor) - { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor)_executor; - // getActiveCount() locks the thread pool, so execute it last - return tpe.getPoolSize() == tpe.getMaximumPoolSize() && - tpe.getQueue().size() >= tpe.getPoolSize() - tpe.getActiveCount(); - } - return false; + return getThreads() == getMaxThreads() && _executor.getQueue().size() >= getIdleThreads(); } @Override - public void join() throws InterruptedException + protected void doStart() throws Exception { - _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + if (_executor.isShutdown()) + throw new IllegalStateException("This thread pool is not restartable"); + for (int i = 0; i < _minThreads; ++i) + _executor.prestartCoreThread(); + + _tryExecutor = new ReservedThreadExecutor(this, _reservedThreads); + addBean(_tryExecutor); + + super.doStart(); } @Override protected void doStop() throws Exception { super.doStop(); + removeBean(_tryExecutor); + _tryExecutor = TryExecutor.NO_TRY; _executor.shutdownNow(); + _budget.reset(); + } + + @Override + public void join() throws InterruptedException + { + _executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + public ThreadPoolBudget getThreadPoolBudget() + { + return _budget; + } + + protected Thread newThread(Runnable job) + { + Thread thread = new Thread(_group, job); + thread.setDaemon(isDaemon()); + thread.setPriority(getThreadsPriority()); + thread.setName(getName() + "-" + thread.getId()); + return thread; + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + String prefix = getName() + "-"; + List threads = Thread.getAllStackTraces().entrySet().stream() + .filter(entry -> entry.getKey().getName().startsWith(prefix)) + .map(entry -> + { + Thread thread = entry.getKey(); + StackTraceElement[] frames = entry.getValue(); + String knownMethod = null; + for (StackTraceElement frame : frames) + { + if ("getTask".equals(frame.getMethodName()) && frame.getClassName().endsWith("ThreadPoolExecutor")) + { + knownMethod = "IDLE "; + break; + } + else if ("reservedWait".equals(frame.getMethodName()) && frame.getClassName().endsWith("ReservedThread")) + { + knownMethod = "RESERVED "; + break; + } + else if ("select".equals(frame.getMethodName()) && frame.getClassName().endsWith("SelectorProducer")) + { + knownMethod = "SELECTING "; + break; + } + else if ("accept".equals(frame.getMethodName()) && frame.getClassName().contains("ServerConnector")) + { + knownMethod = "ACCEPTING "; + break; + } + } + String known = knownMethod == null ? "" : knownMethod; + return new Dumpable() + { + @Override + public void dump(Appendable out, String indent) throws IOException + { + out.append(String.valueOf(thread.getId())) + .append(" ") + .append(thread.getName()) + .append(" p=").append(String.valueOf(thread.getPriority())) + .append(" ") + .append(known) + .append(thread.getState().toString()); + if (isDetailedDump()) + { + out.append(System.lineSeparator()); + if (known.isEmpty()) + ContainerLifeCycle.dump(out, indent, Arrays.asList(frames)); + } + else + { + out.append(" @ ").append(frames.length > 0 ? String.valueOf(frames[0]) : "") + .append(System.lineSeparator()); + } + } + + @Override + public String dump() + { + return null; + } + }; + }) + .collect(Collectors.toList()); + + List jobs = Collections.emptyList(); + if (isDetailedDump()) + jobs = new ArrayList<>(_executor.getQueue()); + dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs - size=" + jobs.size(), jobs))); + } + + @Override + public String toString() + { + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}", + getClass().getSimpleName(), + getName(), + hashCode(), + getState(), + getMinThreads(), + getThreads(), + getMaxThreads(), + getIdleThreads(), + _executor.getQueue().size()); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index fd254cade8f..e8f99703411 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -623,21 +623,22 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (isDetailedDump()) jobs = new ArrayList<>(getQueue()); - dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs", jobs))); + dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs - size=" + jobs.size(), jobs))); } @Override public String toString() { - return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d,r=%s}", + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}", + getClass().getSimpleName(), _name, + hashCode(), getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), - _jobs.size(), - _tryExecutor); + _jobs.size()); } private Runnable idleJobPoll() throws InterruptedException diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java new file mode 100644 index 00000000000..c5c70ace7b8 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java @@ -0,0 +1,136 @@ +// +// ======================================================================== +// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.jmh; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadPool; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +@State(Scope.Benchmark) +@Threads(4) +@Warmup(iterations = 7, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 7, time = 500, timeUnit = TimeUnit.MILLISECONDS) +public class ThreadPoolBenchmark +{ + public enum Type + { + QTP, ETP; + } + + @Param({ "QTP", "ETP"}) + Type type; + + @Param({ "50"}) + int tasks; + + @Param({ "200", "2000"}) + int size; + + ThreadPool pool; + + @Setup // (Level.Iteration) + public void buildPool() + { + switch(type) + { + case QTP: + pool = new QueuedThreadPool(size); + break; + + case ETP: + pool = new ExecutorThreadPool(size); + break; + } + LifeCycle.start(pool); + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + public void testPool() + { + doWork().join(); + } + + @TearDown // (Level.Iteration) + public void shutdownPool() + { + LifeCycle.stop(pool); + pool = null; + } + + CompletableFuture doWork() + { + List> futures = new ArrayList<>(); + for (int i = 0; i < tasks; i++) + { + final CompletableFuture f = new CompletableFuture(); + futures.add(f); + pool.execute(() -> { + Blackhole.consumeCPU(64); + f.complete(null); + }); + } + + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ThreadPoolBenchmark.class.getSimpleName()) + .warmupIterations(2) + .measurementIterations(3) + .forks(1) + .threads(400) + // .syncIterations(true) // Don't start all threads at same time + .warmupTime(new TimeValue(10000,TimeUnit.MILLISECONDS)) + .measurementTime(new TimeValue(10000,TimeUnit.MILLISECONDS)) + // .addProfiler(CompilerProfiler.class) + // .addProfiler(LinuxPerfProfiler.class) + // .addProfiler(LinuxPerfNormProfiler.class) + // .addProfiler(LinuxPerfAsmProfiler.class) + // .resultFormat(ResultFormatType.CSV) + .build(); + + new Runner(opt).run(); + } +} \ No newline at end of file diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java index fa3e4c78add..2d8211e664f 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java @@ -166,7 +166,8 @@ public class EWYKBenchmark return hash; } - public static void main(String[] args) throws RunnerException { + public static void main(String[] args) throws RunnerException + { Options opt = new OptionsBuilder() .include(EWYKBenchmark.class.getSimpleName()) .warmupIterations(2) diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index a2716160223..46530b44c2b 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -24,7 +24,7 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutorSizedThreadPool; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; @@ -57,7 +57,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram { this.id = id; this.bufferPool = bufferPool; - this.executor = new ExecutorSizedThreadPool(); + this.executor = new ExecutorThreadPool(); this.ioState.addListener(this); }