Issue #1637 Thread per HTTP/2 Connection

This fix simplifies the EWYK scheduler by factoring out the preallocated producer into a
ReservedThreadExecutor class.   A shared ReservedThreadExecutor can then be used by multiple
EWYK instances to avoid over allocation of threads.

Squashed commit of the following:

commit c435dc20e25bd274d69423be1be7b0565925f249
Merge: 58a5a9a 90e5b56
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 21 10:48:22 2017 +0200

    Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3

commit 58a5a9a655ee1a72a66f54ac8c95d7c9d73afe85
Author: Simone Bordet <simone.bordet@gmail.com>
Date:   Wed Jun 14 15:56:43 2017 +0200

    Code cleanups.

commit 4e5296216b52948523572352cba391438ff6b494
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 14 07:34:58 2017 +0200

    refixed Producing to Reproducing

commit a1f8682f86d1f0803121162e3f14d7768286d3ed
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Jun 14 07:26:29 2017 +0200

    fixed Producing to Reproducing

commit 9468932e062d2271d8dc1d43a78544757732fff5
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 16:33:44 2017 +0200

    fixed javadoc

commit 9d4941eb97638fec09b3fe34d423538d17943b6f
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 16:05:27 2017 +0200

    Renamed Preallocated to ReservedThread

commit 6d3379ab64c6dcc2a7aa8ec7088afd77863816c2
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 12:28:52 2017 +0200

    Added configuration in modules

commit 1bd1adea4682538e1546c2ae53f4c9340dafb3bb
Merge: 83418a9 6702248
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 10:09:29 2017 +0200

    Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3

commit 83418a91320c8bfc54465ca02efdce0d2c874a0e
Author: Greg Wilkins <gregw@webtide.com>
Date:   Tue Jun 13 10:08:35 2017 +0200

    javadoc

commit 62918fd39189fed3414fec4a7c8380c21e90a4b8
Author: Greg Wilkins <gregw@webtide.com>
Date:   Sat Jun 10 00:04:06 2017 +0200

    Improved EatWhatYouKill implementation

    Simplified by abstracting out PreallocatedExecutor
    Removed invocation execution
    HTTP2 now uses a shared PreallocationExcecutor between connection
This commit is contained in:
Greg Wilkins 2017-06-21 11:48:41 +02:00 committed by Joakim Erdfelt
parent c9a1395f08
commit a105be95e4
26 changed files with 804 additions and 339 deletions

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@ -46,6 +47,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String CLIENT_CONTEXT_KEY = "http2.client";
public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
public static final String PREALLOCATED_EXECUTOR_CONTEXT_KEY = "http2.client.preallocatedExecutor";
public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
@ -58,6 +60,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
ReservedThreadExecutor preallocatedExecutor = (ReservedThreadExecutor)context.get(PREALLOCATED_EXECUTOR_CONTEXT_KEY);
Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
@ -67,7 +70,33 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
if (preallocatedExecutor==null)
{
// TODO move this to non lazy construction
preallocatedExecutor=client.getBean(ReservedThreadExecutor.class);
if (preallocatedExecutor==null)
{
synchronized (this)
{
if (preallocatedExecutor==null)
{
try
{
preallocatedExecutor = new ReservedThreadExecutor(executor,1); // TODO configure size
preallocatedExecutor.start();
client.addBean(preallocatedExecutor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}
}
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, preallocatedExecutor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
@ -79,7 +108,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection
@ -50,14 +51,14 @@ public class HTTP2Connection extends AbstractConnection
private final int bufferSize;
private final ExecutionStrategy strategy;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
super(endPoint, executor.getExecutor());
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.strategy = new EatWhatYouKill(producer, executor, 0);
this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);
LifeCycle.start(strategy);
}
@ -147,7 +148,10 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch)
{
offerTask(task);
strategy.dispatch();
if (dispatch)
strategy.dispatch();
else
strategy.produce();
}
@Override
@ -180,7 +184,7 @@ public class HTTP2Connection extends AbstractConnection
private ByteBuffer buffer;
@Override
public synchronized Runnable produce()
public Runnable produce()
{
Runnable task = pollTask();
if (LOG.isDebugEnabled())

View File

@ -9,8 +9,10 @@
<Arg>
<New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="1024"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="65535"/></Set>
<Set name="maxConcurrentStreams"><Property name="jetty.http2.maxConcurrentStreams" deprecated="http2.maxConcurrentStreams" default="128"/></Set>
<Set name="initialStreamRecvWindow"><Property name="jetty.http2.initialStreamRecvWindow" default="524288"/></Set>
<Set name="initialSessionRecvWindow"><Property name="jetty.http2.initialSessionRecvWindow" default="1048576"/></Set>
<Set name="reservedThreads"><Property name="jetty.http2.reservedThreads" default="-1"/></Set>
</New>
</Arg>
</Call>

View File

@ -20,7 +20,14 @@ etc/jetty-http2.xml
[ini-template]
## Max number of concurrent streams per connection
# jetty.http2.maxConcurrentStreams=1024
# jetty.http2.maxConcurrentStreams=128
## Initial stream receive window (client to server)
# jetty.http2.initialStreamRecvWindow=65535
# jetty.http2.initialStreamRecvWindow=524288
## Initial session receive window (client to server)
# jetty.http2.initialSessionRecvWindow=1048576
## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads)
# jetty.http2.reservedThreads=-1

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@ -48,6 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private int reservedThreads = -1;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@ -108,6 +110,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxConcurrentStreams = maxConcurrentStreams;
}
@ManagedAttribute("The max header block fragment")
public int getMaxHeaderBlockFragment()
{
return maxHeaderBlockFragment;
@ -139,6 +142,21 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.streamIdleTimeout = streamIdleTimeout;
}
/**
* @see ReservedThreadExecutor
* @return The number of reserved threads
*/
@ManagedAttribute("The number of threads reserved for high priority tasks")
public int getReservedThreads()
{
return reservedThreads;
}
public void setReservedThreads(int threads)
{
this.reservedThreads = threads;
}
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@ -163,9 +181,32 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
ReservedThreadExecutor executor = connector.getBean(ReservedThreadExecutor.class);
if (executor==null)
{
synchronized (this)
{
executor = connector.getBean(ReservedThreadExecutor.class);
if (executor==null)
{
try
{
executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads());
executor.start();
connector.addBean(executor,true);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);

View File

@ -56,6 +56,7 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
@ -91,7 +92,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.server;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -46,7 +47,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverHTTP2 extends HttpChannel
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@ -377,6 +378,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
}
@Override
public void close()
{
abort(new IOException("Unexpected close"));
}
@Override
public String toString()
{

View File

@ -38,7 +38,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@ -47,10 +46,9 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
@ -76,8 +74,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor);
addBean(_strategy);
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
addBean(_strategy,true);
setStopTimeout(5000);
}
@ -446,24 +444,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public String dump()
{
super.dump();
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
Selector selector = _selector;
if (selector != null && selector.isOpen())
if (selector == null || !selector.isOpen())
dumpBeans(out, indent);
else
{
final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
DumpKeys dumpKeys = new DumpKeys(dump);
submit(dumpKeys);
dumpKeys.await(5, TimeUnit.SECONDS);
ContainerLifeCycle.dump(out, indent, dump);
if (dump.isEmpty())
dumpBeans(out, indent);
else
dumpBeans(out, indent, dump);
}
}

View File

@ -29,12 +29,15 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
/**
* <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
@ -42,6 +45,8 @@ import org.eclipse.jetty.util.thread.Scheduler;
* <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
* {@link EndPoint}s and {@link Connection}s.</p>
*/
@ManagedObject("Manager of the NIO Selectors")
public abstract class SelectorManager extends ContainerLifeCycle implements Dumpable
{
public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
@ -52,6 +57,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
private final ManagedSelector[] _selectors;
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long _selectorIndex;
private int _reservedThreads = -2;
protected SelectorManager(Executor executor, Scheduler scheduler)
{
@ -67,11 +73,13 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
_selectors = new ManagedSelector[selectors];
}
@ManagedAttribute("The Executor")
public Executor getExecutor()
{
return executor;
}
@ManagedAttribute("The Scheduler")
public Scheduler getScheduler()
{
return scheduler;
@ -82,6 +90,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
*
* @return the connect timeout (in milliseconds)
*/
@ManagedAttribute("The Connection timeout (ms)")
public long getConnectTimeout()
{
return _connectTimeout;
@ -97,6 +106,30 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
_connectTimeout = milliseconds;
}
/**
* Get the number of preallocated producing threads
* @see EatWhatYouKill
* @see ReservedThreadExecutor
* @return The number of threads preallocated to producing (default 1).
*/
@ManagedAttribute("The number of preallocated producer threads")
public int getReservedThreads()
{
return _reservedThreads;
}
/**
* Set the number of preallocated threads for high priority tasks
* @see EatWhatYouKill
* @see ReservedThreadExecutor
* @param threads The number of producing threads to preallocate (default 1).
* The EatWhatYouKill scheduler will be disabled with a value of 0.
*/
public void setReservedThreads(int threads)
{
_reservedThreads = threads;
}
/**
* Executes the given task in a different thread.
*
@ -110,6 +143,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
/**
* @return the number of selectors in use
*/
@ManagedAttribute("The number of NIO Selectors")
public int getSelectorCount()
{
return _selectors.length;
@ -231,6 +265,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
@Override
protected void doStart() throws Exception
{
addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads==-2?_selectors.length:_reservedThreads),true);
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selector = newSelector(i);
@ -373,4 +408,5 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
* @throws IOException if unable to create new connection
*/
public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException;
}

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.Scheduler;
@ -45,7 +45,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* be referenced in the pom.xml. This class wraps a ServerConnector, delaying setting the
* server instance. Only a few of the setters from the ServerConnector class are supported.
*/
public class MavenServerConnector extends AbstractLifeCycle implements Connector
public class MavenServerConnector extends ContainerLifeCycle implements Connector
{
public static String PORT_SYSPROPERTY = "jetty.http.port";

View File

@ -40,6 +40,10 @@
<Set name="soLingerTime"><Property name="jetty.http.soLingerTime" deprecated="http.soLingerTime" default="-1"/></Set>
<Set name="acceptorPriorityDelta"><Property name="jetty.http.acceptorPriorityDelta" deprecated="http.acceptorPriorityDelta" default="0"/></Set>
<Set name="acceptQueueSize"><Property name="jetty.http.acceptQueueSize" deprecated="http.acceptQueueSize" default="0"/></Set>
<Get name="SelectorManager">
<Set name="connectTimeout"><Property name="jetty.http.connectTimeout" default="15000"/></Set>
<Set name="reservedThreads"><Property name="jetty.http.reservedThreads" default="-2"/></Set>
</Get>
</New>
</Arg>
</Call>

View File

@ -32,6 +32,10 @@
<Set name="soLingerTime"><Property name="jetty.ssl.soLingerTime" deprecated="ssl.soLingerTime" default="-1"/></Set>
<Set name="acceptorPriorityDelta"><Property name="jetty.ssl.acceptorPriorityDelta" deprecated="ssl.acceptorPriorityDelta" default="0"/></Set>
<Set name="acceptQueueSize"><Property name="jetty.ssl.acceptQueueSize" deprecated="ssl.acceptQueueSize" default="0"/></Set>
<Get name="SelectorManager">
<Set name="connectTimeout"><Property name="jetty.ssl.connectTimeout" default="15000"/></Set>
<Set name="reservedThreads"><Property name="jetty.ssl.reservedThreads" default="-2"/></Set>
</Get>
</New>
</Arg>
</Call>

View File

@ -40,5 +40,11 @@ etc/jetty-http.xml
## Thread priority delta to give to acceptor threads
# jetty.http.acceptorPriorityDelta=0
## Reserve threads for high priority tasks (-2 use number of selectors,-1 use number of CPUs, 0 no reserved threads)
# jetty.http.reservedThreads=-2
## Connect Timeout in milliseconds
# jetty.http.connectTimeout=15000
## HTTP Compliance: RFC7230, RFC2616, LEGACY
# jetty.http.compliance=RFC7230

View File

@ -44,6 +44,12 @@ basehome:modules/ssl/keystore|etc/keystore
## Thread priority delta to give to acceptor threads
# jetty.ssl.acceptorPriorityDelta=0
## Preallocated producer threads (0 disables EatWhatYouKill scheduling)
# jetty.ssl.reservedThreads=-1
## Connect Timeout in milliseconds
# jetty.ssl.connectTimeout=15000
## Whether request host names are checked to match any SNI names
# jetty.ssl.sniHostCheck=true

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.Scheduler;
@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* the machinery needed to handle such tasks.</p>
*/
@ManagedObject("Connector Interface")
public interface Connector extends LifeCycle, Graceful
public interface Connector extends LifeCycle, Container, Graceful
{
/**
* @return the {@link Server} instance associated with this {@link Connector}

View File

@ -378,6 +378,7 @@ public class ServerConnector extends AbstractNetworkConnector
}
}
@ManagedAttribute("The Selector Manager")
public SelectorManager getSelectorManager()
{
return _manager;

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.util.component;
import java.util.Collection;
/**
* A Container
*/
@ -76,6 +77,31 @@ public interface Container
*/
public void removeEventListener(Listener listener);
/**
* Unmanages a bean already contained by this aggregate, so that it is not started/stopped/destroyed with this
* aggregate.
*
* @param bean The bean to unmanage (must already have been added).
*/
void unmanage(Object bean);
/**
* Manages a bean already contained by this aggregate, so that it is started/stopped/destroyed with this
* aggregate.
*
* @param bean The bean to manage (must already have been added).
*/
void manage(Object bean);
/**
* Adds the given bean, explicitly managing it or not.
*
* @param o The bean object to add
* @param managed whether to managed the lifecycle of the bean
* @return true if the bean was added, false if it was already present
*/
boolean addBean(Object o, boolean managed);
/**
* A listener for Container events.
* If an added bean implements this interface it will receive the events

View File

@ -236,6 +236,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
* @param managed whether to managed the lifecycle of the bean
* @return true if the bean was added, false if it was already present
*/
@Override
public boolean addBean(Object o, boolean managed)
{
if (o instanceof LifeCycle)
@ -380,6 +381,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
*
* @param bean The bean to manage (must already have been added).
*/
@Override
public void manage(Object bean)
{
for (Bean b : _beans)
@ -426,6 +428,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
*
* @param bean The bean to unmanage (must already have been added).
*/
@Override
public void unmanage(Object bean)
{
for (Bean b : _beans)

View File

@ -186,84 +186,4 @@ public interface Invocable
return InvocationType.BLOCKING;
}
/**
* An Executor wrapper that knows about Invocable
*
*/
public static class InvocableExecutor implements Executor
{
private static final Logger LOG = Log.getLogger(InvocableExecutor.class);
private final Executor _executor;
private final InvocationType _preferredInvocationForExecute;
private final InvocationType _preferredInvocationForInvoke;
public InvocableExecutor(Executor executor,InvocationType preferred)
{
this(executor,preferred,preferred);
}
public InvocableExecutor(Executor executor,InvocationType preferredInvocationForExecute,InvocationType preferredInvocationForIvoke)
{
_executor=executor;
_preferredInvocationForExecute=preferredInvocationForExecute;
_preferredInvocationForInvoke=preferredInvocationForIvoke;
}
public Invocable.InvocationType getPreferredInvocationType()
{
return _preferredInvocationForInvoke;
}
public void invoke(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} invoke {}", this, task);
Invocable.invokePreferred(task,_preferredInvocationForInvoke);
if (LOG.isDebugEnabled())
LOG.debug("{} invoked {}", this, task);
}
public void execute(Runnable task)
{
tryExecute(task,_preferredInvocationForExecute);
}
public void execute(Runnable task, InvocationType preferred)
{
tryExecute(task,preferred);
}
public boolean tryExecute(Runnable task)
{
return tryExecute(task,_preferredInvocationForExecute);
}
public boolean tryExecute(Runnable task, InvocationType preferred)
{
try
{
_executor.execute(Invocable.asPreferred(task,preferred));
return true;
}
catch(RejectedExecutionException e)
{
// If we cannot execute, then close the task
LOG.debug(e);
LOG.warn("Rejected execution of {}",task);
try
{
if (task instanceof Closeable)
((Closeable)task).close();
}
catch (Exception x)
{
e.addSuppressed(x);
LOG.warn(e);
}
}
return false;
}
}
}

View File

@ -506,16 +506,22 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
for (final Thread thread : _threads)
{
final StackTraceElement[] trace = thread.getStackTrace();
boolean inIdleJobPoll = false;
String knownMethod = "";
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()))
{
inIdleJobPoll = true;
knownMethod = "IDLE ";
break;
}
if ("preallocatedWait".equals(t.getMethodName()))
{
knownMethod = "PREALLOCATED ";
break;
}
}
final boolean idle = inIdleJobPoll;
final String known = knownMethod;
if (isDetailedDump())
{
@ -524,11 +530,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(known).append(thread.getState().toString());
if (thread.getPriority()!=Thread.NORM_PRIORITY)
out.append(" prio=").append(String.valueOf(thread.getPriority()));
out.append(System.lineSeparator());
if (!idle)
if (known.length()==0)
ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
}
@ -542,7 +548,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
else
{
int p=thread.getPriority();
threads.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
}
}
@ -557,7 +563,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
public String toString()
{
return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
return String.format("org.eclipse.jetty.util.thread.QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
}
private Runnable idleJobPoll() throws InterruptedException

View File

@ -0,0 +1,235 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* An Executor using preallocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available. Threads are preallocated up to the capacity from a wrapped {@link Executor}.
*/
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
{
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private final Executor _executor;
private final Locker _locker = new Locker();
private final ReservedThread[] _queue;
private int _head;
private int _size;
private int _pending;
public ReservedThreadExecutor(Executor executor)
{
this(executor,1);
}
/**
* @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate. If less than 0 then the number of available processors is used.
*/
public ReservedThreadExecutor(Executor executor,int capacity)
{
_executor = executor;
_queue = new ReservedThread[capacity>=0?capacity:Runtime.getRuntime().availableProcessors()];
}
public Executor getExecutor()
{
return _executor;
}
public int getCapacity()
{
return _queue.length;
}
public int getPreallocated()
{
try (Locker.Lock lock = _locker.lock())
{
return _size;
}
}
@Override
public void doStart() throws Exception
{
try (Locker.Lock lock = _locker.lock())
{
_head = _size = _pending = 0;
while (_pending<_queue.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
}
}
@Override
public void doStop() throws Exception
{
try (Locker.Lock lock = _locker.lock())
{
while (_size>0)
{
ReservedThread thread = _queue[_head];
_queue[_head] = null;
_head = (_head+1)%_queue.length;
_size--;
thread._wakeup.signal();
}
}
}
@Override
public void execute(Runnable task) throws RejectedExecutionException
{
if (!tryExecute(task))
throw new RejectedExecutionException();
}
/**
* @param task The task to run
* @return True iff a reserved thread was available and has been assigned the task to run.
*/
public boolean tryExecute(Runnable task)
{
if (task==null)
return false;
try (Locker.Lock lock = _locker.lock())
{
if (_size==0)
{
if (_pending<_queue.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
return false;
}
ReservedThread thread = _queue[_head];
_queue[_head] = null;
_head = (_head+1)%_queue.length;
_size--;
if (_size==0 && _pending<_queue.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
thread._task = task;
thread._wakeup.signal();
return true;
}
catch(RejectedExecutionException e)
{
LOG.ignore(e);
return false;
}
}
private class ReservedThread implements Runnable
{
private Condition _wakeup = null;
private Runnable _task = null;
private void reservedWait() throws InterruptedException
{
_wakeup.await();
}
@Override
public void run()
{
while (true)
{
Runnable task = null;
try (Locker.Lock lock = _locker.lock())
{
// if this is our first loop, decrement pending count
if (_wakeup==null)
{
_pending--;
_wakeup = _locker.newCondition();
}
// Exit if no longer running or there now too many preallocated threads
if (!isRunning() || _size>=_queue.length)
break;
// Insert ourselves in the queue
_queue[(_head+_size++)%_queue.length] = this;
// Wait for a task, ignoring spurious interrupts
do
{
try
{
reservedWait();
task = _task;
_task = null;
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
}
while (isRunning() && task==null);
}
// Run any task
if (task!=null)
{
try
{
task.run();
}
catch (Exception e)
{
LOG.warn(e);
break;
}
}
}
}
}
@Override
public String toString()
{
try (Locker.Lock lock = _locker.lock())
{
return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending);
}
}
}

View File

@ -18,18 +18,19 @@
package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/**
* <p>A strategy where the thread that produces will run the resulting task if it
@ -57,100 +58,61 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
* </p>
*
*/
public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrategy, Runnable
public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
enum State { IDLE, PRODUCING, REPRODUCING };
private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private State _state = State.IDLE;
private final Runnable _runProduce = new RunProduce();
private final Producer _producer;
private final InvocableExecutor _executor;
private int _pendingProducersMax;
private int _pendingProducers;
private int _pendingProducersDispatched;
private int _pendingProducersSignalled;
private Condition _produce = _locker.newCondition();
private final Executor _executor;
private final ReservedThreadExecutor _producers;
public EatWhatYouKill(Producer producer, Executor executor)
{
this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
this(producer,executor,new ReservedThreadExecutor(executor,1));
}
public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending )
public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending)
{
this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending));
}
public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC)
{
this(producer,executor,preferredInvocationPEC,preferredInvocationEPC,Integer.getInteger("org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.maxProducersPending",1));
}
public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC, int maxProducersPending )
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
_producer = producer;
_pendingProducersMax = maxProducersPending;
_executor = new InvocableExecutor(executor,preferredInvocationPEC,preferredInvocationEPC);
}
@Override
public void produce()
{
boolean produce;
try (Lock locked = _locker.lock())
{
switch(_state)
{
case IDLE:
_state = State.PRODUCING;
produce = true;
break;
case PRODUCING:
_state = State.REPRODUCING;
produce = false;
break;
default:
produce = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} execute {}", this, produce);
if (produce)
doProduce();
_executor = executor;
_producers = producers;
addBean(_producer);
}
@Override
public void dispatch()
{
boolean dispatch = false;
boolean execute = false;
try (Lock locked = _locker.lock())
{
switch(_state)
{
case IDLE:
dispatch = true;
execute = true;
break;
case PRODUCING:
_state = State.REPRODUCING;
dispatch = false;
break;
default:
dispatch = false;
default:
break;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch {}", this, dispatch);
if (dispatch)
_executor.execute(_runProduce,InvocationType.BLOCKING);
LOG.debug("{} dispatch {}", this, execute);
if (execute)
_executor.execute(_runProduce);
}
@Override
@ -158,160 +120,139 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
{
if (LOG.isDebugEnabled())
LOG.debug("{} run", this);
if (!isRunning())
return;
produce();
}
@Override
public void produce()
{
boolean reproduce = true;
while(isRunning() && tryProduce(reproduce) && doProduce())
reproduce = false;
}
public boolean tryProduce(boolean reproduce)
{
boolean producing = false;
try (Lock locked = _locker.lock())
{
_pendingProducersDispatched--;
_pendingProducers++;
loop: while (isRunning())
switch (_state)
{
try
{
_produce.await();
if (_pendingProducersSignalled==0)
{
// spurious wakeup!
continue loop;
}
_pendingProducersSignalled--;
if (_state == State.IDLE)
{
_state = State.PRODUCING;
producing = true;
}
}
catch (InterruptedException e)
{
LOG.debug(e);
_pendingProducers--;
}
break loop;
}
case IDLE:
// Enter PRODUCING
_state = State.PRODUCING;
producing = true;
break;
case PRODUCING:
// Keep other Thread producing
if (reproduce)
_state = State.REPRODUCING;
break;
default:
break;
}
}
if (producing)
doProduce();
return producing;
}
private void doProduce()
public boolean doProduce()
{
boolean may_block_caller = !Invocable.isNonBlockingInvocation();
if (LOG.isDebugEnabled())
LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
producing: while (isRunning())
boolean producing = true;
while (isRunning() && producing)
{
// If we got here, then we are the thread that is producing.
Runnable task = _producer.produce();
boolean produce;
boolean consume;
boolean execute_producer;
StringBuilder state = null;
try (Lock locked = _locker.lock())
Runnable task = null;
try
{
if (LOG.isDebugEnabled())
task = _producer.produce();
}
catch(Throwable e)
{
LOG.warn(e);
}
if (LOG.isDebugEnabled())
LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task));
if (task==null)
{
try (Lock locked = _locker.lock())
{
state = new StringBuilder();
getString(state);
getState(state);
state.append("->");
}
// Did we produced a task?
if (task == null)
{
// There is no task.
// Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING)
{
_state = State.PRODUCING;
continue producing;
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE",toStringLocked());
_state = State.IDLE;
producing = false;
}
// ... and no additional calls to execute, so we are idle
_state = State.IDLE;
break producing;
}
// Will we eat our own kill - ie consume the task we just produced?
if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
{
// ProduceConsume
produce = true;
consume = true;
execute_producer = false;
}
else if (may_block_caller && (_pendingProducers>0 || _pendingProducersMax==0))
{
// ExecuteProduceConsume (eat what we kill!)
produce = false;
consume = true;
execute_producer = true;
_pendingProducersDispatched++;
_state = State.IDLE;
_pendingProducers--;
_pendingProducersSignalled++;
_produce.signal();
}
else
{
// ProduceExecuteConsume
produce = true;
consume = false;
execute_producer = (_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax;
if (execute_producer)
_pendingProducersDispatched++;
}
}
else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
{
// PRODUCE CONSUME (EWYK!)
if (LOG.isDebugEnabled())
getState(state);
LOG.debug("{} PC t={}",this,task);
task.run();
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} {} {}",
state,
consume?(execute_producer?"EPC!":"PC"):"PEC",
task);
}
if (execute_producer)
// Spawn a new thread to continue production by running the produce loop.
_executor.execute(this);
// Run or execute the task.
if (consume)
_executor.invoke(task);
else
_executor.execute(task);
// Once we have run the task, we can try producing again.
if (produce)
continue producing;
try (Lock locked = _locker.lock())
{
if (_state==State.IDLE)
boolean consume;
try (Lock locked = _locker.lock())
{
_state = State.PRODUCING;
continue producing;
if (_producers.tryExecute(this))
{
// EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume
_state = State.IDLE;
producing = false;
consume = true;
}
else
{
// PRODUCE EXECUTE CONSUME!
consume = false;
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task);
// Consume or execute task
try
{
if (consume)
task.run();
else
_executor.execute(task);
}
catch(RejectedExecutionException e)
{
LOG.warn(e);
if (task instanceof Closeable)
{
try
{
((Closeable)task).close();
}
catch(Throwable e2)
{
LOG.ignore(e2);
}
}
}
catch(Throwable e)
{
LOG.warn(e);
}
}
break producing;
}
if (LOG.isDebugEnabled())
LOG.debug("{} produce exit",this);
return producing;
}
public Boolean isIdle()
@ -322,25 +263,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
}
}
@Override
protected void doStop() throws Exception
public String toString()
{
try (Lock locked = _locker.lock())
{
_pendingProducersSignalled=_pendingProducers+_pendingProducersDispatched;
_pendingProducers=0;
_produce.signalAll();
return toStringLocked();
}
}
public String toString()
public String toStringLocked()
{
StringBuilder builder = new StringBuilder();
getString(builder);
try (Lock locked = _locker.lock())
{
getState(builder);
}
getState(builder);
return builder.toString();
}
@ -358,9 +293,7 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
{
builder.append(_state);
builder.append('/');
builder.append(_pendingProducers);
builder.append('/');
builder.append(_pendingProducersMax);
builder.append(_producers);
}
private class RunProduce implements Runnable

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@ -49,7 +48,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
private final Locker _locker = new Locker();
private final Runnable _runProduce = new RunProduce();
private final Producer _producer;
private final InvocableExecutor _executor;
private final Executor _executor;
private boolean _idle = true;
private boolean _execute;
private boolean _producing;
@ -57,14 +56,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
public ExecuteProduceConsume(Producer producer, Executor executor)
{
this(producer,executor,InvocationType.BLOCKING);
}
public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred )
{
this._producer = producer;
_executor = new InvocableExecutor(executor,preferred);
_executor = executor;
}
@Override
@ -192,15 +186,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch", this);
if (!_executor.tryExecute(this))
task = null;
_executor.execute(this);
}
// Run the task.
if (LOG.isDebugEnabled())
LOG.debug("{} run {}", this, task);
if (task != null)
_executor.invoke(task);
task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}", this, task);

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@ -38,18 +37,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
private final Locker _locker = new Locker();
private final Producer _producer;
private final InvocableExecutor _executor;
private final Executor _executor;
private State _state = State.IDLE;
public ProduceExecuteConsume(Producer producer, Executor executor)
{
this(producer,executor,InvocationType.NON_BLOCKING);
}
public ProduceExecuteConsume(Producer producer, Executor executor, InvocationType preferred)
{
_producer = producer;
_executor = new InvocableExecutor(executor,preferred);
_executor = executor;
}
@Override

View File

@ -0,0 +1,204 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class ReservedThreadExecutorTest
{
final static int SIZE = 2;
TestExecutor _executor;
ReservedThreadExecutor _pae;
@Before
public void before() throws Exception
{
_executor = new TestExecutor();
_pae = new ReservedThreadExecutor(_executor,SIZE);
_pae.start();
}
@After
public void after() throws Exception
{
_pae.stop();
}
@Test
public void testStarted() throws Exception
{
assertThat(_executor._queue.size(),is(SIZE));
while(!_executor._queue.isEmpty())
_executor.execute();
assertThat(_pae.getCapacity(),is(SIZE));
long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE)
{
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break;
Thread.sleep(100);
}
assertThat(_pae.getPreallocated(),is(SIZE));
}
@Test
public void testPending() throws Exception
{
assertThat(_executor._queue.size(),is(SIZE));
assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE));
_executor.execute();
assertThat(_executor._queue.size(),is(SIZE-1));
while (!_executor._queue.isEmpty())
_executor.execute();
long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE)
{
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break;
Thread.sleep(100);
}
assertThat(_executor._queue.size(),is(0));
assertThat(_pae.getPreallocated(),is(SIZE));
for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new Task()),is(true));
assertThat(_executor._queue.size(),is(1));
assertThat(_pae.getPreallocated(),is(0));
for (int i=SIZE;i-->0;)
assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE));
assertThat(_pae.getPreallocated(),is(0));
assertThat(_pae.tryExecute(new NOOP()),is(false));
assertThat(_executor._queue.size(),is(SIZE));
assertThat(_pae.getPreallocated(),is(0));
}
@Test
public void testExecuted() throws Exception
{
while(!_executor._queue.isEmpty())
_executor.execute();
long started = System.nanoTime();
while (_pae.getPreallocated()<SIZE)
{
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break;
Thread.sleep(100);
}
assertThat(_pae.getPreallocated(),is(SIZE));
Task[] task = new Task[SIZE];
for (int i=SIZE;i-->0;)
{
task[i] = new Task();
assertThat(_pae.tryExecute(task[i]),is(true));
}
for (int i=SIZE;i-->0;)
{
task[i]._ran.await(10,TimeUnit.SECONDS);
}
assertThat(_executor._queue.size(),is(1));
Task extra = new Task();
assertThat(_pae.tryExecute(extra),is(false));
assertThat(_executor._queue.size(),is(2));
Thread.sleep(100);
assertThat(extra._ran.getCount(),is(1L));
for (int i=SIZE;i-->0;)
{
task[i]._complete.countDown();
}
started = System.nanoTime();
while (_pae.getPreallocated()<SIZE)
{
if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()-started)>10)
break;
Thread.sleep(100);
}
assertThat(_pae.getPreallocated(),is(SIZE));
}
private static class TestExecutor implements Executor
{
Deque<Runnable> _queue = new ArrayDeque<>();
@Override
public void execute(Runnable task)
{
_queue.addLast(task);
}
public void execute()
{
Runnable task = _queue.pollFirst();
if (task!=null)
new Thread(task).start();
}
}
private static class NOOP implements Runnable
{
@Override
public void run() {}
}
private static class Task implements Runnable
{
private CountDownLatch _ran = new CountDownLatch(1);
private CountDownLatch _complete = new CountDownLatch(1);
@Override
public void run()
{
_ran.countDown();
try
{
_complete.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}

View File

@ -158,7 +158,8 @@ public class ExecutionStrategyTest
@Override
public Runnable produce()
{
if (tasks-->0)
final int id = --tasks;
if (id>=0)
{
try
{
@ -171,6 +172,7 @@ public class ExecutionStrategyTest
@Override
public void run()
{
// System.err.println("RUN "+id);
latch.countDown();
}
};