diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java new file mode 100644 index 00000000000..e7dcd432e4c --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/InsufficientThreadsDetectionTest.java @@ -0,0 +1,58 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.Assert; +import org.junit.Test; + +public class InsufficientThreadsDetectionTest +{ + @Test(expected = IllegalStateException.class) + public void testInsufficientThreads() throws Exception + { + QueuedThreadPool clientThreads = new QueuedThreadPool(1); + HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP(1), null); + httpClient.setExecutor(clientThreads); + httpClient.start(); + } + + @Test + public void testInsufficientThreadsForMultipleHttpClients() throws Exception + { + QueuedThreadPool clientThreads = new QueuedThreadPool(3); + HttpClient httpClient1 = new HttpClient(new HttpClientTransportOverHTTP(1), null); + httpClient1.setExecutor(clientThreads); + httpClient1.start(); + + try + { + // Share the same thread pool with another instance. + HttpClient httpClient2 = new HttpClient(new HttpClientTransportOverHTTP(1), null); + httpClient2.setExecutor(clientThreads); + httpClient2.start(); + Assert.fail(); + } + catch (IllegalStateException expected) + { + // Expected. + } + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 792610ea6b6..121b6c817a3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -38,8 +38,8 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.ThreadPoolBudget; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; /** @@ -61,6 +61,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long _selectorIndex; private int _reservedThreads = -1; + private ThreadPoolBudget.Lease _lease; private static int defaultSelectors(Executor executor) { @@ -296,14 +297,13 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump protected void doStart() throws Exception { addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true); - + _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length); for (int i = 0; i < _selectors.length; i++) { ManagedSelector selector = newSelector(i); _selectors[i] = selector; addBean(selector); } - super.doStart(); } @@ -324,6 +324,8 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump super.doStop(); for (ManagedSelector selector : _selectors) removeBean(selector); + if (_lease != null) + _lease.close(); } /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index e6c53e2d40a..653a2d860d6 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -53,8 +53,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.ThreadBudget; -import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.ThreadPoolBudget; /** *

An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism @@ -160,8 +159,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; - private ThreadBudget.Lease lease; - + private ThreadPoolBudget.Lease _lease; /** * @param server The server this connector will be added to. Must not be null. @@ -276,7 +274,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this); } - lease = ThreadBudget.leaseFrom(getExecutor(),this,_acceptors.length); + _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length); super.doStart(); _stopping=new CountDownLatch(_acceptors.length); @@ -312,8 +310,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override protected void doStop() throws Exception { - if (lease!=null) - lease.close(); + if (_lease!=null) + _lease.close(); // Tell the acceptors we are stopping interruptAcceptors(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java index adce494570b..7fe301f8f0c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/InsufficientThreadsDetectionTest.java @@ -21,8 +21,8 @@ package org.eclipse.jetty.server; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.eclipse.jetty.util.thread.ThreadBudget; import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.ThreadPoolBudget; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -61,7 +61,7 @@ public class InsufficientThreadsDetectionTest } catch(IllegalStateException e) { - Log.getLogger(ThreadBudget.class).warn(e.toString()); + Log.getLogger(ThreadPoolBudget.class).warn(e.toString()); } } @@ -107,7 +107,7 @@ public class InsufficientThreadsDetectionTest } catch(IllegalStateException e) { - Log.getLogger(ThreadBudget.class).warn(e.toString()); + Log.getLogger(ThreadPoolBudget.class).warn(e.toString()); } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 2606e8c291b..92b6204bc14 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -65,7 +65,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private boolean _daemon = false; private boolean _detailedDump = false; private int _lowThreadsThreshold = 1; - private ThreadBudget _budget; + private ThreadPoolBudget _budget; public QueuedThreadPool() { @@ -106,23 +106,20 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } _jobs=queue; _threadGroup=threadGroup; - _budget=new ThreadBudget(this); + _budget=new ThreadPoolBudget(this); } @Override - public ThreadBudget getThreadBudget() + public ThreadPoolBudget getThreadPoolBudget() { return _budget; } - public void setThreadBudget(ThreadBudget budget) + public void setThreadPoolBudget(ThreadPoolBudget budget) { if (budget!=null && budget.getSizedThreadPool()!=this) throw new IllegalArgumentException(); - synchronized (this) - { - _budget = budget; - } + _budget = budget; } @Override diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index d58c69b2806..0c8a2d9b6ee 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -64,7 +64,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo private final AtomicInteger _size = new AtomicInteger(); private final AtomicInteger _pending = new AtomicInteger(); - private ThreadBudget.Lease _lease; + private ThreadPoolBudget.Lease _lease; private Object _owner; private long _idleTime = 1L; private TimeUnit _idleTimeUnit = TimeUnit.MINUTES; @@ -168,7 +168,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo @Override public void doStart() throws Exception { - _lease = ThreadBudget.leaseFrom(getExecutor(),this,_capacity); + _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_capacity); super.doStart(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java index 2bfafe13a80..ecc61bd87c3 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPool.java @@ -71,8 +71,7 @@ public interface ThreadPool extends Executor int getMaxThreads(); void setMinThreads(int threads); void setMaxThreads(int threads); - - default ThreadBudget getThreadBudget() + default ThreadPoolBudget getThreadPoolBudget() { return null; } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java similarity index 84% rename from jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java rename to jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java index 1d9d0033007..e2ed8069a59 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadBudget.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java @@ -31,11 +31,11 @@ import org.eclipse.jetty.util.log.Logger; /** *

A budget of required thread usage, used to warn or error for insufficient configured threads.

* - * @see ThreadPool.SizedThreadPool#getThreadBudget() + * @see ThreadPool.SizedThreadPool#getThreadPoolBudget() */ -public class ThreadBudget +public class ThreadPoolBudget { - static final Logger LOG = Log.getLogger(ThreadBudget.class); + static final Logger LOG = Log.getLogger(ThreadPoolBudget.class); public interface Lease extends Closeable { @@ -92,10 +92,10 @@ public class ThreadBudget final int warnAt; /** - * Construct a bedget for a SizedThreadPool, with the warning level set by heuristic. + * Construct a budget for a SizedThreadPool, with the warning level set by heuristic. * @param pool The pool to budget thread allocation for. */ - public ThreadBudget(ThreadPool.SizedThreadPool pool) + public ThreadPoolBudget(ThreadPool.SizedThreadPool pool) { this(pool,Runtime.getRuntime().availableProcessors()); } @@ -104,7 +104,7 @@ public class ThreadBudget * @param pool The pool to budget thread allocation for. * @param warnAt The level of free threads at which a warning is generated. */ - public ThreadBudget(ThreadPool.SizedThreadPool pool, int warnAt) + public ThreadPoolBudget(ThreadPool.SizedThreadPool pool, int warnAt) { this.pool = pool; this.warnAt = warnAt; @@ -139,20 +139,20 @@ public class ThreadBudget int required = allocations.stream() .mapToInt(Lease::getThreads) .sum(); - int maximum = pool.getMaxThreads(); + int actual = maximum - required; - if (required>=maximum) + if (actual <= 0) { infoOnLeases(); - throw new IllegalStateException(String.format("Insuffient configured threads: required=%d < max=%d for %s", required, maximum, pool)); + throw new IllegalStateException(String.format("Insufficient configured threads: required=%d < max=%d for %s", required, maximum, pool)); } - if ((maximum-required) < warnAt) + if (actual < warnAt) { infoOnLeases(); if (warned.compareAndSet(false,true)) - LOG.warn("Low configured threads: ( max={} - required={} ) < warnAt={} for {}", maximum, required, warnAt, pool); + LOG.warn("Low configured threads: (max={} - required={})={} < warnAt={} for {}", maximum, required, actual, warnAt, pool); } } @@ -169,7 +169,7 @@ public class ThreadBudget { if (executor instanceof ThreadPool.SizedThreadPool) { - ThreadBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadBudget(); + ThreadPoolBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadPoolBudget(); if (budget!=null) return budget.leaseTo(leasee,threads); }