From 7d98cbb8703d23a30ce830f24f8cfd0fce9fc08a Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 28 Sep 2017 07:35:05 +1000 Subject: [PATCH] Issue #1851 Improve insufficient thread warnings/errors Squashed commit of the following: commit 1d9e8e4b4d53898cb6435f67529347bd2ba82cf0 Merge: 7280594 55b0f10 Author: Greg Wilkins Date: Thu Sep 28 07:20:37 2017 +1000 Merge branch 'jetty-9.4.x' into jetty-9.4.x-1851-ThreadBudget commit 7280594a0058538b603ad35625713a79830e9b93 Author: Greg Wilkins Date: Wed Sep 27 22:48:58 2017 +1000 fixed headers commit f962f18e5b098ae40846ee3832736ee4650aed84 Author: Greg Wilkins Date: Wed Sep 27 18:12:33 2017 +1000 Issue #1851 added reset commit a63894de284c8d8dc5ed031f1f6e0fccaf6c7715 Author: Greg Wilkins Date: Wed Sep 27 18:08:53 2017 +1000 Issue #1851 improved test commit 8bcc460dc63273165305a7adcb88e991e30de4b7 Author: Greg Wilkins Date: Wed Sep 27 18:03:47 2017 +1000 Issue #1851 Improve insufficient thread warnings/errors Refactor approach to use Leases, to handle multiple executors commit fe4be5f56594f342ab5c2e6c886397d9b4fe9c14 Merge: abc5eac a248d38 Author: Greg Wilkins Date: Wed Sep 27 15:37:56 2017 +1000 Merge branch 'jetty-9.4.x' into jetty-9.4.x-1851-ThreadBudget commit abc5eac2b73d306a91b28ef4db778455095a5bdb Author: Greg Wilkins Date: Wed Sep 27 12:20:03 2017 +1000 Issue #1851 Improve insufficient thread warnings/errors Created a ThreadBudget class that can be used to warn/error for registered and unregistered allocations of threads. The server on doStart does an unregistered check of all its components that implement the Allocation interface. The client will register itself as an Allocation if a shared Executor is used. --- .../jetty/embedded/ManyConnectors.java | 9 +- .../org/eclipse/jetty/client/HttpClient.java | 1 + .../org/eclipse/jetty/io/SelectorManager.java | 5 +- .../jetty/server/AbstractConnector.java | 7 + .../java/org/eclipse/jetty/server/Server.java | 34 +--- .../eclipse/jetty/server/ServerConnector.java | 6 + .../InsufficientThreadsDetectionTest.java | 70 ++++--- .../jetty/util/component/Container.java | 19 ++ .../util/component/ContainerLifeCycle.java | 50 +++++ .../jetty/util/thread/QueuedThreadPool.java | 24 ++- .../util/thread/ReservedThreadExecutor.java | 59 ++++-- .../jetty/util/thread/ThreadBudget.java | 178 ++++++++++++++++++ .../eclipse/jetty/util/thread/ThreadPool.java | 13 +- .../component/ContainerLifeCycleTest.java | 32 ++++ .../jetty/http/client/HttpClientTest.java | 1 + 15 files changed, 430 insertions(+), 78 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java index 49016135da9..798434a74a7 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; /** * A Jetty server with multiple connectors. @@ -46,12 +47,14 @@ public class ManyConnectors // probably be a direct path to your own keystore. String jettyDistKeystore = "../../jetty-distribution/target/distribution/demo-base/etc/keystore"; - String keystorePath = System.getProperty( - "example.keystore", jettyDistKeystore); + String keystorePath = System.getProperty("example.keystore", jettyDistKeystore); File keystoreFile = new File(keystorePath); if (!keystoreFile.exists()) { - throw new FileNotFoundException(keystoreFile.getAbsolutePath()); + keystorePath = "jetty-distribution/target/distribution/demo-base/etc/keystore"; + keystoreFile = new File(keystorePath); + if (!keystoreFile.exists()) + throw new FileNotFoundException(keystoreFile.getAbsolutePath()); } // Create a basic jetty server object without declaring the port. Since diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 023da71bfd9..3deae692ce1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -73,6 +73,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 32337188ca8..467920df9bf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; @@ -295,13 +296,15 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump @Override protected void doStart() throws Exception { - addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads),true); + addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true); + for (int i = 0; i < _selectors.length; i++) { ManagedSelector selector = newSelector(i); _selectors[i] = selector; addBean(selector); } + super.doStart(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index fe0e960b43f..e6c53e2d40a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -53,6 +53,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; +import org.eclipse.jetty.util.thread.ThreadPool; /** *

An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism @@ -158,6 +160,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; + private ThreadBudget.Lease lease; /** @@ -273,6 +276,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this); } + lease = ThreadBudget.leaseFrom(getExecutor(),this,_acceptors.length); super.doStart(); _stopping=new CountDownLatch(_acceptors.length); @@ -308,6 +312,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override protected void doStop() throws Exception { + if (lease!=null) + lease.close(); + // Tell the acceptors we are stopping interruptAcceptors(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 9e39f3d22a4..f476b58b9fd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -24,12 +24,12 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Enumeration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -66,6 +66,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ShutdownThread; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @@ -379,37 +380,6 @@ public class Server extends HandlerWrapper implements Attributes HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); - // Check that the thread pool size is enough. - SizedThreadPool pool = getBean(SizedThreadPool.class); - int max=pool==null?-1:pool.getMaxThreads(); - int selectors=0; - int acceptors=0; - - for (Connector connector : _connectors) - { - if (connector instanceof AbstractConnector) - { - AbstractConnector abstractConnector = (AbstractConnector)connector; - Executor connectorExecutor = connector.getExecutor(); - - if (connectorExecutor != pool) - { - // Do not count the selectors and acceptors from this connector at - // the server level, because the connector uses a dedicated executor. - continue; - } - - acceptors += abstractConnector.getAcceptors(); - - if (connector instanceof ServerConnector) - selectors += ((ServerConnector)connector).getSelectorManager().getSelectorCount(); - } - } - - int needed=1+selectors+acceptors; - if (max>0 && needed>max) - throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors)); - MultiException mex=new MultiException(); try { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 1a07288aa2c..72b1c26175e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -555,5 +555,11 @@ public class ServerConnector extends AbstractNetworkConnector onEndPointClosed(endpoint); super.endPointClosed(endpoint); } + + @Override + public String toString() + { + return String.format("SelectorManager@%s",ServerConnector.this); + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java index 7b368fc0b8d..c4e53c9bdf3 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java @@ -19,9 +19,12 @@ package org.eclipse.jetty.server; import org.eclipse.jetty.toolchain.test.AdvancedRunner; +import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,21 +40,29 @@ public class InsufficientThreadsDetectionTest _server.stop(); } - @Test(expected = IllegalStateException.class) + @Test() public void testConnectorUsesServerExecutorWithNotEnoughThreads() throws Exception { - // server has 3 threads in the executor - _server = new Server(new QueuedThreadPool(3)); + try + { + // server has 3 threads in the executor + _server = new Server(new QueuedThreadPool(3)); - // connector will use executor from server because connectorPool is null - ThreadPool connectorPool = null; - // connector requires 7 threads(2 + 4 + 1) - ServerConnector connector = new ServerConnector(_server, connectorPool, null, null, 2, 4, new HttpConnectionFactory()); - connector.setPort(0); - _server.addConnector(connector); + // connector will use executor from server because connectorPool is null + ThreadPool connectorPool = null; + // connector requires 7 threads(2 + 4 + 1) + ServerConnector connector = new ServerConnector(_server, connectorPool, null, null, 2, 4, new HttpConnectionFactory()); + connector.setPort(0); + _server.addConnector(connector); - // should throw IllegalStateException because there are no required threads in server pool - _server.start(); + // should throw IllegalStateException because there are no required threads in server pool + _server.start(); + Assert.fail(); + } + catch(IllegalStateException e) + { + Log.getLogger(ThreadBudget.class).warn(e.toString()); + } } @Test @@ -71,20 +82,35 @@ public class InsufficientThreadsDetectionTest _server.start(); } - @Test // Github issue #586 - public void testCaseForMultipleConnectors() throws Exception { - // server has 4 threads in the executor - _server = new Server(new QueuedThreadPool(4)); + // Github issue #586 - // first connector consumes all 4 threads from server pool - _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); + @Test + public void testCaseForMultipleConnectors() throws Exception + { + try + { + // server has 4 threads in the executor + _server = new Server(new QueuedThreadPool(4)); - // second connect also require 4 threads but uses own executor, so its threads should not be counted - final QueuedThreadPool connectorPool = new QueuedThreadPool(4, 4); - _server.addConnector(new ServerConnector(_server, connectorPool, null, null, 1, 1, new HttpConnectionFactory())); + // first connector consumes 3 threads from server pool + _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); - // should not throw exception because limit was not overflown - _server.start(); + // second connect also require 4 threads but uses own executor, so its threads should not be counted + final QueuedThreadPool connectorPool = new QueuedThreadPool(4, 4); + _server.addConnector(new ServerConnector(_server, connectorPool, null, null, 1, 1, new HttpConnectionFactory())); + + // first connector consumes 3 threads from server pool + _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); + + // should not throw exception because limit was not overflown + _server.start(); + + Assert.fail(); + } + catch(IllegalStateException e) + { + Log.getLogger(ThreadBudget.class).warn(e.toString()); + } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java index 6a582351cb1..1776d743188 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java @@ -18,7 +18,10 @@ package org.eclipse.jetty.util.component; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; /** @@ -45,6 +48,7 @@ public interface Container * @return the list of beans of the given class (or subclass) * @param the Bean type * @see #getBeans() + * @see #getContainedBeans(Class) */ public Collection getBeans(Class clazz); @@ -93,6 +97,14 @@ public interface Container */ void manage(Object bean); + + /** + * Test if this container manages a bean + * @param bean the bean to test + * @return whether this aggregate contains and manages the bean + */ + boolean isManaged(Object bean); + /** * Adds the given bean, explicitly managing it or not. * @@ -121,4 +133,11 @@ public interface Container public interface InheritedListener extends Listener { } + + /** + * @param clazz the class of the beans + * @return the list of beans of the given class from the entire managed hierarchy + * @param the Bean type + */ + public Collection getContainedBeans(Class clazz); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java index 533c1949537..5c9d177daf1 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -198,6 +200,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, * @param bean the bean to test * @return whether this aggregate contains and manages the bean */ + @Override public boolean isManaged(Object bean) { for (Bean b : _beans) @@ -761,6 +764,19 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, return _managed==Managed.MANAGED; } + public boolean isManageable() + { + switch(_managed) + { + case MANAGED: + return true; + case AUTO: + return _bean instanceof LifeCycle && ((LifeCycle)_bean).isStopped(); + default: + return false; + } + } + @Override public String toString() { @@ -822,4 +838,38 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, } } } + + + /** + * @param clazz the class of the beans + * @return the list of beans of the given class from the entire managed hierarchy + * @param the Bean type + */ + public Collection getContainedBeans(Class clazz) + { + Set beans = new HashSet<>(); + getContainedBeans(clazz, beans); + return beans; + } + + /** + * @param clazz the class of the beans + * @param the Bean type + * @param beans the collection to add beans of the given class from the entire managed hierarchy + */ + protected void getContainedBeans(Class clazz, Collection beans) + { + beans.addAll(getBeans(clazz)); + for (Container c : getBeans(Container.class)) + { + Bean bean = getBean(c); + if (bean!=null && bean.isManageable()) + { + if (c instanceof ContainerLifeCycle) + ((ContainerLifeCycle)c).getContainedBeans(clazz, beans); + else + beans.addAll(c.getContainedBeans(clazz)); + } + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index efadbcf694f..2606e8c291b 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -16,7 +16,6 @@ // ======================================================================== // - package org.eclipse.jetty.util.thread; import java.io.IOException; @@ -66,6 +65,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private boolean _daemon = false; private boolean _detailedDump = false; private int _lowThreadsThreshold = 1; + private ThreadBudget _budget; public QueuedThreadPool() { @@ -106,6 +106,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } _jobs=queue; _threadGroup=threadGroup; + _budget=new ThreadBudget(this); + } + + @Override + public ThreadBudget getThreadBudget() + { + return _budget; + } + + public void setThreadBudget(ThreadBudget budget) + { + if (budget!=null && budget.getSizedThreadPool()!=this) + throw new IllegalArgumentException(); + synchronized (this) + { + _budget = budget; + } } @Override @@ -184,6 +201,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } } + if (_budget!=null) + _budget.reset(); + synchronized (_joinLock) { _joinLock.notifyAll(); @@ -563,7 +583,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public String toString() { - return String.format("org.eclipse.jetty.util.thread.QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); + return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); } private Runnable idleJobPoll() throws InterruptedException diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 6dd85adcd25..67f880e02a6 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -45,6 +45,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo private int _head; private int _size; private int _pending; + private ThreadBudget.Lease _lease; + private Object _owner; public ReservedThreadExecutor(Executor executor) { @@ -58,24 +60,42 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo * thread pool size. */ public ReservedThreadExecutor(Executor executor,int capacity) + { + this(executor,capacity,null); + } + + /** + * @param executor The executor to use to obtain threads + * @param capacity The number of threads to preallocate. If less than 0 then capacity + * is calculated based on a heuristic from the number of available processors and + * thread pool size. + */ + public ReservedThreadExecutor(Executor executor,int capacity, Object owner) { _executor = executor; + _queue = new ReservedThread[reservedThreads(executor,capacity)]; + _owner = owner; + } - if (capacity < 0) + /** + * @param executor The executor to use to obtain threads + * @param capacity The number of threads to preallocate, If less than 0 then capacity + * is calculated based on a heuristic from the number of available processors and + * thread pool size. + * @return the number of reserved threads that would be used by a ReservedThreadExecutor + * constructed with these arguments. + */ + public static int reservedThreads(Executor executor,int capacity) + { + if (capacity>=0) + return capacity; + int cpus = Runtime.getRuntime().availableProcessors(); + if (executor instanceof ThreadPool.SizedThreadPool) { - int cpus = Runtime.getRuntime().availableProcessors(); - if (executor instanceof ThreadPool.SizedThreadPool) - { - int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); - capacity = Math.max(1, Math.min(cpus, threads / 8)); - } - else - { - capacity = cpus; - } + int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); + return Math.max(1, Math.min(cpus, threads / 8)); } - - _queue = new ReservedThread[capacity]; + return cpus; } public Executor getExecutor() @@ -107,9 +127,18 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo } } + @Override + public void doStart() throws Exception + { + _lease = ThreadBudget.leaseFrom(getExecutor(),this,_queue.length); + super.doStart(); + } + @Override public void doStop() throws Exception { + if (_lease!=null) + _lease.close(); try (Locker.Lock lock = _locker.lock()) { while (_size>0) @@ -179,7 +208,9 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { try (Locker.Lock lock = _locker.lock()) { - return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending); + if (_owner==null) + return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size,_pending); + return String.format("%s@%s{s=%d,p=%d}",this.getClass().getSimpleName(),_owner,_size,_pending); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java new file mode 100644 index 00000000000..1d9d0033007 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java @@ -0,0 +1,178 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A budget of required thread usage, used to warn or error for insufficient configured threads.

+ * + * @see ThreadPool.SizedThreadPool#getThreadBudget() + */ +public class ThreadBudget +{ + static final Logger LOG = Log.getLogger(ThreadBudget.class); + + public interface Lease extends Closeable + { + int getThreads(); + } + + /** + * An allocation of threads + */ + public class Leased implements Lease + { + final Object leasee; + final int threads; + + private Leased(Object leasee,int threads) + { + this.leasee = leasee; + this.threads = threads; + } + + @Override + public int getThreads() + { + return threads; + } + + @Override + public void close() + { + info.remove(this); + allocations.remove(this); + warned.set(false); + } + } + + private static final Lease NOOP_LEASE = new Lease() + { + @Override + public void close() throws IOException + { + } + + @Override + public int getThreads() + { + return 0; + } + }; + + final ThreadPool.SizedThreadPool pool; + final Set allocations = new CopyOnWriteArraySet<>(); + final Set info = new CopyOnWriteArraySet<>(); + final AtomicBoolean warned = new AtomicBoolean(); + final int warnAt; + + /** + * Construct a bedget for a SizedThreadPool, with the warning level set by heuristic. + * @param pool The pool to budget thread allocation for. + */ + public ThreadBudget(ThreadPool.SizedThreadPool pool) + { + this(pool,Runtime.getRuntime().availableProcessors()); + } + + /** + * @param pool The pool to budget thread allocation for. + * @param warnAt The level of free threads at which a warning is generated. + */ + public ThreadBudget(ThreadPool.SizedThreadPool pool, int warnAt) + { + this.pool = pool; + this.warnAt = warnAt; + } + + public ThreadPool.SizedThreadPool getSizedThreadPool() + { + return pool; + } + + public void reset() + { + allocations.clear(); + info.clear(); + warned.set(false); + } + + public Lease leaseTo(Object leasee, int threads) + { + Leased lease = new Leased(leasee,threads); + allocations.add(lease); + check(); + return lease; + } + + /** + * Check registered allocations against the budget. + * @throws IllegalStateException if insufficient threads are configured. + */ + public void check() throws IllegalStateException + { + int required = allocations.stream() + .mapToInt(Lease::getThreads) + .sum(); + + int maximum = pool.getMaxThreads(); + + if (required>=maximum) + { + infoOnLeases(); + throw new IllegalStateException(String.format("Insuffient configured threads: required=%d < max=%d for %s", required, maximum, pool)); + } + + if ((maximum-required) < warnAt) + { + infoOnLeases(); + if (warned.compareAndSet(false,true)) + LOG.warn("Low configured threads: ( max={} - required={} ) < warnAt={} for {}", maximum, required, warnAt, pool); + } + } + + private void infoOnLeases() + { + allocations.stream().filter(lease->!info.contains(lease)) + .forEach(lease->{ + info.add(lease); + LOG.info("{} requires {} threads from {}",lease.leasee,lease.getThreads(),pool); + }); + } + + public static Lease leaseFrom(Executor executor, Object leasee, int threads) + { + if (executor instanceof ThreadPool.SizedThreadPool) + { + ThreadBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadBudget(); + if (budget!=null) + return budget.leaseTo(leasee,threads); + } + return NOOP_LEASE; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 56055793106..2bfafe13a80 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -67,9 +67,14 @@ public interface ThreadPool extends Executor /* ------------------------------------------------------------ */ public interface SizedThreadPool extends ThreadPool { - public int getMinThreads(); - public int getMaxThreads(); - public void setMinThreads(int threads); - public void setMaxThreads(int threads); + int getMinThreads(); + int getMaxThreads(); + void setMinThreads(int threads); + void setMaxThreads(int threads); + + default ThreadBudget getThreadBudget() + { + return null; + } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java index cdc610a7e2e..d16df62e7db 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java @@ -26,9 +26,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.TypeUtil; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + public class ContainerLifeCycleTest { @Test @@ -608,4 +612,32 @@ public class ContainerLifeCycleTest super.destroy(); } } + + + @Test + public void testGetBeans() throws Exception + { + TestContainerLifeCycle root = new TestContainerLifeCycle(); + TestContainerLifeCycle left = new TestContainerLifeCycle(); + root.addBean(left); + TestContainerLifeCycle right = new TestContainerLifeCycle(); + root.addBean(right); + TestContainerLifeCycle leaf = new TestContainerLifeCycle(); + right.addBean(leaf); + + root.addBean(Integer.valueOf(0)); + root.addBean(Integer.valueOf(1)); + left.addBean(Integer.valueOf(2)); + right.addBean(Integer.valueOf(3)); + leaf.addBean(Integer.valueOf(4)); + leaf.addBean("leaf"); + + assertThat(root.getBeans(Container.class), containsInAnyOrder(left,right)); + assertThat(root.getBeans(Integer.class), containsInAnyOrder(Integer.valueOf(0),Integer.valueOf(1))); + assertThat(root.getBeans(String.class), containsInAnyOrder()); + + assertThat(root.getContainedBeans(Container.class), containsInAnyOrder(left,right,leaf)); + assertThat(root.getContainedBeans(Integer.class), containsInAnyOrder(Integer.valueOf(0),Integer.valueOf(1),Integer.valueOf(2),Integer.valueOf(3),Integer.valueOf(4))); + assertThat(root.getContainedBeans(String.class), containsInAnyOrder("leaf")); + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index 9859ec878fc..b7a020d59e2 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -37,6 +37,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.BytesContentProvider;