Issue #1851 - Improve insufficient thread warnings/errors.
ThreadBudget -> ThreadPoolBudget. Added selectors to the leased threads.
This commit is contained in:
parent
a811785d64
commit
3b98a6c000
|
@ -0,0 +1,58 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.client;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||||
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class InsufficientThreadsDetectionTest
|
||||||
|
{
|
||||||
|
@Test(expected = IllegalStateException.class)
|
||||||
|
public void testInsufficientThreads() throws Exception
|
||||||
|
{
|
||||||
|
QueuedThreadPool clientThreads = new QueuedThreadPool(1);
|
||||||
|
HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP(1), null);
|
||||||
|
httpClient.setExecutor(clientThreads);
|
||||||
|
httpClient.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInsufficientThreadsForMultipleHttpClients() throws Exception
|
||||||
|
{
|
||||||
|
QueuedThreadPool clientThreads = new QueuedThreadPool(3);
|
||||||
|
HttpClient httpClient1 = new HttpClient(new HttpClientTransportOverHTTP(1), null);
|
||||||
|
httpClient1.setExecutor(clientThreads);
|
||||||
|
httpClient1.start();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Share the same thread pool with another instance.
|
||||||
|
HttpClient httpClient2 = new HttpClient(new HttpClientTransportOverHTTP(1), null);
|
||||||
|
httpClient2.setExecutor(clientThreads);
|
||||||
|
httpClient2.start();
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
catch (IllegalStateException expected)
|
||||||
|
{
|
||||||
|
// Expected.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -38,8 +38,8 @@ import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.log.Logger;
|
import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
|
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
import org.eclipse.jetty.util.thread.ThreadBudget;
|
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||||
|
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
|
||||||
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
|
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,6 +61,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||||
private long _selectorIndex;
|
private long _selectorIndex;
|
||||||
private int _reservedThreads = -1;
|
private int _reservedThreads = -1;
|
||||||
|
private ThreadPoolBudget.Lease _lease;
|
||||||
|
|
||||||
private static int defaultSelectors(Executor executor)
|
private static int defaultSelectors(Executor executor)
|
||||||
{
|
{
|
||||||
|
@ -296,14 +297,13 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
protected void doStart() throws Exception
|
protected void doStart() throws Exception
|
||||||
{
|
{
|
||||||
addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true);
|
addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads,this),true);
|
||||||
|
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length);
|
||||||
for (int i = 0; i < _selectors.length; i++)
|
for (int i = 0; i < _selectors.length; i++)
|
||||||
{
|
{
|
||||||
ManagedSelector selector = newSelector(i);
|
ManagedSelector selector = newSelector(i);
|
||||||
_selectors[i] = selector;
|
_selectors[i] = selector;
|
||||||
addBean(selector);
|
addBean(selector);
|
||||||
}
|
}
|
||||||
|
|
||||||
super.doStart();
|
super.doStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,6 +324,8 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
super.doStop();
|
super.doStop();
|
||||||
for (ManagedSelector selector : _selectors)
|
for (ManagedSelector selector : _selectors)
|
||||||
removeBean(selector);
|
removeBean(selector);
|
||||||
|
if (_lease != null)
|
||||||
|
_lease.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -53,8 +53,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
import org.eclipse.jetty.util.thread.Locker;
|
import org.eclipse.jetty.util.thread.Locker;
|
||||||
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
import org.eclipse.jetty.util.thread.ThreadBudget;
|
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism
|
* <p>An abstract implementation of {@link Connector} that provides a {@link ConnectionFactory} mechanism
|
||||||
|
@ -160,8 +159,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
|
||||||
private String _name;
|
private String _name;
|
||||||
private int _acceptorPriorityDelta=-2;
|
private int _acceptorPriorityDelta=-2;
|
||||||
private boolean _accepting = true;
|
private boolean _accepting = true;
|
||||||
private ThreadBudget.Lease lease;
|
private ThreadPoolBudget.Lease _lease;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param server The server this connector will be added to. Must not be null.
|
* @param server The server this connector will be added to. Must not be null.
|
||||||
|
@ -276,7 +274,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
|
||||||
throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
|
throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
lease = ThreadBudget.leaseFrom(getExecutor(),this,_acceptors.length);
|
_lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length);
|
||||||
super.doStart();
|
super.doStart();
|
||||||
|
|
||||||
_stopping=new CountDownLatch(_acceptors.length);
|
_stopping=new CountDownLatch(_acceptors.length);
|
||||||
|
@ -312,8 +310,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws Exception
|
protected void doStop() throws Exception
|
||||||
{
|
{
|
||||||
if (lease!=null)
|
if (_lease!=null)
|
||||||
lease.close();
|
_lease.close();
|
||||||
|
|
||||||
// Tell the acceptors we are stopping
|
// Tell the acceptors we are stopping
|
||||||
interruptAcceptors();
|
interruptAcceptors();
|
||||||
|
|
|
@ -21,8 +21,8 @@ package org.eclipse.jetty.server;
|
||||||
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
import org.eclipse.jetty.util.thread.ThreadBudget;
|
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||||
|
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -61,7 +61,7 @@ public class InsufficientThreadsDetectionTest
|
||||||
}
|
}
|
||||||
catch(IllegalStateException e)
|
catch(IllegalStateException e)
|
||||||
{
|
{
|
||||||
Log.getLogger(ThreadBudget.class).warn(e.toString());
|
Log.getLogger(ThreadPoolBudget.class).warn(e.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ public class InsufficientThreadsDetectionTest
|
||||||
}
|
}
|
||||||
catch(IllegalStateException e)
|
catch(IllegalStateException e)
|
||||||
{
|
{
|
||||||
Log.getLogger(ThreadBudget.class).warn(e.toString());
|
Log.getLogger(ThreadPoolBudget.class).warn(e.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
private boolean _daemon = false;
|
private boolean _daemon = false;
|
||||||
private boolean _detailedDump = false;
|
private boolean _detailedDump = false;
|
||||||
private int _lowThreadsThreshold = 1;
|
private int _lowThreadsThreshold = 1;
|
||||||
private ThreadBudget _budget;
|
private ThreadPoolBudget _budget;
|
||||||
|
|
||||||
public QueuedThreadPool()
|
public QueuedThreadPool()
|
||||||
{
|
{
|
||||||
|
@ -106,23 +106,20 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
|
||||||
}
|
}
|
||||||
_jobs=queue;
|
_jobs=queue;
|
||||||
_threadGroup=threadGroup;
|
_threadGroup=threadGroup;
|
||||||
_budget=new ThreadBudget(this);
|
_budget=new ThreadPoolBudget(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ThreadBudget getThreadBudget()
|
public ThreadPoolBudget getThreadPoolBudget()
|
||||||
{
|
{
|
||||||
return _budget;
|
return _budget;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setThreadBudget(ThreadBudget budget)
|
public void setThreadPoolBudget(ThreadPoolBudget budget)
|
||||||
{
|
{
|
||||||
if (budget!=null && budget.getSizedThreadPool()!=this)
|
if (budget!=null && budget.getSizedThreadPool()!=this)
|
||||||
throw new IllegalArgumentException();
|
throw new IllegalArgumentException();
|
||||||
synchronized (this)
|
_budget = budget;
|
||||||
{
|
|
||||||
_budget = budget;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
|
||||||
private final AtomicInteger _size = new AtomicInteger();
|
private final AtomicInteger _size = new AtomicInteger();
|
||||||
private final AtomicInteger _pending = new AtomicInteger();
|
private final AtomicInteger _pending = new AtomicInteger();
|
||||||
|
|
||||||
private ThreadBudget.Lease _lease;
|
private ThreadPoolBudget.Lease _lease;
|
||||||
private Object _owner;
|
private Object _owner;
|
||||||
private long _idleTime = 1L;
|
private long _idleTime = 1L;
|
||||||
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
|
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
|
||||||
|
@ -168,7 +168,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
|
||||||
@Override
|
@Override
|
||||||
public void doStart() throws Exception
|
public void doStart() throws Exception
|
||||||
{
|
{
|
||||||
_lease = ThreadBudget.leaseFrom(getExecutor(),this,_capacity);
|
_lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_capacity);
|
||||||
super.doStart();
|
super.doStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,8 +71,7 @@ public interface ThreadPool extends Executor
|
||||||
int getMaxThreads();
|
int getMaxThreads();
|
||||||
void setMinThreads(int threads);
|
void setMinThreads(int threads);
|
||||||
void setMaxThreads(int threads);
|
void setMaxThreads(int threads);
|
||||||
|
default ThreadPoolBudget getThreadPoolBudget()
|
||||||
default ThreadBudget getThreadBudget()
|
|
||||||
{
|
{
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,11 +31,11 @@ import org.eclipse.jetty.util.log.Logger;
|
||||||
/**
|
/**
|
||||||
* <p>A budget of required thread usage, used to warn or error for insufficient configured threads.</p>
|
* <p>A budget of required thread usage, used to warn or error for insufficient configured threads.</p>
|
||||||
*
|
*
|
||||||
* @see ThreadPool.SizedThreadPool#getThreadBudget()
|
* @see ThreadPool.SizedThreadPool#getThreadPoolBudget()
|
||||||
*/
|
*/
|
||||||
public class ThreadBudget
|
public class ThreadPoolBudget
|
||||||
{
|
{
|
||||||
static final Logger LOG = Log.getLogger(ThreadBudget.class);
|
static final Logger LOG = Log.getLogger(ThreadPoolBudget.class);
|
||||||
|
|
||||||
public interface Lease extends Closeable
|
public interface Lease extends Closeable
|
||||||
{
|
{
|
||||||
|
@ -92,10 +92,10 @@ public class ThreadBudget
|
||||||
final int warnAt;
|
final int warnAt;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a bedget for a SizedThreadPool, with the warning level set by heuristic.
|
* Construct a budget for a SizedThreadPool, with the warning level set by heuristic.
|
||||||
* @param pool The pool to budget thread allocation for.
|
* @param pool The pool to budget thread allocation for.
|
||||||
*/
|
*/
|
||||||
public ThreadBudget(ThreadPool.SizedThreadPool pool)
|
public ThreadPoolBudget(ThreadPool.SizedThreadPool pool)
|
||||||
{
|
{
|
||||||
this(pool,Runtime.getRuntime().availableProcessors());
|
this(pool,Runtime.getRuntime().availableProcessors());
|
||||||
}
|
}
|
||||||
|
@ -104,7 +104,7 @@ public class ThreadBudget
|
||||||
* @param pool The pool to budget thread allocation for.
|
* @param pool The pool to budget thread allocation for.
|
||||||
* @param warnAt The level of free threads at which a warning is generated.
|
* @param warnAt The level of free threads at which a warning is generated.
|
||||||
*/
|
*/
|
||||||
public ThreadBudget(ThreadPool.SizedThreadPool pool, int warnAt)
|
public ThreadPoolBudget(ThreadPool.SizedThreadPool pool, int warnAt)
|
||||||
{
|
{
|
||||||
this.pool = pool;
|
this.pool = pool;
|
||||||
this.warnAt = warnAt;
|
this.warnAt = warnAt;
|
||||||
|
@ -139,20 +139,20 @@ public class ThreadBudget
|
||||||
int required = allocations.stream()
|
int required = allocations.stream()
|
||||||
.mapToInt(Lease::getThreads)
|
.mapToInt(Lease::getThreads)
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
int maximum = pool.getMaxThreads();
|
int maximum = pool.getMaxThreads();
|
||||||
|
int actual = maximum - required;
|
||||||
|
|
||||||
if (required>=maximum)
|
if (actual <= 0)
|
||||||
{
|
{
|
||||||
infoOnLeases();
|
infoOnLeases();
|
||||||
throw new IllegalStateException(String.format("Insuffient configured threads: required=%d < max=%d for %s", required, maximum, pool));
|
throw new IllegalStateException(String.format("Insufficient configured threads: required=%d < max=%d for %s", required, maximum, pool));
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((maximum-required) < warnAt)
|
if (actual < warnAt)
|
||||||
{
|
{
|
||||||
infoOnLeases();
|
infoOnLeases();
|
||||||
if (warned.compareAndSet(false,true))
|
if (warned.compareAndSet(false,true))
|
||||||
LOG.warn("Low configured threads: ( max={} - required={} ) < warnAt={} for {}", maximum, required, warnAt, pool);
|
LOG.warn("Low configured threads: (max={} - required={})={} < warnAt={} for {}", maximum, required, actual, warnAt, pool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,7 +169,7 @@ public class ThreadBudget
|
||||||
{
|
{
|
||||||
if (executor instanceof ThreadPool.SizedThreadPool)
|
if (executor instanceof ThreadPool.SizedThreadPool)
|
||||||
{
|
{
|
||||||
ThreadBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadBudget();
|
ThreadPoolBudget budget = ((ThreadPool.SizedThreadPool)executor).getThreadPoolBudget();
|
||||||
if (budget!=null)
|
if (budget!=null)
|
||||||
return budget.leaseTo(leasee,threads);
|
return budget.leaseTo(leasee,threads);
|
||||||
}
|
}
|
Loading…
Reference in New Issue