Jetty 9.4.x 1803 proposal 0 - single ReservedThreadExecutor (#2119)

Issue #1803 - Review usage of scheduling strategies

Use a single ReservedThreadExecutor built into the QueuedThreadPool
via new interface TryExecutor.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2018-02-08 05:08:48 -05:00 committed by Simone Bordet
parent 9a787b1e54
commit ab5fc29cca
20 changed files with 204 additions and 184 deletions

View File

@ -39,7 +39,6 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@ -69,36 +68,19 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
ReservedThreadExecutor reservedExecutor = provideReservedThreadExecutor(client, executor);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, reservedExecutor, endPoint,
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
}
protected ReservedThreadExecutor provideReservedThreadExecutor(HTTP2Client client, Executor executor)
{
synchronized (this)
{
ReservedThreadExecutor reservedExecutor = client.getBean(ReservedThreadExecutor.class);
if (reservedExecutor == null)
{
// TODO: see HTTP2Connection.FillableCallback
reservedExecutor = new ReservedThreadExecutor(executor, 0);
client.addManaged(reservedExecutor);
}
return reservedExecutor;
}
}
private class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
private final Session.Listener listener;
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.parser.Parser;
@ -35,7 +36,7 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.TryExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
@ -51,14 +52,15 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final int bufferSize;
private final ExecutionStrategy strategy;
public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor.getExecutor());
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);
// TODO HTTP2 cannot use EWYK without fix for #1803
this.strategy = new EatWhatYouKill(producer, new TryExecutor.NoTryExecutor(executor));
LifeCycle.start(strategy);
}

View File

@ -12,7 +12,6 @@
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="128"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="524288"/></Set>
<Set name="initialSessionRecvWindow"><Property name="jetty.http2.initialSessionRecvWindow" default="1048576"/></Set>
<Set name="reservedThreads"><Property name="jetty.http2.reservedThreads" default="-1"/></Set>
</New>
</Arg>
</Call>

View File

@ -27,7 +27,3 @@ etc/jetty-http2.xml
## Initial session receive window (client to server)
# jetty.http2.initialSessionRecvWindow=1048576
## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads)
# jetty.http2.reservedThreads=-1

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@ -49,7 +48,6 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private int reservedThreads;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@ -143,20 +141,23 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
}
/**
* @see ReservedThreadExecutor
* @return The number of reserved threads
* @return -1
* @deprecated
*/
@ManagedAttribute("The number of threads reserved for high priority tasks")
@Deprecated
public int getReservedThreads()
{
return reservedThreads;
return -1;
}
/**
* @param threads ignored
* @deprecated
*/
@Deprecated
public void setReservedThreads(int threads)
{
// TODO: see also HTTP2Connection.FillableCallback.
// TODO: currently disabled since the only value that works is 0.
// this.reservedThreads = threads;
throw new UnsupportedOperationException();
}
public HttpConfiguration getHttpConfiguration()
@ -183,30 +184,14 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
ReservedThreadExecutor executor = provideReservedThreadExecutor(connector);
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
}
protected ReservedThreadExecutor provideReservedThreadExecutor(Connector connector)
{
synchronized (this)
{
ReservedThreadExecutor executor = getBean(ReservedThreadExecutor.class);
if (executor == null)
{
executor = new ReservedThreadExecutor(connector.getExecutor(), getReservedThreads());
addManaged(executor);
}
return executor;
}
}
protected abstract ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint);
protected ServerParser newServerParser(Connector connector, ServerParser.Listener listener)

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.BadMessageException;
@ -58,7 +59,6 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
@ -94,7 +94,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;

View File

@ -48,7 +48,6 @@ import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
@ -77,7 +76,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
_strategy = new EatWhatYouKill(producer,executor);
addBean(_strategy,true);
setStopTimeout(5000);
}
@ -136,17 +135,17 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
/**
* Submit an {@link SelectorUpdate} to be acted on between calls to {@link Selector#select()}
* @param action
* @param update The selector update to apply at next wakeup
*/
public void submit(SelectorUpdate action)
public void submit(SelectorUpdate update)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", action, this);
LOG.debug("Queued change {} on {}", update, this);
Selector selector = null;
synchronized(ManagedSelector.this)
{
_updates.offer(action);
_updates.offer(update);
if (_selecting)
{
@ -265,15 +264,15 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
Selector selector = _selector;
List<String> keys = null;
List<SelectorUpdate> actions = null;
List<SelectorUpdate> updates = null;
if (selector != null && selector.isOpen())
{
DumpKeys dump = new DumpKeys();
String actionsAt;
String updatesAt;
synchronized(ManagedSelector.this)
{
actionsAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
actions = new ArrayList<>(_updates);
updatesAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
updates = new ArrayList<>(_updates);
_updates.addFirst(dump);
_selecting = false;
}
@ -282,7 +281,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
String keysAt = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now());
if (keys==null)
keys = Collections.singletonList("No dump keys retrieved");
dumpBeans(out, indent, Arrays.asList(new DumpableCollection("actions @ "+actionsAt, actions),
dumpBeans(out, indent, Arrays.asList(new DumpableCollection("updates @ "+updatesAt, updates),
new DumpableCollection("keys @ "+keysAt, keys)));
}
else
@ -295,7 +294,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public String toString()
{
Selector selector = _selector;
return String.format("%s id=%s keys=%d selected=%d actions=%d",
return String.format("%s id=%s keys=%d selected=%d updates=%d",
super.toString(),
_id,
selector != null && selector.isOpen() ? selector.keys().size() : -1,
@ -377,16 +376,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_updateable.clear();
Selector selector;
int actions;
int updates;
synchronized(ManagedSelector.this)
{
actions = _updates.size();
_selecting = actions==0;
updates = _updates.size();
_selecting = updates==0;
selector = _selecting?null:_selector;
}
if (LOG.isDebugEnabled())
LOG.debug("actions {}",actions);
LOG.debug("updates {}",updates);
if (selector != null)
selector.wakeup();
@ -405,18 +404,18 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{} selected", selector, selected, selector.keys().size());
int actions;
int updates;
synchronized(ManagedSelector.this)
{
// finished selecting
_selecting = false;
actions = _updates.size();
updates = _updates.size();
}
_keys = selector.selectedKeys();
_cursor = _keys.iterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys, {} actions", selector, _keys.size(), actions);
LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);
return true;
}
@ -489,7 +488,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
// Do update keys for only previously selected keys.
// This will update only those keys whose selection did not cause an
// updateKeys action to be submitted.
// updateKeys update to be submitted.
for (SelectionKey key : _keys)
updateKey(key);
_keys.clear();

View File

@ -38,11 +38,9 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
/**
* <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
@ -63,9 +61,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
private final AtomicInteger _selectorIndex = new AtomicInteger();
private final IntUnaryOperator _selectorIndexUpdate;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private int _reservedThreads = -1;
private ThreadPoolBudget.Lease _lease;
private ReservedThreadExecutor _reservedThreadExecutor;
private static int defaultSelectors(Executor executor)
{
@ -133,34 +129,23 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
}
/**
* Get the number of preallocated producing threads
* @see EatWhatYouKill
* @see ReservedThreadExecutor
* @return The number of threads preallocated to producing (default -1).
* @return -1
* @deprecated
*/
@ManagedAttribute("The number of reserved producer threads")
@Deprecated
public int getReservedThreads()
{
return _reservedThreads;
return -1;
}
/**
* Set the number of reserved threads for high priority tasks.
* <p>Reserved threads are used to take over producing duties, so that a
* producer thread may immediately consume a task it has produced (EatWhatYouKill
* scheduling). If a reserved thread is not available, then produced tasks must
* be submitted to an executor to be executed by a different thread.
* @see EatWhatYouKill
* @see ReservedThreadExecutor
* @param threads The number of producing threads to preallocate. If
* less that 0 (the default), then a heuristic based on the number of CPUs and
* the thread pool size is used to select the number of threads. If 0, no
* threads are preallocated and the EatWhatYouKill scheduler will be
* disabled and all produced tasks will be executed in a separate thread.
* @param threads ignored
* @deprecated
*/
@Deprecated
public void setReservedThreads(int threads)
{
_reservedThreads = threads;
throw new UnsupportedOperationException();
}
/**
@ -182,7 +167,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
return _selectors.length;
}
private ManagedSelector chooseSelector(SelectableChannel channel)
private ManagedSelector chooseSelector()
{
return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)];
}
@ -199,7 +184,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public void connect(SelectableChannel channel, Object attachment)
{
ManagedSelector set = chooseSelector(channel);
ManagedSelector set = chooseSelector();
set.submit(set.new Connect(channel, attachment));
}
@ -224,7 +209,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public void accept(SelectableChannel channel, Object attachment)
{
final ManagedSelector selector = chooseSelector(channel);
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel, attachment));
}
@ -239,7 +224,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*/
public Closeable acceptor(SelectableChannel server)
{
final ManagedSelector selector = chooseSelector(null);
final ManagedSelector selector = chooseSelector();
ManagedSelector.Acceptor acceptor = selector.new Acceptor(server);
selector.submit(acceptor);
return acceptor;
@ -262,8 +247,6 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
@Override
protected void doStart() throws Exception
{
_reservedThreadExecutor = new ReservedThreadExecutor(getExecutor(),_reservedThreads,this);
addBean(_reservedThreadExecutor,true);
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _selectors.length);
for (int i = 0; i < _selectors.length; i++)
{
@ -301,9 +284,6 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
removeBean(selector);
}
Arrays.fill(_selectors,null);
if (_reservedThreadExecutor!=null)
removeBean(_reservedThreadExecutor);
_reservedThreadExecutor = null;
if (_lease != null)
_lease.close();
}

View File

@ -12,7 +12,6 @@
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="128"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="524288"/></Set>
<Set name="initialSessionRecvWindow"><Property name="jetty.http2.initialSessionRecvWindow" default="1048576"/></Set>
<Set name="reservedThreads"><Property name="jetty.http2.reservedThreads" default="-1"/></Set>
</New>
</Arg>
</Call>

View File

@ -42,7 +42,6 @@
<Set name="acceptQueueSize"><Property name="jetty.http.acceptQueueSize" deprecated="http.acceptQueueSize" default="0"/></Set>
<Get name="SelectorManager">
<Set name="connectTimeout"><Property name="jetty.http.connectTimeout" default="15000"/></Set>
<Set name="reservedThreads"><Property name="jetty.http.reservedThreads" default="-2"/></Set>
</Get>
</New>
</Arg>

View File

@ -34,7 +34,6 @@
<Set name="acceptQueueSize"><Property name="jetty.ssl.acceptQueueSize" deprecated="ssl.acceptQueueSize" default="0"/></Set>
<Get name="SelectorManager">
<Set name="connectTimeout"><Property name="jetty.ssl.connectTimeout" default="15000"/></Set>
<Set name="reservedThreads"><Property name="jetty.ssl.reservedThreads" default="-2"/></Set>
</Get>
</New>
</Arg>

View File

@ -22,6 +22,7 @@
<!-- =========================================================== -->
<Set name="minThreads" type="int"><Property name="jetty.threadPool.minThreads" deprecated="threads.min" default="10"/></Set>
<Set name="maxThreads" type="int"><Property name="jetty.threadPool.maxThreads" deprecated="threads.max" default="200"/></Set>
<Set name="reservedThreads" type="int"><Property name="jetty.threadPool.reservedThreads" default="-1"/></Set>
<Set name="idleTimeout" type="int"><Property name="jetty.threadPool.idleTimeout" deprecated="threads.timeout" default="60000"/></Set>
<Set name="detailedDump" type="boolean"><Property name="jetty.threadPool.detailedDump" default="false"/></Set>
</Configure>

View File

@ -24,7 +24,6 @@
<!-- configuration that may be set here. -->
<!-- =============================================================== -->
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Arg name="threadpool"><Ref refid="threadPool"/></Arg>
<!-- =========================================================== -->

View File

@ -40,9 +40,6 @@ etc/jetty-http.xml
## Thread priority delta to give to acceptor threads
# jetty.http.acceptorPriorityDelta=0
## Reserve threads for high priority tasks (-1 use a heuristic, 0 no reserved threads)
# jetty.http.reservedThreads=-1
## Connect Timeout in milliseconds
# jetty.http.connectTimeout=15000

View File

@ -44,9 +44,6 @@ basehome:modules/ssl/keystore|etc/keystore
## Thread priority delta to give to acceptor threads
# jetty.ssl.acceptorPriorityDelta=0
## Preallocated producer threads (0 disables EatWhatYouKill scheduling)
# jetty.ssl.reservedThreads=-1
## Connect Timeout in milliseconds
# jetty.ssl.connectTimeout=15000

View File

@ -13,6 +13,9 @@ etc/jetty-threadpool.xml
## Maximum Number of Threads
#jetty.threadPool.maxThreads=200
## Number of reserved threads (-1 for heuristic)
# jetty.threadPool.reservedThreads=-1
## Thread Idle Timeout (in milliseconds)
#jetty.threadPool.idleTimeout=60000

View File

@ -36,7 +36,6 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
@ -46,7 +45,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
@ManagedObject("A thread pool")
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor
{
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
@ -61,6 +60,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
private int _idleTimeout;
private int _maxThreads;
private int _minThreads;
private int _reservedThreads = -1;
private TryExecutor _tryExecutor = TryExecutor.NO_TRY;
private int _priority = Thread.NORM_PRIORITY;
private boolean _daemon = false;
private boolean _detailedDump = false;
@ -93,12 +94,17 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
setStopTimeout(5000);
setReservedThreads(reservedThreads);
if (queue==null)
{
int capacity=Math.max(_minThreads, 8);
@ -106,7 +112,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
_jobs=queue;
_threadGroup=threadGroup;
_budget=new ThreadPoolBudget(this);
setThreadPoolBudget(new ThreadPoolBudget(this));
}
@Override
@ -125,15 +131,21 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
protected void doStart() throws Exception
{
_tryExecutor = new ReservedThreadExecutor(this,_reservedThreads);
addBean(_tryExecutor);
super.doStart();
_threadsStarted.set(0);
startThreads(_minThreads);
startThreads(_minThreads);
}
@Override
protected void doStop() throws Exception
{
removeBean(_tryExecutor);
_tryExecutor = TryExecutor.NO_TRY;
super.doStop();
long timeout = getStopTimeout();
@ -222,7 +234,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
* Set the maximum thread idle time.
* Threads that are idle for longer than this period may be
* stopped.
* Delegated to the named or anonymous Pool.
*
* @param idleTimeout Max idle time in ms.
* @see #getIdleTimeout
@ -234,7 +245,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Set the maximum number of threads.
* Delegated to the named or anonymous Pool.
*
* @param maxThreads maximum number of threads.
* @see #getMaxThreads
@ -249,7 +259,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Set the minimum number of threads.
* Delegated to the named or anonymous Pool.
*
* @param minThreads minimum number of threads
* @see #getMinThreads
@ -266,6 +275,19 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
if (isStarted() && threads < _minThreads)
startThreads(_minThreads - threads);
}
/**
* Set the number of reserved threads.
*
* @param reservedThreads number of reserved threads or -1 for heuristically determined
* @see #getReservedThreads
*/
public void setReservedThreads(int reservedThreads)
{
if (isRunning())
throw new IllegalStateException(getState());
_reservedThreads = reservedThreads;
}
/**
* @param name Name of this thread pool to use when naming threads.
@ -289,7 +311,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Get the maximum thread idle time.
* Delegated to the named or anonymous Pool.
*
* @return Max idle time in ms.
* @see #setIdleTimeout
@ -302,7 +323,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Get the maximum number of threads.
* Delegated to the named or anonymous Pool.
*
* @return maximum number of threads.
* @see #setMaxThreads
@ -316,7 +336,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
/**
* Get the minimum number of threads.
* Delegated to the named or anonymous Pool.
*
* @return minimum number of threads.
* @see #setMinThreads
@ -328,6 +347,20 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return _minThreads;
}
/**
* Get the number of reserved threads.
*
* @return number of reserved threads or or -1 for heuristically determined
* @see #setReservedThreads
*/
@ManagedAttribute("the number of reserved threads in the pool")
public int getReservedThreads()
{
if (isStarted())
return getBean(ReservedThreadExecutor.class).getCapacity();
return _reservedThreads;
}
/**
* @return The name of the this thread pool
*/
@ -409,6 +442,13 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
}
@Override
public boolean tryExecute(Runnable task)
{
TryExecutor tryExecutor = _tryExecutor;
return tryExecutor!=null && tryExecutor.tryExecute(task);
}
/**
* Blocks until the thread pool is {@link LifeCycle#stop stopped}.
*/
@ -509,13 +549,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return new Thread(_threadGroup, runnable);
}
@Override
@ManagedOperation("dumps thread pool state")
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -585,14 +618,21 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
if (isDetailedDump())
jobs = new ArrayList<>(getQueue());
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs",jobs)));
dumpBeans(out, indent, threads, Collections.singletonList(new DumpableCollection("jobs", jobs)));
}
@Override
public String toString()
{
return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
return String.format("QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d,r=%s}",
_name,
getState(),
getMinThreads(),
getThreads(),
getMaxThreads(),
getIdleThreads(),
_jobs.size(),
_tryExecutor);
}
private Runnable idleJobPoll() throws InterruptedException

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.util.log.Logger;
* whenever it has been idle for that period.
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor
{
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private static final Runnable STOP = new Runnable()
@ -66,42 +66,24 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
private final AtomicInteger _pending = new AtomicInteger();
private ThreadPoolBudget.Lease _lease;
private Object _owner;
private long _idleTime = 1L;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
public ReservedThreadExecutor(Executor executor)
{
this(executor,1);
}
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate. If less than 0 then capacity
* is calculated based on a heuristic from the number of available processors and
* thread pool size.
*/
public ReservedThreadExecutor(Executor executor, int capacity)
{
this(executor,capacity,null);
}
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate. If less than 0 then capacity
* is calculated based on a heuristic from the number of available processors and
* thread pool size.
* @param owner the owner of the instance. Only used for debugging purpose withing the {@link #toString()} method
*/
public ReservedThreadExecutor(Executor executor,int capacity, Object owner)
public ReservedThreadExecutor(Executor executor,int capacity)
{
_executor = executor;
_capacity = reservedThreads(executor,capacity);
_stack = new ConcurrentLinkedDeque<>();
_owner = owner;
LOG.debug("{}",this);
}
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate, If less than 0 then capacity
@ -110,7 +92,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
* @return the number of reserved threads that would be used by a ReservedThreadExecutor
* constructed with these arguments.
*/
public static int reservedThreads(Executor executor,int capacity)
private static int reservedThreads(Executor executor,int capacity)
{
if (capacity>=0)
return capacity;
@ -118,7 +100,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
if (executor instanceof ThreadPool.SizedThreadPool)
{
int threads = ((ThreadPool.SizedThreadPool)executor).getMaxThreads();
return Math.max(1, Math.min(cpus, threads / 8));
return Math.max(1, Math.min(cpus, threads / 10));
}
return cpus;
}
@ -193,8 +175,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
@Override
public void execute(Runnable task) throws RejectedExecutionException
{
if (!tryExecute(task))
throw new RejectedExecutionException();
_executor.execute(task);
}
/**
@ -255,13 +236,12 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
@Override
public String toString()
{
return String.format("%s@%x{s=%d/%d,p=%d}@%s",
return String.format("%s@%x{s=%d/%d,p=%d}",
getClass().getSimpleName(),
hashCode(),
_size.get(),
_capacity,
_pending.get(),
_owner);
_pending.get());
}
private class ReservedThread implements Runnable

View File

@ -0,0 +1,72 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
/**
* A variation of Executor that can confirm if a thread is available immediately
*/
public interface TryExecutor extends Executor
{
/**
* Attempt to execute a task.
* @param task The task to be executed
* @return True IFF the task has been given directly to a thread to execute. The task cannot be queued pending the later availability of a Thread.
*/
boolean tryExecute(Runnable task);
default void execute(Runnable task)
{
if (!tryExecute(task))
throw new RejectedExecutionException();
}
public static TryExecutor asTryExecutor(Executor executor)
{
if (executor instanceof TryExecutor)
return (TryExecutor)executor;
return new NoTryExecutor(executor);
}
public static class NoTryExecutor implements TryExecutor
{
private final Executor executor;
public NoTryExecutor(Executor executor)
{
this.executor = executor;
}
@Override
public void execute(Runnable task)
{
executor.execute(task);
}
@Override
public boolean tryExecute(Runnable task)
{
return false;
}
}
public static final TryExecutor NO_TRY = task -> false;
}

View File

@ -34,7 +34,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.TryExecutor;
/**
* <p>A strategy where the thread that produces will run the resulting task if it
@ -73,25 +73,16 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private final LongAdder _executed = new LongAdder();
private final Producer _producer;
private final Executor _executor;
private final ReservedThreadExecutor _producers;
private final TryExecutor _tryExecutor;
private State _state = State.IDLE;
public EatWhatYouKill(Producer producer, Executor executor)
{
this(producer,executor,new ReservedThreadExecutor(executor,1));
}
public EatWhatYouKill(Producer producer, Executor executor, int maxReserved)
{
this(producer,executor,new ReservedThreadExecutor(executor,maxReserved));
}
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
_producer = producer;
_executor = executor;
_producers = producers;
_tryExecutor = TryExecutor.asTryExecutor(executor);
addBean(_producer);
addBean(_tryExecutor);
if (LOG.isDebugEnabled())
LOG.debug("{} created", this);
}
@ -214,7 +205,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
synchronized(this)
{
if (_producers.tryExecute(this))
if (_tryExecutor.tryExecute(this))
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
@ -334,7 +325,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
builder.append(_state);
builder.append('/');
builder.append(_producers);
builder.append(_tryExecutor);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");