diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java new file mode 100644 index 00000000000..d525e9dc5c7 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorSizedThreadPool.java @@ -0,0 +1,109 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; + +/** + * A {@link org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool} wrapper around {@link ThreadPoolExecutor}. + */ +public class ExecutorSizedThreadPool extends AbstractLifeCycle implements ThreadPool.SizedThreadPool +{ + private final ThreadPoolExecutor executor; + + public ExecutorSizedThreadPool() + { + this(new ThreadPoolExecutor(8, 200, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>())); + } + + public ExecutorSizedThreadPool(ThreadPoolExecutor executor) + { + this.executor = executor; + } + + @Override + public int getMinThreads() + { + return executor.getCorePoolSize(); + } + + @Override + public int getMaxThreads() + { + return executor.getMaximumPoolSize(); + } + + @Override + public void setMinThreads(int threads) + { + executor.setCorePoolSize(threads); + } + + @Override + public void setMaxThreads(int threads) + { + executor.setMaximumPoolSize(threads); + } + + @Override + public int getThreads() + { + return executor.getPoolSize(); + } + + @Override + public int getIdleThreads() + { + return getThreads() - executor.getActiveCount(); + } + + @Override + public void execute(Runnable command) + { + executor.execute(command); + } + + @Override + public boolean isLowOnThreads() + { + return getThreads() == getMaxThreads() && executor.getQueue().size() >= getIdleThreads(); + } + + @Override + protected void doStop() throws Exception + { + executor.shutdownNow(); + } + + @Override + public void join() throws InterruptedException + { + executor.awaitTermination(getStopTimeout(), TimeUnit.MILLISECONDS); + } + + @Override + public ThreadPoolBudget getThreadPoolBudget() + { + return new ThreadPoolBudget(this); + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index 23033cafaeb..ba1a8c1e738 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -32,24 +32,24 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -/* ------------------------------------------------------------ */ /** * Jetty ThreadPool using java 5 ThreadPoolExecutor * This class wraps a {@link ExecutorService} as a {@link ThreadPool} and - * {@link LifeCycle} interfaces so that it may be used by the Jetty org.eclipse.jetty.server.Server + * {@link LifeCycle} interfaces so that it may be used by the Jetty {@code org.eclipse.jetty.server.Server} + * + * @deprecated use {@link ExecutorSizedThreadPool} instead */ +@Deprecated public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, LifeCycle { private static final Logger LOG = Log.getLogger(ExecutorThreadPool.class); private final ExecutorService _executor; - /* ------------------------------------------------------------ */ public ExecutorThreadPool(ExecutorService executor) { _executor = executor; } - /* ------------------------------------------------------------ */ /** * Wraps an {@link ThreadPoolExecutor}. * Max pool size is 256, pool thread timeout after 60 seconds and @@ -59,74 +59,70 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, { // Using an unbounded queue makes the maxThreads parameter useless // Refer to ThreadPoolExecutor javadocs for details - this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue())); + this(new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>())); } - /* ------------------------------------------------------------ */ /** * Wraps an {@link ThreadPoolExecutor}. * Max pool size is 256, pool thread timeout after 60 seconds, and core pool size is 32 when queueSize >= 0. + * * @param queueSize can be -1 for using an unbounded {@link LinkedBlockingQueue}, 0 for using a - * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. + * {@link SynchronousQueue}, greater than 0 for using a {@link ArrayBlockingQueue} of the given size. */ public ExecutorThreadPool(int queueSize) { - this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()) : - queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue()) : - new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize))); + this(queueSize < 0 ? new ThreadPoolExecutor(256, 256, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) : + queueSize == 0 ? new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new SynchronousQueue<>()) : + new ThreadPoolExecutor(32, 256, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize))); } - /* ------------------------------------------------------------ */ /** * Wraps an {@link ThreadPoolExecutor} using * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue; - * @param corePoolSize must be equal to maximumPoolSize + * + * @param corePoolSize must be equal to maximumPoolSize * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle, in milliseconds + * @param keepAliveTime the max time a thread can remain idle, in milliseconds */ public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) { this(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS); } - /* ------------------------------------------------------------ */ /** * Wraps an {@link ThreadPoolExecutor} using * an unbounded {@link LinkedBlockingQueue} is used for the jobs queue. - * @param corePoolSize must be equal to maximumPoolSize + * + * @param corePoolSize must be equal to maximumPoolSize * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime + * @param keepAliveTime the max time a thread can remain idle + * @param unit the unit for the keepAliveTime */ public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue()); + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<>()); } - /* ------------------------------------------------------------ */ - /** * Wraps an {@link ThreadPoolExecutor} - * @param corePoolSize the number of threads to keep in the pool, even if they are idle + * + * @param corePoolSize the number of threads to keep in the pool, even if they are idle * @param maximumPoolSize the maximum number of threads to allow in the pool - * @param keepAliveTime the max time a thread can remain idle - * @param unit the unit for the keepAliveTime - * @param workQueue the queue to use for holding tasks before they are executed + * @param keepAliveTime the max time a thread can remain idle + * @param unit the unit for the keepAliveTime + * @param workQueue the queue to use for holding tasks before they are executed */ public ExecutorThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)); } - - /* ------------------------------------------------------------ */ @Override public void execute(Runnable job) { _executor.execute(job); } - /* ------------------------------------------------------------ */ public boolean dispatch(Runnable job) { try @@ -134,14 +130,13 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, _executor.execute(job); return true; } - catch(RejectedExecutionException e) + catch (RejectedExecutionException e) { LOG.warn(e); return false; } } - /* ------------------------------------------------------------ */ public int getIdleThreads() { if (_executor instanceof ThreadPoolExecutor) @@ -152,7 +147,6 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, return -1; } - /* ------------------------------------------------------------ */ public int getThreads() { if (_executor instanceof ThreadPoolExecutor) @@ -163,7 +157,6 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, return -1; } - /* ------------------------------------------------------------ */ public boolean isLowOnThreads() { if (_executor instanceof ThreadPoolExecutor) @@ -176,13 +169,11 @@ public class ExecutorThreadPool extends AbstractLifeCycle implements ThreadPool, return false; } - /* ------------------------------------------------------------ */ public void join() throws InterruptedException { _executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } - /* ------------------------------------------------------------ */ @Override protected void doStop() throws Exception { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index ecc61bd87c3..88e21ed1610 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -24,53 +24,69 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.LifeCycle; -/* ------------------------------------------------------------ */ -/** ThreadPool. - * - * A specialization of Executor interface that provides reporting methods (eg {@link #getThreads()}) - * and the option of configuration methods (e.g. @link {@link SizedThreadPool#setMaxThreads(int)}). - * +/** + *

A pool for threads.

+ *

A specialization of Executor interface that provides reporting methods (eg {@link #getThreads()}) + * and the option of configuration methods (e.g. @link {@link SizedThreadPool#setMaxThreads(int)}).

*/ @ManagedObject("Pool of Threads") public interface ThreadPool extends Executor { - /* ------------------------------------------------------------ */ /** * Blocks until the thread pool is {@link LifeCycle#stop stopped}. + * * @throws InterruptedException if thread was interrupted */ public void join() throws InterruptedException; - /* ------------------------------------------------------------ */ /** * @return The total number of threads currently in the pool */ @ManagedAttribute("number of threads in pool") public int getThreads(); - /* ------------------------------------------------------------ */ /** * @return The number of idle threads in the pool */ @ManagedAttribute("number of idle threads in pool") public int getIdleThreads(); - - /* ------------------------------------------------------------ */ + /** * @return True if the pool is low on threads */ @ManagedAttribute("indicates the pool is low on available threads") public boolean isLowOnThreads(); - - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ + /** + *

Specialized sub-interface of ThreadPool that allows to get/set + * the minimum and maximum number of threads of the pool.

+ */ public interface SizedThreadPool extends ThreadPool { + /** + * @return the minimum number of threads + */ int getMinThreads(); + + /** + * @return the maximum number of threads + */ int getMaxThreads(); + + /** + * @param threads the minimum number of threads + */ void setMinThreads(int threads); + + /** + * @param threads the maximum number of threads + */ void setMaxThreads(int threads); + + /** + * @return a ThreadPoolBudget for this sized thread pool, + * or null of no ThreadPoolBudget can be returned + */ default ThreadPoolBudget getThreadPoolBudget() { return null; diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/LocalWebSocketConnection.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/LocalWebSocketConnection.java index 0f23afaf41a..854eaa105a2 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/LocalWebSocketConnection.java @@ -25,7 +25,7 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutorThreadPool; +import org.eclipse.jetty.util.thread.ExecutorSizedThreadPool; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.SuspendToken; @@ -57,7 +57,7 @@ public class LocalWebSocketConnection implements LogicalConnection { this.id = id; this.bufferPool = bufferPool; - this.executor = new ExecutorThreadPool(); + this.executor = new ExecutorSizedThreadPool(); this.policy = WebSocketPolicy.newServerPolicy(); }