From a105be95e44fae91fcf8b85b5d6d0f78fd143a04 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 21 Jun 2017 11:48:41 +0200 Subject: [PATCH] Issue #1637 Thread per HTTP/2 Connection This fix simplifies the EWYK scheduler by factoring out the preallocated producer into a ReservedThreadExecutor class. A shared ReservedThreadExecutor can then be used by multiple EWYK instances to avoid over allocation of threads. Squashed commit of the following: commit c435dc20e25bd274d69423be1be7b0565925f249 Merge: 58a5a9a 90e5b56 Author: Greg Wilkins Date: Wed Jun 21 10:48:22 2017 +0200 Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3 commit 58a5a9a655ee1a72a66f54ac8c95d7c9d73afe85 Author: Simone Bordet Date: Wed Jun 14 15:56:43 2017 +0200 Code cleanups. commit 4e5296216b52948523572352cba391438ff6b494 Author: Greg Wilkins Date: Wed Jun 14 07:34:58 2017 +0200 refixed Producing to Reproducing commit a1f8682f86d1f0803121162e3f14d7768286d3ed Author: Greg Wilkins Date: Wed Jun 14 07:26:29 2017 +0200 fixed Producing to Reproducing commit 9468932e062d2271d8dc1d43a78544757732fff5 Author: Greg Wilkins Date: Tue Jun 13 16:33:44 2017 +0200 fixed javadoc commit 9d4941eb97638fec09b3fe34d423538d17943b6f Author: Greg Wilkins Date: Tue Jun 13 16:05:27 2017 +0200 Renamed Preallocated to ReservedThread commit 6d3379ab64c6dcc2a7aa8ec7088afd77863816c2 Author: Greg Wilkins Date: Tue Jun 13 12:28:52 2017 +0200 Added configuration in modules commit 1bd1adea4682538e1546c2ae53f4c9340dafb3bb Merge: 83418a9 6702248 Author: Greg Wilkins Date: Tue Jun 13 10:09:29 2017 +0200 Merge branch 'jetty-9.4.x' into jetty-9.4.x-ewyk3 commit 83418a91320c8bfc54465ca02efdce0d2c874a0e Author: Greg Wilkins Date: Tue Jun 13 10:08:35 2017 +0200 javadoc commit 62918fd39189fed3414fec4a7c8380c21e90a4b8 Author: Greg Wilkins Date: Sat Jun 10 00:04:06 2017 +0200 Improved EatWhatYouKill implementation Simplified by abstracting out PreallocatedExecutor Removed invocation execution HTTP2 now uses a shared PreallocationExcecutor between connection --- .../client/HTTP2ClientConnectionFactory.java | 33 +- .../eclipse/jetty/http2/HTTP2Connection.java | 14 +- .../src/main/config/etc/jetty-http2.xml | 6 +- .../src/main/config/modules/http2.mod | 11 +- .../AbstractHTTP2ServerConnectionFactory.java | 43 ++- .../http2/server/HTTP2ServerConnection.java | 3 +- .../http2/server/HttpChannelOverHTTP2.java | 9 +- .../org/eclipse/jetty/io/ManagedSelector.java | 22 +- .../org/eclipse/jetty/io/SelectorManager.java | 38 +- .../maven/plugin/MavenServerConnector.java | 4 +- .../src/main/config/etc/jetty-http.xml | 4 + .../src/main/config/etc/jetty-ssl.xml | 4 + jetty-server/src/main/config/modules/http.mod | 6 + jetty-server/src/main/config/modules/ssl.mod | 6 + .../org/eclipse/jetty/server/Connector.java | 3 +- .../eclipse/jetty/server/ServerConnector.java | 1 + .../jetty/util/component/Container.java | 26 ++ .../util/component/ContainerLifeCycle.java | 3 + .../eclipse/jetty/util/thread/Invocable.java | 80 ----- .../jetty/util/thread/QueuedThreadPool.java | 20 +- .../util/thread/ReservedThreadExecutor.java | 235 ++++++++++++ .../util/thread/strategy/EatWhatYouKill.java | 339 +++++++----------- .../strategy/ExecuteProduceConsume.java | 15 +- .../strategy/ProduceExecuteConsume.java | 10 +- .../thread/ReservedThreadExecutorTest.java | 204 +++++++++++ .../strategy/ExecutionStrategyTest.java | 4 +- 26 files changed, 804 insertions(+), 339 deletions(-) create mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java create mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 7b0d8ad39a6..3a356bc7445 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; public class HTTP2ClientConnectionFactory implements ClientConnectionFactory @@ -46,6 +47,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory public static final String CLIENT_CONTEXT_KEY = "http2.client"; public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool"; public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor"; + public static final String PREALLOCATED_EXECUTOR_CONTEXT_KEY = "http2.client.preallocatedExecutor"; public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler"; public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener"; public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise"; @@ -58,6 +60,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY); ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY); Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY); + ReservedThreadExecutor preallocatedExecutor = (ReservedThreadExecutor)context.get(PREALLOCATED_EXECUTOR_CONTEXT_KEY); Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY); Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY); @SuppressWarnings("unchecked") @@ -67,7 +70,33 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy(); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); Parser parser = new Parser(byteBufferPool, session, 4096, 8192); - HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, + + if (preallocatedExecutor==null) + { + // TODO move this to non lazy construction + preallocatedExecutor=client.getBean(ReservedThreadExecutor.class); + if (preallocatedExecutor==null) + { + synchronized (this) + { + if (preallocatedExecutor==null) + { + try + { + preallocatedExecutor = new ReservedThreadExecutor(executor,1); // TODO configure size + preallocatedExecutor.start(); + client.addBean(preallocatedExecutor,true); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + } + } + + HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, preallocatedExecutor, endPoint, parser, session, client.getInputBufferSize(), promise, listener); connection.addListener(connectionListener); return customize(connection, context); @@ -79,7 +108,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory private final Promise promise; private final Session.Listener listener; - private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) + private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) { super(byteBufferPool, executor, endpoint, parser, session, bufferSize); this.client = client; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 65a04cbd0e6..39350c2e70b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; public class HTTP2Connection extends AbstractConnection @@ -50,14 +51,14 @@ public class HTTP2Connection extends AbstractConnection private final int bufferSize; private final ExecutionStrategy strategy; - public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) + public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) { - super(endPoint, executor); + super(endPoint, executor.getExecutor()); this.byteBufferPool = byteBufferPool; this.parser = parser; this.session = session; this.bufferSize = bufferSize; - this.strategy = new EatWhatYouKill(producer, executor, 0); + this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor); LifeCycle.start(strategy); } @@ -147,7 +148,10 @@ public class HTTP2Connection extends AbstractConnection protected void offerTask(Runnable task, boolean dispatch) { offerTask(task); - strategy.dispatch(); + if (dispatch) + strategy.dispatch(); + else + strategy.produce(); } @Override @@ -180,7 +184,7 @@ public class HTTP2Connection extends AbstractConnection private ByteBuffer buffer; @Override - public synchronized Runnable produce() + public Runnable produce() { Runnable task = pollTask(); if (LOG.isDebugEnabled()) diff --git a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml index 226941d5e71..1d6423589e3 100644 --- a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml +++ b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml @@ -9,8 +9,10 @@ - - + + + + diff --git a/jetty-http2/http2-server/src/main/config/modules/http2.mod b/jetty-http2/http2-server/src/main/config/modules/http2.mod index e1e700ba372..2ffa068ede6 100644 --- a/jetty-http2/http2-server/src/main/config/modules/http2.mod +++ b/jetty-http2/http2-server/src/main/config/modules/http2.mod @@ -20,7 +20,14 @@ etc/jetty-http2.xml [ini-template] ## Max number of concurrent streams per connection -# jetty.http2.maxConcurrentStreams=1024 +# jetty.http2.maxConcurrentStreams=128 ## Initial stream receive window (client to server) -# jetty.http2.initialStreamRecvWindow=65535 +# jetty.http2.initialStreamRecvWindow=524288 + +## Initial session receive window (client to server) +# jetty.http2.initialSessionRecvWindow=1048576 + +## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads) +# jetty.http2.reservedThreads=-1 + diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index 82ed664041e..b0e6c8d2dbe 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; @ManagedObject public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory @@ -48,6 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private long streamIdleTimeout; + private int reservedThreads = -1; public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -108,6 +110,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne this.maxConcurrentStreams = maxConcurrentStreams; } + @ManagedAttribute("The max header block fragment") public int getMaxHeaderBlockFragment() { return maxHeaderBlockFragment; @@ -139,6 +142,21 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne this.streamIdleTimeout = streamIdleTimeout; } + /** + * @see ReservedThreadExecutor + * @return The number of reserved threads + */ + @ManagedAttribute("The number of threads reserved for high priority tasks") + public int getReservedThreads() + { + return reservedThreads; + } + + public void setReservedThreads(int threads) + { + this.reservedThreads = threads; + } + public HttpConfiguration getHttpConfiguration() { return httpConfiguration; @@ -163,9 +181,32 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne streamIdleTimeout = endPoint.getIdleTimeout(); session.setStreamIdleTimeout(streamIdleTimeout); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); + + ReservedThreadExecutor executor = connector.getBean(ReservedThreadExecutor.class); + if (executor==null) + { + synchronized (this) + { + executor = connector.getBean(ReservedThreadExecutor.class); + if (executor==null) + { + try + { + executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads()); + executor.start(); + connector.addBean(executor,true); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + } + ServerParser parser = newServerParser(connector, session); - HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(), + HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor, endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener); connection.addListener(connectionListener); return configure(connection, connector, endPoint); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 18e6d47c22a..a1f9ed0d394 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -56,6 +56,7 @@ import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo { @@ -91,7 +92,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection private final HttpConfiguration httpConfig; private boolean recycleHttpChannels; - public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) + public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize); this.listener = listener; diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 81a8cc490ff..6487577ba87 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.http2.server; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -46,7 +47,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HttpChannelOverHTTP2 extends HttpChannel +public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable { private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); @@ -377,6 +378,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel } } + @Override + public void close() + { + abort(new IOException("Unexpected close")); + } + @Override public String toString() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 8868befb44c..1af25a979f4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -38,7 +38,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -47,10 +46,9 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; -import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; /** *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

@@ -76,8 +74,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new EatWhatYouKill(producer,executor); - addBean(_strategy); + _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class)); + addBean(_strategy,true); setStopTimeout(5000); } @@ -446,24 +444,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable @Override public String dump() { + super.dump(); return ContainerLifeCycle.dump(this); } @Override public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator()); - Selector selector = _selector; - if (selector != null && selector.isOpen()) + if (selector == null || !selector.isOpen()) + dumpBeans(out, indent); + else { final ArrayList dump = new ArrayList<>(selector.keys().size() * 2); - DumpKeys dumpKeys = new DumpKeys(dump); submit(dumpKeys); dumpKeys.await(5, TimeUnit.SECONDS); - - ContainerLifeCycle.dump(out, indent, dump); + if (dump.isEmpty()) + dumpBeans(out, indent); + else + dumpBeans(out, indent, dump); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 80f7d732ba3..5b86b80c09c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -29,12 +29,15 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; /** *

{@link SelectorManager} manages a number of {@link ManagedSelector}s that @@ -42,6 +45,8 @@ import org.eclipse.jetty.util.thread.Scheduler; *

{@link SelectorManager} subclasses implement methods to return protocol-specific * {@link EndPoint}s and {@link Connection}s.

*/ + +@ManagedObject("Manager of the NIO Selectors") public abstract class SelectorManager extends ContainerLifeCycle implements Dumpable { public static final int DEFAULT_CONNECT_TIMEOUT = 15000; @@ -52,6 +57,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump private final ManagedSelector[] _selectors; private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long _selectorIndex; + private int _reservedThreads = -2; protected SelectorManager(Executor executor, Scheduler scheduler) { @@ -67,11 +73,13 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump _selectors = new ManagedSelector[selectors]; } + @ManagedAttribute("The Executor") public Executor getExecutor() { return executor; } + @ManagedAttribute("The Scheduler") public Scheduler getScheduler() { return scheduler; @@ -82,6 +90,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump * * @return the connect timeout (in milliseconds) */ + @ManagedAttribute("The Connection timeout (ms)") public long getConnectTimeout() { return _connectTimeout; @@ -97,6 +106,30 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump _connectTimeout = milliseconds; } + /** + * Get the number of preallocated producing threads + * @see EatWhatYouKill + * @see ReservedThreadExecutor + * @return The number of threads preallocated to producing (default 1). + */ + @ManagedAttribute("The number of preallocated producer threads") + public int getReservedThreads() + { + return _reservedThreads; + } + + /** + * Set the number of preallocated threads for high priority tasks + * @see EatWhatYouKill + * @see ReservedThreadExecutor + * @param threads The number of producing threads to preallocate (default 1). + * The EatWhatYouKill scheduler will be disabled with a value of 0. + */ + public void setReservedThreads(int threads) + { + _reservedThreads = threads; + } + /** * Executes the given task in a different thread. * @@ -110,6 +143,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump /** * @return the number of selectors in use */ + @ManagedAttribute("The number of NIO Selectors") public int getSelectorCount() { return _selectors.length; @@ -231,6 +265,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump @Override protected void doStart() throws Exception { + addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads==-2?_selectors.length:_reservedThreads),true); for (int i = 0; i < _selectors.length; i++) { ManagedSelector selector = newSelector(i); @@ -373,4 +408,5 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump * @throws IOException if unable to create new connection */ public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException; + } diff --git a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java index b5cc6360b69..075f4d9c66b 100644 --- a/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java +++ b/jetty-maven-plugin/src/main/java/org/eclipse/jetty/maven/plugin/MavenServerConnector.java @@ -31,7 +31,7 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.annotation.ManagedAttribute; -import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.thread.Scheduler; @@ -45,7 +45,7 @@ import org.eclipse.jetty.util.thread.Scheduler; * be referenced in the pom.xml. This class wraps a ServerConnector, delaying setting the * server instance. Only a few of the setters from the ServerConnector class are supported. */ -public class MavenServerConnector extends AbstractLifeCycle implements Connector +public class MavenServerConnector extends ContainerLifeCycle implements Connector { public static String PORT_SYSPROPERTY = "jetty.http.port"; diff --git a/jetty-server/src/main/config/etc/jetty-http.xml b/jetty-server/src/main/config/etc/jetty-http.xml index 70d317bab86..93ce0739193 100644 --- a/jetty-server/src/main/config/etc/jetty-http.xml +++ b/jetty-server/src/main/config/etc/jetty-http.xml @@ -40,6 +40,10 @@ + + + + diff --git a/jetty-server/src/main/config/etc/jetty-ssl.xml b/jetty-server/src/main/config/etc/jetty-ssl.xml index a079c1f6ab2..956809bda28 100644 --- a/jetty-server/src/main/config/etc/jetty-ssl.xml +++ b/jetty-server/src/main/config/etc/jetty-ssl.xml @@ -32,6 +32,10 @@ + + + + diff --git a/jetty-server/src/main/config/modules/http.mod b/jetty-server/src/main/config/modules/http.mod index fa13822fff5..d833a6da297 100644 --- a/jetty-server/src/main/config/modules/http.mod +++ b/jetty-server/src/main/config/modules/http.mod @@ -40,5 +40,11 @@ etc/jetty-http.xml ## Thread priority delta to give to acceptor threads # jetty.http.acceptorPriorityDelta=0 +## Reserve threads for high priority tasks (-2 use number of selectors,-1 use number of CPUs, 0 no reserved threads) +# jetty.http.reservedThreads=-2 + +## Connect Timeout in milliseconds +# jetty.http.connectTimeout=15000 + ## HTTP Compliance: RFC7230, RFC2616, LEGACY # jetty.http.compliance=RFC7230 diff --git a/jetty-server/src/main/config/modules/ssl.mod b/jetty-server/src/main/config/modules/ssl.mod index 3efb90e3b93..cac38448246 100644 --- a/jetty-server/src/main/config/modules/ssl.mod +++ b/jetty-server/src/main/config/modules/ssl.mod @@ -44,6 +44,12 @@ basehome:modules/ssl/keystore|etc/keystore ## Thread priority delta to give to acceptor threads # jetty.ssl.acceptorPriorityDelta=0 +## Preallocated producer threads (0 disables EatWhatYouKill scheduling) +# jetty.ssl.reservedThreads=-1 + +## Connect Timeout in milliseconds +# jetty.ssl.connectTimeout=15000 + ## Whether request host names are checked to match any SNI names # jetty.ssl.sniHostCheck=true diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java index 8b2f3707dbd..2e38fc81ba5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.Container; import org.eclipse.jetty.util.component.Graceful; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.Scheduler; @@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.Scheduler; * the machinery needed to handle such tasks.

*/ @ManagedObject("Connector Interface") -public interface Connector extends LifeCycle, Graceful +public interface Connector extends LifeCycle, Container, Graceful { /** * @return the {@link Server} instance associated with this {@link Connector} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index db6a05e17f8..4137f226e0c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -378,6 +378,7 @@ public class ServerConnector extends AbstractNetworkConnector } } + @ManagedAttribute("The Selector Manager") public SelectorManager getSelectorManager() { return _manager; diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java index 260e4451a2a..6a582351cb1 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.util.component; import java.util.Collection; + /** * A Container */ @@ -76,6 +77,31 @@ public interface Container */ public void removeEventListener(Listener listener); + /** + * Unmanages a bean already contained by this aggregate, so that it is not started/stopped/destroyed with this + * aggregate. + * + * @param bean The bean to unmanage (must already have been added). + */ + void unmanage(Object bean); + + /** + * Manages a bean already contained by this aggregate, so that it is started/stopped/destroyed with this + * aggregate. + * + * @param bean The bean to manage (must already have been added). + */ + void manage(Object bean); + + /** + * Adds the given bean, explicitly managing it or not. + * + * @param o The bean object to add + * @param managed whether to managed the lifecycle of the bean + * @return true if the bean was added, false if it was already present + */ + boolean addBean(Object o, boolean managed); + /** * A listener for Container events. * If an added bean implements this interface it will receive the events diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java index fdc0b9cd60e..533c1949537 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java @@ -236,6 +236,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, * @param managed whether to managed the lifecycle of the bean * @return true if the bean was added, false if it was already present */ + @Override public boolean addBean(Object o, boolean managed) { if (o instanceof LifeCycle) @@ -380,6 +381,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, * * @param bean The bean to manage (must already have been added). */ + @Override public void manage(Object bean) { for (Bean b : _beans) @@ -426,6 +428,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container, * * @param bean The bean to unmanage (must already have been added). */ + @Override public void unmanage(Object bean) { for (Bean b : _beans) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index cb926417b28..e060a4b30ad 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -186,84 +186,4 @@ public interface Invocable return InvocationType.BLOCKING; } - /** - * An Executor wrapper that knows about Invocable - * - */ - public static class InvocableExecutor implements Executor - { - private static final Logger LOG = Log.getLogger(InvocableExecutor.class); - - private final Executor _executor; - private final InvocationType _preferredInvocationForExecute; - private final InvocationType _preferredInvocationForInvoke; - - public InvocableExecutor(Executor executor,InvocationType preferred) - { - this(executor,preferred,preferred); - } - - public InvocableExecutor(Executor executor,InvocationType preferredInvocationForExecute,InvocationType preferredInvocationForIvoke) - { - _executor=executor; - _preferredInvocationForExecute=preferredInvocationForExecute; - _preferredInvocationForInvoke=preferredInvocationForIvoke; - } - - public Invocable.InvocationType getPreferredInvocationType() - { - return _preferredInvocationForInvoke; - } - - public void invoke(Runnable task) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} invoke {}", this, task); - Invocable.invokePreferred(task,_preferredInvocationForInvoke); - if (LOG.isDebugEnabled()) - LOG.debug("{} invoked {}", this, task); - } - - public void execute(Runnable task) - { - tryExecute(task,_preferredInvocationForExecute); - } - - public void execute(Runnable task, InvocationType preferred) - { - tryExecute(task,preferred); - } - - public boolean tryExecute(Runnable task) - { - return tryExecute(task,_preferredInvocationForExecute); - } - - public boolean tryExecute(Runnable task, InvocationType preferred) - { - try - { - _executor.execute(Invocable.asPreferred(task,preferred)); - return true; - } - catch(RejectedExecutionException e) - { - // If we cannot execute, then close the task - LOG.debug(e); - LOG.warn("Rejected execution of {}",task); - try - { - if (task instanceof Closeable) - ((Closeable)task).close(); - } - catch (Exception x) - { - e.addSuppressed(x); - LOG.warn(e); - } - } - return false; - } - - } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 4cb6fcfa3c4..efadbcf694f 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -506,16 +506,22 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo for (final Thread thread : _threads) { final StackTraceElement[] trace = thread.getStackTrace(); - boolean inIdleJobPoll = false; + String knownMethod = ""; for (StackTraceElement t : trace) { if ("idleJobPoll".equals(t.getMethodName())) { - inIdleJobPoll = true; + knownMethod = "IDLE "; + break; + } + + if ("preallocatedWait".equals(t.getMethodName())) + { + knownMethod = "PREALLOCATED "; break; } } - final boolean idle = inIdleJobPoll; + final String known = knownMethod; if (isDetailedDump()) { @@ -524,11 +530,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : ""); + out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(known).append(thread.getState().toString()); if (thread.getPriority()!=Thread.NORM_PRIORITY) out.append(" prio=").append(String.valueOf(thread.getPriority())); out.append(System.lineSeparator()); - if (!idle) + if (known.length()==0) ContainerLifeCycle.dump(out, indent, Arrays.asList(trace)); } @@ -542,7 +548,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo else { int p=thread.getPriority(); - threads.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p))); + threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p==Thread.NORM_PRIORITY?"":(" prio="+p))); } } @@ -557,7 +563,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public String toString() { - return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); + return String.format("org.eclipse.jetty.util.thread.QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); } private Runnable idleJobPoll() throws InterruptedException diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java new file mode 100644 index 00000000000..95045b29386 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -0,0 +1,235 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.locks.Condition; + +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + * An Executor using preallocated/reserved Threads from a wrapped Executor. + *

Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed + * with a Thread immediately being assigned the Runnable task, or fail if no Thread is + * available. Threads are preallocated up to the capacity from a wrapped {@link Executor}. + */ +public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor +{ + private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); + + private final Executor _executor; + private final Locker _locker = new Locker(); + private final ReservedThread[] _queue; + private int _head; + private int _size; + private int _pending; + + public ReservedThreadExecutor(Executor executor) + { + this(executor,1); + } + + /** + * @param executor The executor to use to obtain threads + * @param capacity The number of threads to preallocate. If less than 0 then the number of available processors is used. + */ + public ReservedThreadExecutor(Executor executor,int capacity) + { + _executor = executor; + _queue = new ReservedThread[capacity>=0?capacity:Runtime.getRuntime().availableProcessors()]; + } + + public Executor getExecutor() + { + return _executor; + } + + public int getCapacity() + { + return _queue.length; + } + + public int getPreallocated() + { + try (Locker.Lock lock = _locker.lock()) + { + return _size; + } + } + + @Override + public void doStart() throws Exception + { + try (Locker.Lock lock = _locker.lock()) + { + _head = _size = _pending = 0; + while (_pending<_queue.length) + { + _executor.execute(new ReservedThread()); + _pending++; + } + } + } + + @Override + public void doStop() throws Exception + { + try (Locker.Lock lock = _locker.lock()) + { + while (_size>0) + { + ReservedThread thread = _queue[_head]; + _queue[_head] = null; + _head = (_head+1)%_queue.length; + _size--; + thread._wakeup.signal(); + } + } + } + + @Override + public void execute(Runnable task) throws RejectedExecutionException + { + if (!tryExecute(task)) + throw new RejectedExecutionException(); + } + + /** + * @param task The task to run + * @return True iff a reserved thread was available and has been assigned the task to run. + */ + public boolean tryExecute(Runnable task) + { + if (task==null) + return false; + + try (Locker.Lock lock = _locker.lock()) + { + if (_size==0) + { + if (_pending<_queue.length) + { + _executor.execute(new ReservedThread()); + _pending++; + } + return false; + } + + ReservedThread thread = _queue[_head]; + _queue[_head] = null; + _head = (_head+1)%_queue.length; + _size--; + + if (_size==0 && _pending<_queue.length) + { + _executor.execute(new ReservedThread()); + _pending++; + } + + thread._task = task; + thread._wakeup.signal(); + + return true; + } + catch(RejectedExecutionException e) + { + LOG.ignore(e); + return false; + } + } + + private class ReservedThread implements Runnable + { + private Condition _wakeup = null; + private Runnable _task = null; + + private void reservedWait() throws InterruptedException + { + _wakeup.await(); + } + + @Override + public void run() + { + while (true) + { + Runnable task = null; + + try (Locker.Lock lock = _locker.lock()) + { + // if this is our first loop, decrement pending count + if (_wakeup==null) + { + _pending--; + _wakeup = _locker.newCondition(); + } + + // Exit if no longer running or there now too many preallocated threads + if (!isRunning() || _size>=_queue.length) + break; + + // Insert ourselves in the queue + _queue[(_head+_size++)%_queue.length] = this; + + // Wait for a task, ignoring spurious interrupts + do + { + try + { + reservedWait(); + task = _task; + _task = null; + } + catch (InterruptedException e) + { + LOG.ignore(e); + } + } + while (isRunning() && task==null); + } + + // Run any task + if (task!=null) + { + try + { + task.run(); + } + catch (Exception e) + { + LOG.warn(e); + break; + } + } + } + } + } + + @Override + public String toString() + { + try (Locker.Lock lock = _locker.lock()) + { + return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending); + } + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 84898e4fecb..6e8854d3aba 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -18,18 +18,19 @@ package org.eclipse.jetty.util.thread.strategy; +import java.io.Closeable; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.RejectedExecutionException; -import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; /** *

A strategy where the thread that produces will run the resulting task if it @@ -57,100 +58,61 @@ import org.eclipse.jetty.util.thread.Locker.Lock; *

* */ -public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrategy, Runnable +public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable { private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); - enum State { IDLE, PRODUCING, REPRODUCING }; + private enum State { IDLE, PRODUCING, REPRODUCING } private final Locker _locker = new Locker(); private State _state = State.IDLE; private final Runnable _runProduce = new RunProduce(); private final Producer _producer; - private final InvocableExecutor _executor; - private int _pendingProducersMax; - private int _pendingProducers; - private int _pendingProducersDispatched; - private int _pendingProducersSignalled; - private Condition _produce = _locker.newCondition(); + private final Executor _executor; + private final ReservedThreadExecutor _producers; public EatWhatYouKill(Producer producer, Executor executor) { - this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING); + this(producer,executor,new ReservedThreadExecutor(executor,1)); } - public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending ) + public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending) { - this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING); + this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending)); } - - public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC) - { - this(producer,executor,preferredInvocationPEC,preferredInvocationEPC,Integer.getInteger("org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.maxProducersPending",1)); - } - - public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC, int maxProducersPending ) + + public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers) { _producer = producer; - _pendingProducersMax = maxProducersPending; - _executor = new InvocableExecutor(executor,preferredInvocationPEC,preferredInvocationEPC); - } - - @Override - public void produce() - { - boolean produce; - try (Lock locked = _locker.lock()) - { - switch(_state) - { - case IDLE: - _state = State.PRODUCING; - produce = true; - break; - - case PRODUCING: - _state = State.REPRODUCING; - produce = false; - break; - - default: - produce = false; - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} execute {}", this, produce); - - if (produce) - doProduce(); + _executor = executor; + _producers = producers; + addBean(_producer); } @Override public void dispatch() { - boolean dispatch = false; + boolean execute = false; try (Lock locked = _locker.lock()) { switch(_state) { case IDLE: - dispatch = true; + execute = true; break; case PRODUCING: _state = State.REPRODUCING; - dispatch = false; break; - default: - dispatch = false; + default: + break; } } if (LOG.isDebugEnabled()) - LOG.debug("{} dispatch {}", this, dispatch); - if (dispatch) - _executor.execute(_runProduce,InvocationType.BLOCKING); + LOG.debug("{} dispatch {}", this, execute); + if (execute) + _executor.execute(_runProduce); } @Override @@ -158,160 +120,139 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate { if (LOG.isDebugEnabled()) LOG.debug("{} run", this); - if (!isRunning()) - return; + produce(); + } + + @Override + public void produce() + { + boolean reproduce = true; + while(isRunning() && tryProduce(reproduce) && doProduce()) + reproduce = false; + } + + public boolean tryProduce(boolean reproduce) + { boolean producing = false; try (Lock locked = _locker.lock()) { - _pendingProducersDispatched--; - _pendingProducers++; - - loop: while (isRunning()) + switch (_state) { - try - { - _produce.await(); - - if (_pendingProducersSignalled==0) - { - // spurious wakeup! - continue loop; - } - - _pendingProducersSignalled--; - if (_state == State.IDLE) - { - _state = State.PRODUCING; - producing = true; - } - } - catch (InterruptedException e) - { - LOG.debug(e); - _pendingProducers--; - } - - break loop; - } + case IDLE: + // Enter PRODUCING + _state = State.PRODUCING; + producing = true; + break; + + case PRODUCING: + // Keep other Thread producing + if (reproduce) + _state = State.REPRODUCING; + break; + + default: + break; + } } - - if (producing) - doProduce(); + return producing; } - private void doProduce() + public boolean doProduce() { - boolean may_block_caller = !Invocable.isNonBlockingInvocation(); - if (LOG.isDebugEnabled()) - LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking"); - - producing: while (isRunning()) + boolean producing = true; + while (isRunning() && producing) { // If we got here, then we are the thread that is producing. - Runnable task = _producer.produce(); - - boolean produce; - boolean consume; - boolean execute_producer; - - StringBuilder state = null; - - try (Lock locked = _locker.lock()) + Runnable task = null; + try { - if (LOG.isDebugEnabled()) + task = _producer.produce(); + } + catch(Throwable e) + { + LOG.warn(e); + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task)); + + if (task==null) + { + try (Lock locked = _locker.lock()) { - state = new StringBuilder(); - getString(state); - getState(state); - state.append("->"); - } - - // Did we produced a task? - if (task == null) - { - // There is no task. // Could another one just have been queued with a produce call? if (_state==State.REPRODUCING) - { _state = State.PRODUCING; - continue producing; + else + { + if (LOG.isDebugEnabled()) + LOG.debug("{} IDLE",toStringLocked()); + _state = State.IDLE; + producing = false; } - - // ... and no additional calls to execute, so we are idle - _state = State.IDLE; - break producing; } - - // Will we eat our own kill - ie consume the task we just produced? - if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING) - { - // ProduceConsume - produce = true; - consume = true; - execute_producer = false; - } - else if (may_block_caller && (_pendingProducers>0 || _pendingProducersMax==0)) - { - // ExecuteProduceConsume (eat what we kill!) - produce = false; - consume = true; - execute_producer = true; - _pendingProducersDispatched++; - _state = State.IDLE; - _pendingProducers--; - _pendingProducersSignalled++; - _produce.signal(); - } - else - { - // ProduceExecuteConsume - produce = true; - consume = false; - execute_producer = (_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax; - if (execute_producer) - _pendingProducersDispatched++; - } - + } + else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING) + { + // PRODUCE CONSUME (EWYK!) if (LOG.isDebugEnabled()) - getState(state); - + LOG.debug("{} PC t={}",this,task); + task.run(); } - - if (LOG.isDebugEnabled()) - { - LOG.debug("{} {} {}", - state, - consume?(execute_producer?"EPC!":"PC"):"PEC", - task); - } - - if (execute_producer) - // Spawn a new thread to continue production by running the produce loop. - _executor.execute(this); - - // Run or execute the task. - if (consume) - _executor.invoke(task); else - _executor.execute(task); - - // Once we have run the task, we can try producing again. - if (produce) - continue producing; - - try (Lock locked = _locker.lock()) { - if (_state==State.IDLE) + boolean consume; + try (Lock locked = _locker.lock()) { - _state = State.PRODUCING; - continue producing; + if (_producers.tryExecute(this)) + { + // EXECUTE PRODUCE CONSUME! + // We have executed a new Producer, so we can EWYK consume + _state = State.IDLE; + producing = false; + consume = true; + } + else + { + // PRODUCE EXECUTE CONSUME! + consume = false; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task); + + // Consume or execute task + try + { + if (consume) + task.run(); + else + _executor.execute(task); + } + catch(RejectedExecutionException e) + { + LOG.warn(e); + if (task instanceof Closeable) + { + try + { + ((Closeable)task).close(); + } + catch(Throwable e2) + { + LOG.ignore(e2); + } + } + } + catch(Throwable e) + { + LOG.warn(e); } } - - break producing; } - if (LOG.isDebugEnabled()) - LOG.debug("{} produce exit",this); + + return producing; } public Boolean isIdle() @@ -322,25 +263,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate } } - @Override - protected void doStop() throws Exception + public String toString() { try (Lock locked = _locker.lock()) { - _pendingProducersSignalled=_pendingProducers+_pendingProducersDispatched; - _pendingProducers=0; - _produce.signalAll(); + return toStringLocked(); } } - public String toString() + public String toStringLocked() { StringBuilder builder = new StringBuilder(); getString(builder); - try (Lock locked = _locker.lock()) - { - getState(builder); - } + getState(builder); return builder.toString(); } @@ -358,9 +293,7 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate { builder.append(_state); builder.append('/'); - builder.append(_pendingProducers); - builder.append('/'); - builder.append(_pendingProducersMax); + builder.append(_producers); } private class RunProduce implements Runnable diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index 1a8d3666df6..8d21b7f37c6 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; @@ -49,7 +48,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable private final Locker _locker = new Locker(); private final Runnable _runProduce = new RunProduce(); private final Producer _producer; - private final InvocableExecutor _executor; + private final Executor _executor; private boolean _idle = true; private boolean _execute; private boolean _producing; @@ -57,14 +56,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable public ExecuteProduceConsume(Producer producer, Executor executor) - { - this(producer,executor,InvocationType.BLOCKING); - } - - public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred ) { this._producer = producer; - _executor = new InvocableExecutor(executor,preferred); + _executor = executor; } @Override @@ -192,15 +186,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable // Spawn a new thread to continue production by running the produce loop. if (LOG.isDebugEnabled()) LOG.debug("{} dispatch", this); - if (!_executor.tryExecute(this)) - task = null; + _executor.execute(this); } // Run the task. if (LOG.isDebugEnabled()) LOG.debug("{} run {}", this, task); if (task != null) - _executor.invoke(task); + task.run(); if (LOG.isDebugEnabled()) LOG.debug("{} ran {}", this, task); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java index 4e14596bc45..6e5e1783d35 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java @@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable.InvocationType; -import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Locker.Lock; @@ -38,18 +37,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy private final Locker _locker = new Locker(); private final Producer _producer; - private final InvocableExecutor _executor; + private final Executor _executor; private State _state = State.IDLE; public ProduceExecuteConsume(Producer producer, Executor executor) - { - this(producer,executor,InvocationType.NON_BLOCKING); - } - - public ProduceExecuteConsume(Producer producer, Executor executor, InvocationType preferred) { _producer = producer; - _executor = new InvocableExecutor(executor,preferred); + _executor = executor; } @Override diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java new file mode 100644 index 00000000000..f6209683a59 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -0,0 +1,204 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class ReservedThreadExecutorTest +{ + final static int SIZE = 2; + TestExecutor _executor; + ReservedThreadExecutor _pae; + + @Before + public void before() throws Exception + { + _executor = new TestExecutor(); + _pae = new ReservedThreadExecutor(_executor,SIZE); + _pae.start(); + } + + + @After + public void after() throws Exception + { + _pae.stop(); + } + + @Test + public void testStarted() throws Exception + { + assertThat(_executor._queue.size(),is(SIZE)); + while(!_executor._queue.isEmpty()) + _executor.execute(); + + assertThat(_pae.getCapacity(),is(SIZE)); + + long started = System.nanoTime(); + while (_pae.getPreallocated()10) + break; + Thread.sleep(100); + } + assertThat(_pae.getPreallocated(),is(SIZE)); + } + + @Test + public void testPending() throws Exception + { + assertThat(_executor._queue.size(),is(SIZE)); + assertThat(_pae.tryExecute(new NOOP()),is(false)); + assertThat(_executor._queue.size(),is(SIZE)); + + _executor.execute(); + assertThat(_executor._queue.size(),is(SIZE-1)); + while (!_executor._queue.isEmpty()) + _executor.execute(); + + long started = System.nanoTime(); + while (_pae.getPreallocated()10) + break; + Thread.sleep(100); + } + assertThat(_executor._queue.size(),is(0)); + assertThat(_pae.getPreallocated(),is(SIZE)); + + for (int i=SIZE;i-->0;) + assertThat(_pae.tryExecute(new Task()),is(true)); + assertThat(_executor._queue.size(),is(1)); + assertThat(_pae.getPreallocated(),is(0)); + + for (int i=SIZE;i-->0;) + assertThat(_pae.tryExecute(new NOOP()),is(false)); + assertThat(_executor._queue.size(),is(SIZE)); + assertThat(_pae.getPreallocated(),is(0)); + + assertThat(_pae.tryExecute(new NOOP()),is(false)); + assertThat(_executor._queue.size(),is(SIZE)); + assertThat(_pae.getPreallocated(),is(0)); + } + + @Test + public void testExecuted() throws Exception + { + while(!_executor._queue.isEmpty()) + _executor.execute(); + long started = System.nanoTime(); + while (_pae.getPreallocated()10) + break; + Thread.sleep(100); + } + assertThat(_pae.getPreallocated(),is(SIZE)); + + Task[] task = new Task[SIZE]; + for (int i=SIZE;i-->0;) + { + task[i] = new Task(); + assertThat(_pae.tryExecute(task[i]),is(true)); + } + + for (int i=SIZE;i-->0;) + { + task[i]._ran.await(10,TimeUnit.SECONDS); + } + + assertThat(_executor._queue.size(),is(1)); + Task extra = new Task(); + assertThat(_pae.tryExecute(extra),is(false)); + assertThat(_executor._queue.size(),is(2)); + Thread.sleep(100); + assertThat(extra._ran.getCount(),is(1L)); + + for (int i=SIZE;i-->0;) + { + task[i]._complete.countDown(); + } + + started = System.nanoTime(); + while (_pae.getPreallocated()10) + break; + Thread.sleep(100); + } + assertThat(_pae.getPreallocated(),is(SIZE)); + + + } + + + private static class TestExecutor implements Executor + { + Deque _queue = new ArrayDeque<>(); + + @Override + public void execute(Runnable task) + { + _queue.addLast(task); + } + + public void execute() + { + Runnable task = _queue.pollFirst(); + if (task!=null) + new Thread(task).start(); + } + } + + private static class NOOP implements Runnable + { + @Override + public void run() {} + } + + private static class Task implements Runnable + { + private CountDownLatch _ran = new CountDownLatch(1); + private CountDownLatch _complete = new CountDownLatch(1); + @Override + public void run() + { + _ran.countDown(); + try + { + _complete.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java index 23981455096..f5156598cd4 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java @@ -158,7 +158,8 @@ public class ExecutionStrategyTest @Override public Runnable produce() { - if (tasks-->0) + final int id = --tasks; + if (id>=0) { try { @@ -171,6 +172,7 @@ public class ExecutionStrategyTest @Override public void run() { + // System.err.println("RUN "+id); latch.countDown(); } };