diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java index 49016135da9..798434a74a7 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/ManyConnectors.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; /** * A Jetty server with multiple connectors. @@ -46,12 +47,14 @@ public class ManyConnectors // probably be a direct path to your own keystore. String jettyDistKeystore = "../../jetty-distribution/target/distribution/demo-base/etc/keystore"; - String keystorePath = System.getProperty( - "example.keystore", jettyDistKeystore); + String keystorePath = System.getProperty("example.keystore", jettyDistKeystore); File keystoreFile = new File(keystorePath); if (!keystoreFile.exists()) { - throw new FileNotFoundException(keystoreFile.getAbsolutePath()); + keystorePath = "jetty-distribution/target/distribution/demo-base/etc/keystore"; + keystoreFile = new File(keystorePath); + if (!keystoreFile.exists()) + throw new FileNotFoundException(keystoreFile.getAbsolutePath()); } // Create a basic jetty server object without declaring the port. Since diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 023da71bfd9..3deae692ce1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -73,6 +73,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; /** diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 32337188ca8..467920df9bf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; @@ -295,13 +296,15 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump @Override protected void doStart() throws Exception { - addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads),true); + addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true); + for (int i = 0; i < _selectors.length; i++) { ManagedSelector selector = newSelector(i); _selectors[i] = selector; addBean(selector); } + super.doStart(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index fe0e960b43f..e6c53e2d40a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -53,6 +53,8 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadBudget; +import org.eclipse.jetty.util.thread.ThreadPool; /** *

An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism @@ -158,6 +160,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; + private ThreadBudget.Lease lease; /** @@ -273,6 +276,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this); } + lease = ThreadBudget.leaseFrom(getExecutor(),this,_acceptors.length); super.doStart(); _stopping=new CountDownLatch(_acceptors.length); @@ -308,6 +312,9 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override protected void doStop() throws Exception { + if (lease!=null) + lease.close(); + // Tell the acceptors we are stopping interruptAcceptors(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 9e39f3d22a4..f476b58b9fd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -24,12 +24,12 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Enumeration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -66,6 +66,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ShutdownThread; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @@ -379,37 +380,6 @@ public class Server extends HandlerWrapper implements Attributes HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); - // Check that the thread pool size is enough. - SizedThreadPool pool = getBean(SizedThreadPool.class); - int max=pool==null?-1:pool.getMaxThreads(); - int selectors=0; - int acceptors=0; - - for (Connector connector : _connectors) - { - if (connector instanceof AbstractConnector) - { - AbstractConnector abstractConnector = (AbstractConnector)connector; - Executor connectorExecutor = connector.getExecutor(); - - if (connectorExecutor != pool) - { - // Do not count the selectors and acceptors from this connector at - // the server level, because the connector uses a dedicated executor. - continue; - } - - acceptors += abstractConnector.getAcceptors(); - - if (connector instanceof ServerConnector) - selectors += ((ServerConnector)connector).getSelectorManager().getSelectorCount(); - } - } - - int needed=1+selectors+acceptors; - if (max>0 && needed>max) - throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors)); - MultiException mex=new MultiException(); try { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 1a07288aa2c..72b1c26175e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -555,5 +555,11 @@ public class ServerConnector extends AbstractNetworkConnector onEndPointClosed(endpoint); super.endPointClosed(endpoint); } + + @Override + public String toString() + { + return String.format("SelectorManager@%s",ServerConnector.this); + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java index 7b368fc0b8d..c4e53c9bdf3 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java @@ -19,9 +19,12 @@ package org.eclipse.jetty.server; import org.eclipse.jetty.toolchain.test.AdvancedRunner; +import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,21 +40,29 @@ public class InsufficientThreadsDetectionTest _server.stop(); } - @Test(expected = IllegalStateException.class) + @Test() public void testConnectorUsesServerExecutorWithNotEnoughThreads() throws Exception { - // server has 3 threads in the executor - _server = new Server(new QueuedThreadPool(3)); + try + { + // server has 3 threads in the executor + _server = new Server(new QueuedThreadPool(3)); - // connector will use executor from server because connectorPool is null - ThreadPool connectorPool = null; - // connector requires 7 threads(2 + 4 + 1) - ServerConnector connector = new ServerConnector(_server, connectorPool, null, null, 2, 4, new HttpConnectionFactory()); - connector.setPort(0); - _server.addConnector(connector); + // connector will use executor from server because connectorPool is null + ThreadPool connectorPool = null; + // connector requires 7 threads(2 + 4 + 1) + ServerConnector connector = new ServerConnector(_server, connectorPool, null, null, 2, 4, new HttpConnectionFactory()); + connector.setPort(0); + _server.addConnector(connector); - // should throw IllegalStateException because there are no required threads in server pool - _server.start(); + // should throw IllegalStateException because there are no required threads in server pool + _server.start(); + Assert.fail(); + } + catch(IllegalStateException e) + { + Log.getLogger(ThreadBudget.class).warn(e.toString()); + } } @Test @@ -71,20 +82,35 @@ public class InsufficientThreadsDetectionTest _server.start(); } - @Test // Github issue #586 - public void testCaseForMultipleConnectors() throws Exception { - // server has 4 threads in the executor - _server = new Server(new QueuedThreadPool(4)); + // Github issue #586 - // first connector consumes all 4 threads from server pool - _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); + @Test + public void testCaseForMultipleConnectors() throws Exception + { + try + { + // server has 4 threads in the executor + _server = new Server(new QueuedThreadPool(4)); - // second connect also require 4 threads but uses own executor, so its threads should not be counted - final QueuedThreadPool connectorPool = new QueuedThreadPool(4, 4); - _server.addConnector(new ServerConnector(_server, connectorPool, null, null, 1, 1, new HttpConnectionFactory())); + // first connector consumes 3 threads from server pool + _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); - // should not throw exception because limit was not overflown - _server.start(); + // second connect also require 4 threads but uses own executor, so its threads should not be counted + final QueuedThreadPool connectorPool = new QueuedThreadPool(4, 4); + _server.addConnector(new ServerConnector(_server, connectorPool, null, null, 1, 1, new HttpConnectionFactory())); + + // first connector consumes 3 threads from server pool + _server.addConnector(new ServerConnector(_server, null, null, null, 1, 1, new HttpConnectionFactory())); + + // should not throw exception because limit was not overflown + _server.start(); + + Assert.fail(); + } + catch(IllegalStateException e) + { + Log.getLogger(ThreadBudget.class).warn(e.toString()); + } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java index 6a582351cb1..1776d743188 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java @@ -18,7 +18,10 @@ package org.eclipse.jetty.util.component; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; /** @@ -45,6 +48,7 @@ public interface Container * @return the list of beans of the given class (or subclass) * @param the Bean type * @see #getBeans() + * @see #getContainedBeans(Class) */ public Collection getBeans(Class clazz); @@ -93,6 +97,14 @@ public interface Container */ void manage(Object bean); + + /** + * Test if this container manages a bean + * @param bean the bean to test + * @return whether this aggregate contains and manages the bean + */ + boolean isManaged(Object bean); + /** * Adds the given bean, explicitly managing it or not. * @@ -121,4 +133,11 @@ public interface Container public interface InheritedListener extends Listener { } + + /** + * @param clazz the class of the beans + * @return the list of beans of the given class from the entire managed hierarchy + * @param the Bean type + */ + public Collection getContainedBeans(Class clazz); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java index 533c1949537..5c9d177daf1 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -198,6 +200,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, * @param bean the bean to test * @return whether this aggregate contains and manages the bean */ + @Override public boolean isManaged(Object bean) { for (Bean b : _beans) @@ -761,6 +764,19 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, return _managed==Managed.MANAGED; } + public boolean isManageable() + { + switch(_managed) + { + case MANAGED: + return true; + case AUTO: + return _bean instanceof LifeCycle && ((LifeCycle)_bean).isStopped(); + default: + return false; + } + } + @Override public String toString() { @@ -822,4 +838,38 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, } } } + + + /** + * @param clazz the class of the beans + * @return the list of beans of the given class from the entire managed hierarchy + * @param the Bean type + */ + public Collection getContainedBeans(Class clazz) + { + Set beans = new HashSet<>(); + getContainedBeans(clazz, beans); + return beans; + } + + /** + * @param clazz the class of the beans + * @param the Bean type + * @param beans the collection to add beans of the given class from the entire managed hierarchy + */ + protected void getContainedBeans(Class clazz, Collection beans) + { + beans.addAll(getBeans(clazz)); + for (Container c : getBeans(Container.class)) + { + Bean bean = getBean(c); + if (bean!=null && bean.isManageable()) + { + if (c instanceof ContainerLifeCycle) + ((ContainerLifeCycle)c).getContainedBeans(clazz, beans); + else + beans.addAll(c.getContainedBeans(clazz)); + } + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index efadbcf694f..2606e8c291b 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -16,7 +16,6 @@ // ======================================================================== // - package org.eclipse.jetty.util.thread; import java.io.IOException; @@ -66,6 +65,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private boolean _daemon = false; private boolean _detailedDump = false; private int _lowThreadsThreshold = 1; + private ThreadBudget _budget; public QueuedThreadPool() { @@ -106,6 +106,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } _jobs=queue; _threadGroup=threadGroup; + _budget=new ThreadBudget(this); + } + + @Override + public ThreadBudget getThreadBudget() + { + return _budget; + } + + public void setThreadBudget(ThreadBudget budget) + { + if (budget!=null && budget.getSizedThreadPool()!=this) + throw new IllegalArgumentException(); + synchronized (this) + { + _budget = budget; + } } @Override @@ -184,6 +201,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } } + if (_budget!=null) + _budget.reset(); + synchronized (_joinLock) { _joinLock.notifyAll(); @@ -563,7 +583,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public String toString() { - return String.format("org.eclipse.jetty.util.thread.QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); + return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); } private Runnable idleJobPoll() throws InterruptedException diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 6dd85adcd25..67f880e02a6 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -45,6 +45,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo private int _head; private int _size; private int _pending; + private ThreadBudget.Lease _lease; + private Object _owner; public ReservedThreadExecutor(Executor executor) { @@ -58,24 +60,42 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo * thread pool size. */ public ReservedThreadExecutor(Executor executor,int capacity) + { + this(executor,capacity,null); + } + + /** + * @param executor The executor to use to obtain threads + * @param capacity The number of threads to preallocate. If less than 0 then capacity + * is calculated based on a heuristic from the number of available processors and + * thread pool size. + */ + public ReservedThreadExecutor(Executor executor,int capacity, Object owner) { _executor = executor; + _queue = new ReservedThread[reservedThreads(executor,capacity)]; + _owner = owner; + } - if (capacity < 0) + /** + * @param executor The executor to use to obtain threads + * @param capacity The number of threads to preallocate, If less than 0 then capacity + * is calculated based on a heuristic from the number of available processors and + * thread pool size. + * @return the number of reserved threads that would be used by a ReservedThreadExecutor + * constructed with these arguments. + */ + public static int reservedThreads(Executor executor,int capacity) + { + if (capacity>=0) + return capacity; + int cpus = Runtime.getRuntime().availableProcessors(); + if (executor instanceof ThreadPool.SizedThreadPool) { - int cpus = Runtime.getRuntime().availableProcessors(); - if (executor instanceof ThreadPool.SizedThreadPool) - { - int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); - capacity = Math.max(1, Math.min(cpus, threads / 8)); - } - else - { - capacity = cpus; - } + int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads(); + return Math.max(1, Math.min(cpus, threads / 8)); } - - _queue = new ReservedThread[capacity]; + return cpus; } public Executor getExecutor() @@ -107,9 +127,18 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo } } + @Override + public void doStart() throws Exception + { + _lease = ThreadBudget.leaseFrom(getExecutor(),this,_queue.length); + super.doStart(); + } + @Override public void doStop() throws Exception { + if (_lease!=null) + _lease.close(); try (Locker.Lock lock = _locker.lock()) { while (_size>0) @@ -179,7 +208,9 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { try (Locker.Lock lock = _locker.lock()) { - return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending); + if (_owner==null) + return String.format("%s@%x{s=%d,p=%d}",this.getClass().getSimpleName(),hashCode(),_size,_pending); + return String.format("%s@%s{s=%d,p=%d}",this.getClass().getSimpleName(),_owner,_size,_pending); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java new file mode 100644 index 00000000000..1d9d0033007 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java @@ -0,0 +1,178 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A budget of required thread usage, used to warn or error for insufficient configured threads.

+ * + * @see ThreadPool.SizedThreadPool#getThreadBudget() + */ +public class ThreadBudget +{ + static final Logger LOG = Log.getLogger(ThreadBudget.class); + + public interface Lease extends Closeable + { + int getThreads(); + } + + /** + * An allocation of threads + */ + public class Leased implements Lease + { + final Object leasee; + final int threads; + + private Leased(Object leasee,int threads) + { + this.leasee = leasee; + this.threads = threads; + } + + @Override + public int getThreads() + { + return threads; + } + + @Override + public void close() + { + info.remove(this); + allocations.remove(this); + warned.set(false); + } + } + + private static final Lease NOOP_LEASE = new Lease() + { + @Override + public void close() throws IOException + { + } + + @Override + public int getThreads() + { + return 0; + } + }; + + final ThreadPool.SizedThreadPool pool; + final Set allocations = new CopyOnWriteArraySet<>(); + final Set info = new CopyOnWriteArraySet<>(); + final AtomicBoolean warned = new AtomicBoolean(); + final int warnAt; + + /** + * Construct a bedget for a SizedThreadPool, with the warning level set by heuristic. + * @param pool The pool to budget thread allocation for. + */ + public ThreadBudget(ThreadPool.SizedThreadPool pool) + { + this(pool,Runtime.getRuntime().availableProcessors()); + } + + /** + * @param pool The pool to budget thread allocation for. + * @param warnAt The level of free threads at which a warning is generated. + */ + public ThreadBudget(ThreadPool.SizedThreadPool pool, int warnAt) + { + this.pool = pool; + this.warnAt = warnAt; + } + + public ThreadPool.SizedThreadPool getSizedThreadPool() + { + return pool; + } + + public void reset() + { + allocations.clear(); + info.clear(); + warned.set(false); + } + + public Lease leaseTo(Object leasee, int threads) + { + Leased lease = new Leased(leasee,threads); + allocations.add(lease); + check(); + return lease; + } + + /** + * Check registered allocations against the budget. + * @throws IllegalStateException if insufficient threads are configured. + */ + public void check() throws IllegalStateException + { + int required = allocations.stream() + .mapToInt(Lease::getThreads) + .sum(); + + int maximum = pool.getMaxThreads(); + + if (required>=maximum) + { + infoOnLeases(); + throw new IllegalStateException(String.format("Insuffient configured threads: required=%d < max=%d for %s", required, maximum, pool)); + } + + if ((maximum-required) < warnAt) + { + infoOnLeases(); + if (warned.compareAndSet(false,true)) + LOG.warn("Low configured threads: ( max={} - required={} ) < warnAt={} for {}", maximum, required, warnAt, pool); + } + } + + private void infoOnLeases() + { + allocations.stream().filter(lease->!info.contains(lease)) + .forEach(lease->{ + info.add(lease); + LOG.info("{} requires {} threads from {}",lease.leasee,lease.getThreads(),pool); + }); + } + + public static Lease leaseFrom(Executor executor, Object leasee, int threads) + { + if (executor instanceof ThreadPool.SizedThreadPool) + { + ThreadBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadBudget(); + if (budget!=null) + return budget.leaseTo(leasee,threads); + } + return NOOP_LEASE; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 56055793106..2bfafe13a80 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -67,9 +67,14 @@ public interface ThreadPool extends Executor /* ------------------------------------------------------------ */ public interface SizedThreadPool extends ThreadPool { - public int getMinThreads(); - public int getMaxThreads(); - public void setMinThreads(int threads); - public void setMaxThreads(int threads); + int getMinThreads(); + int getMaxThreads(); + void setMinThreads(int threads); + void setMaxThreads(int threads); + + default ThreadBudget getThreadBudget() + { + return null; + } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java index cdc610a7e2e..d16df62e7db 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/component/ContainerLifeCycleTest.java @@ -26,9 +26,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.TypeUtil; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + public class ContainerLifeCycleTest { @Test @@ -608,4 +612,32 @@ public class ContainerLifeCycleTest super.destroy(); } } + + + @Test + public void testGetBeans() throws Exception + { + TestContainerLifeCycle root = new TestContainerLifeCycle(); + TestContainerLifeCycle left = new TestContainerLifeCycle(); + root.addBean(left); + TestContainerLifeCycle right = new TestContainerLifeCycle(); + root.addBean(right); + TestContainerLifeCycle leaf = new TestContainerLifeCycle(); + right.addBean(leaf); + + root.addBean(Integer.valueOf(0)); + root.addBean(Integer.valueOf(1)); + left.addBean(Integer.valueOf(2)); + right.addBean(Integer.valueOf(3)); + leaf.addBean(Integer.valueOf(4)); + leaf.addBean("leaf"); + + assertThat(root.getBeans(Container.class), containsInAnyOrder(left,right)); + assertThat(root.getBeans(Integer.class), containsInAnyOrder(Integer.valueOf(0),Integer.valueOf(1))); + assertThat(root.getBeans(String.class), containsInAnyOrder()); + + assertThat(root.getContainedBeans(Container.class), containsInAnyOrder(left,right,leaf)); + assertThat(root.getContainedBeans(Integer.class), containsInAnyOrder(Integer.valueOf(0),Integer.valueOf(1),Integer.valueOf(2),Integer.valueOf(3),Integer.valueOf(4))); + assertThat(root.getContainedBeans(String.class), containsInAnyOrder("leaf")); + } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index 9859ec878fc..b7a020d59e2 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -37,6 +37,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.BytesContentProvider;