diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
index 7b0d8ad39a6..3a356bc7445 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
@@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@@ -46,6 +47,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
public static final String CLIENT_CONTEXT_KEY = "http2.client";
public static final String BYTE_BUFFER_POOL_CONTEXT_KEY = "http2.client.byteBufferPool";
public static final String EXECUTOR_CONTEXT_KEY = "http2.client.executor";
+ public static final String PREALLOCATED_EXECUTOR_CONTEXT_KEY = "http2.client.preallocatedExecutor";
public static final String SCHEDULER_CONTEXT_KEY = "http2.client.scheduler";
public static final String SESSION_LISTENER_CONTEXT_KEY = "http2.client.sessionListener";
public static final String SESSION_PROMISE_CONTEXT_KEY = "http2.client.sessionPromise";
@@ -58,6 +60,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
HTTP2Client client = (HTTP2Client)context.get(CLIENT_CONTEXT_KEY);
ByteBufferPool byteBufferPool = (ByteBufferPool)context.get(BYTE_BUFFER_POOL_CONTEXT_KEY);
Executor executor = (Executor)context.get(EXECUTOR_CONTEXT_KEY);
+ ReservedThreadExecutor preallocatedExecutor = (ReservedThreadExecutor)context.get(PREALLOCATED_EXECUTOR_CONTEXT_KEY);
Scheduler scheduler = (Scheduler)context.get(SCHEDULER_CONTEXT_KEY);
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
@@ -67,7 +70,33 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
- HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
+
+ if (preallocatedExecutor==null)
+ {
+ // TODO move this to non lazy construction
+ preallocatedExecutor=client.getBean(ReservedThreadExecutor.class);
+ if (preallocatedExecutor==null)
+ {
+ synchronized (this)
+ {
+ if (preallocatedExecutor==null)
+ {
+ try
+ {
+ preallocatedExecutor = new ReservedThreadExecutor(executor,1); // TODO configure size
+ preallocatedExecutor.start();
+ client.addBean(preallocatedExecutor,true);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }
+
+ HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, preallocatedExecutor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.addListener(connectionListener);
return customize(connection, context);
@@ -79,7 +108,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise promise;
private final Session.Listener listener;
- private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener)
+ private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
index 65a04cbd0e6..39350c2e70b 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection
@@ -50,14 +51,14 @@ public class HTTP2Connection extends AbstractConnection
private final int bufferSize;
private final ExecutionStrategy strategy;
- public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
+ public HTTP2Connection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
- super(endPoint, executor);
+ super(endPoint, executor.getExecutor());
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
- this.strategy = new EatWhatYouKill(producer, executor, 0);
+ this.strategy = new EatWhatYouKill(producer, executor.getExecutor(), executor);
LifeCycle.start(strategy);
}
@@ -147,7 +148,10 @@ public class HTTP2Connection extends AbstractConnection
protected void offerTask(Runnable task, boolean dispatch)
{
offerTask(task);
- strategy.dispatch();
+ if (dispatch)
+ strategy.dispatch();
+ else
+ strategy.produce();
}
@Override
@@ -180,7 +184,7 @@ public class HTTP2Connection extends AbstractConnection
private ByteBuffer buffer;
@Override
- public synchronized Runnable produce()
+ public Runnable produce()
{
Runnable task = pollTask();
if (LOG.isDebugEnabled())
diff --git a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml
index 226941d5e71..1d6423589e3 100644
--- a/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml
+++ b/jetty-http2/http2-server/src/main/config/etc/jetty-http2.xml
@@ -9,8 +9,10 @@
-
-
+
+
+
+
diff --git a/jetty-http2/http2-server/src/main/config/modules/http2.mod b/jetty-http2/http2-server/src/main/config/modules/http2.mod
index e1e700ba372..2ffa068ede6 100644
--- a/jetty-http2/http2-server/src/main/config/modules/http2.mod
+++ b/jetty-http2/http2-server/src/main/config/modules/http2.mod
@@ -20,7 +20,14 @@ etc/jetty-http2.xml
[ini-template]
## Max number of concurrent streams per connection
-# jetty.http2.maxConcurrentStreams=1024
+# jetty.http2.maxConcurrentStreams=128
## Initial stream receive window (client to server)
-# jetty.http2.initialStreamRecvWindow=65535
+# jetty.http2.initialStreamRecvWindow=524288
+
+## Initial session receive window (client to server)
+# jetty.http2.initialSessionRecvWindow=1048576
+
+## Reserve threads for high priority tasks (-1 use number of Selectors, 0 no reserved threads)
+# jetty.http2.reservedThreads=-1
+
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
index 82ed664041e..b0e6c8d2dbe 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@@ -48,6 +49,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
+ private int reservedThreads = -1;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@@ -108,6 +110,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxConcurrentStreams = maxConcurrentStreams;
}
+ @ManagedAttribute("The max header block fragment")
public int getMaxHeaderBlockFragment()
{
return maxHeaderBlockFragment;
@@ -139,6 +142,21 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.streamIdleTimeout = streamIdleTimeout;
}
+ /**
+ * @see ReservedThreadExecutor
+ * @return The number of reserved threads
+ */
+ @ManagedAttribute("The number of threads reserved for high priority tasks")
+ public int getReservedThreads()
+ {
+ return reservedThreads;
+ }
+
+ public void setReservedThreads(int threads)
+ {
+ this.reservedThreads = threads;
+ }
+
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@@ -163,9 +181,32 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
streamIdleTimeout = endPoint.getIdleTimeout();
session.setStreamIdleTimeout(streamIdleTimeout);
session.setInitialSessionRecvWindow(getInitialSessionRecvWindow());
+
+ ReservedThreadExecutor executor = connector.getBean(ReservedThreadExecutor.class);
+ if (executor==null)
+ {
+ synchronized (this)
+ {
+ executor = connector.getBean(ReservedThreadExecutor.class);
+ if (executor==null)
+ {
+ try
+ {
+ executor = new ReservedThreadExecutor(connector.getExecutor(),getReservedThreads());
+ executor.start();
+ connector.addBean(executor,true);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
ServerParser parser = newServerParser(connector, session);
- HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
+ HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), executor,
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
index 18e6d47c22a..a1f9ed0d394 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
@@ -56,6 +56,7 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
@@ -91,7 +92,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels;
- public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
+ public HTTP2ServerConnection(ByteBufferPool byteBufferPool, ReservedThreadExecutor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
index 81a8cc490ff..6487577ba87 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.http2.server;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -46,7 +47,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-public class HttpChannelOverHTTP2 extends HttpChannel
+public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
{
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@@ -377,6 +378,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
}
}
+ @Override
+ public void close()
+ {
+ abort(new IOException("Unexpected close"));
+ }
+
@Override
public String toString()
{
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 8868befb44c..1af25a979f4 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@@ -47,10 +46,9 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
-import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
-import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume;
/**
* {@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.
@@ -76,8 +74,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
- _strategy = new EatWhatYouKill(producer,executor);
- addBean(_strategy);
+ _strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
+ addBean(_strategy,true);
setStopTimeout(5000);
}
@@ -446,24 +444,26 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
@Override
public String dump()
{
+ super.dump();
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
- out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
-
Selector selector = _selector;
- if (selector != null && selector.isOpen())
+ if (selector == null || !selector.isOpen())
+ dumpBeans(out, indent);
+ else
{
final ArrayList
*/
@ManagedObject("Connector Interface")
-public interface Connector extends LifeCycle, Graceful
+public interface Connector extends LifeCycle, Container, Graceful
{
/**
* @return the {@link Server} instance associated with this {@link Connector}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
index db6a05e17f8..4137f226e0c 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
@@ -378,6 +378,7 @@ public class ServerConnector extends AbstractNetworkConnector
}
}
+ @ManagedAttribute("The Selector Manager")
public SelectorManager getSelectorManager()
{
return _manager;
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java
index 260e4451a2a..6a582351cb1 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Container.java
@@ -20,6 +20,7 @@ package org.eclipse.jetty.util.component;
import java.util.Collection;
+
/**
* A Container
*/
@@ -76,6 +77,31 @@ public interface Container
*/
public void removeEventListener(Listener listener);
+ /**
+ * Unmanages a bean already contained by this aggregate, so that it is not started/stopped/destroyed with this
+ * aggregate.
+ *
+ * @param bean The bean to unmanage (must already have been added).
+ */
+ void unmanage(Object bean);
+
+ /**
+ * Manages a bean already contained by this aggregate, so that it is started/stopped/destroyed with this
+ * aggregate.
+ *
+ * @param bean The bean to manage (must already have been added).
+ */
+ void manage(Object bean);
+
+ /**
+ * Adds the given bean, explicitly managing it or not.
+ *
+ * @param o The bean object to add
+ * @param managed whether to managed the lifecycle of the bean
+ * @return true if the bean was added, false if it was already present
+ */
+ boolean addBean(Object o, boolean managed);
+
/**
* A listener for Container events.
* If an added bean implements this interface it will receive the events
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java
index fdc0b9cd60e..533c1949537 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/ContainerLifeCycle.java
@@ -236,6 +236,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
* @param managed whether to managed the lifecycle of the bean
* @return true if the bean was added, false if it was already present
*/
+ @Override
public boolean addBean(Object o, boolean managed)
{
if (o instanceof LifeCycle)
@@ -380,6 +381,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
*
* @param bean The bean to manage (must already have been added).
*/
+ @Override
public void manage(Object bean)
{
for (Bean b : _beans)
@@ -426,6 +428,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
*
* @param bean The bean to unmanage (must already have been added).
*/
+ @Override
public void unmanage(Object bean)
{
for (Bean b : _beans)
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
index cb926417b28..e060a4b30ad 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java
@@ -186,84 +186,4 @@ public interface Invocable
return InvocationType.BLOCKING;
}
- /**
- * An Executor wrapper that knows about Invocable
- *
- */
- public static class InvocableExecutor implements Executor
- {
- private static final Logger LOG = Log.getLogger(InvocableExecutor.class);
-
- private final Executor _executor;
- private final InvocationType _preferredInvocationForExecute;
- private final InvocationType _preferredInvocationForInvoke;
-
- public InvocableExecutor(Executor executor,InvocationType preferred)
- {
- this(executor,preferred,preferred);
- }
-
- public InvocableExecutor(Executor executor,InvocationType preferredInvocationForExecute,InvocationType preferredInvocationForIvoke)
- {
- _executor=executor;
- _preferredInvocationForExecute=preferredInvocationForExecute;
- _preferredInvocationForInvoke=preferredInvocationForIvoke;
- }
-
- public Invocable.InvocationType getPreferredInvocationType()
- {
- return _preferredInvocationForInvoke;
- }
-
- public void invoke(Runnable task)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("{} invoke {}", this, task);
- Invocable.invokePreferred(task,_preferredInvocationForInvoke);
- if (LOG.isDebugEnabled())
- LOG.debug("{} invoked {}", this, task);
- }
-
- public void execute(Runnable task)
- {
- tryExecute(task,_preferredInvocationForExecute);
- }
-
- public void execute(Runnable task, InvocationType preferred)
- {
- tryExecute(task,preferred);
- }
-
- public boolean tryExecute(Runnable task)
- {
- return tryExecute(task,_preferredInvocationForExecute);
- }
-
- public boolean tryExecute(Runnable task, InvocationType preferred)
- {
- try
- {
- _executor.execute(Invocable.asPreferred(task,preferred));
- return true;
- }
- catch(RejectedExecutionException e)
- {
- // If we cannot execute, then close the task
- LOG.debug(e);
- LOG.warn("Rejected execution of {}",task);
- try
- {
- if (task instanceof Closeable)
- ((Closeable)task).close();
- }
- catch (Exception x)
- {
- e.addSuppressed(x);
- LOG.warn(e);
- }
- }
- return false;
- }
-
- }
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
index 4cb6fcfa3c4..efadbcf694f 100755
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java
@@ -506,16 +506,22 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
for (final Thread thread : _threads)
{
final StackTraceElement[] trace = thread.getStackTrace();
- boolean inIdleJobPoll = false;
+ String knownMethod = "";
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()))
{
- inIdleJobPoll = true;
+ knownMethod = "IDLE ";
+ break;
+ }
+
+ if ("preallocatedWait".equals(t.getMethodName()))
+ {
+ knownMethod = "PREALLOCATED ";
break;
}
}
- final boolean idle = inIdleJobPoll;
+ final String known = knownMethod;
if (isDetailedDump())
{
@@ -524,11 +530,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
public void dump(Appendable out, String indent) throws IOException
{
- out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "");
+ out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(known).append(thread.getState().toString());
if (thread.getPriority()!=Thread.NORM_PRIORITY)
out.append(" prio=").append(String.valueOf(thread.getPriority()));
out.append(System.lineSeparator());
- if (!idle)
+ if (known.length()==0)
ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
}
@@ -542,7 +548,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
else
{
int p=thread.getPriority();
- threads.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
+ threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p==Thread.NORM_PRIORITY?"":(" prio="+p)));
}
}
@@ -557,7 +563,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
public String toString()
{
- return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
+ return String.format("org.eclipse.jetty.util.thread.QueuedThreadPool@%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
}
private Runnable idleJobPoll() throws InterruptedException
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
new file mode 100644
index 00000000000..95045b29386
--- /dev/null
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
@@ -0,0 +1,235 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.util.thread;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.locks.Condition;
+
+import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * An Executor using preallocated/reserved Threads from a wrapped Executor.
+ * Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
+ * with a Thread immediately being assigned the Runnable task, or fail if no Thread is
+ * available. Threads are preallocated up to the capacity from a wrapped {@link Executor}.
+ */
+public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
+{
+ private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
+
+ private final Executor _executor;
+ private final Locker _locker = new Locker();
+ private final ReservedThread[] _queue;
+ private int _head;
+ private int _size;
+ private int _pending;
+
+ public ReservedThreadExecutor(Executor executor)
+ {
+ this(executor,1);
+ }
+
+ /**
+ * @param executor The executor to use to obtain threads
+ * @param capacity The number of threads to preallocate. If less than 0 then the number of available processors is used.
+ */
+ public ReservedThreadExecutor(Executor executor,int capacity)
+ {
+ _executor = executor;
+ _queue = new ReservedThread[capacity>=0?capacity:Runtime.getRuntime().availableProcessors()];
+ }
+
+ public Executor getExecutor()
+ {
+ return _executor;
+ }
+
+ public int getCapacity()
+ {
+ return _queue.length;
+ }
+
+ public int getPreallocated()
+ {
+ try (Locker.Lock lock = _locker.lock())
+ {
+ return _size;
+ }
+ }
+
+ @Override
+ public void doStart() throws Exception
+ {
+ try (Locker.Lock lock = _locker.lock())
+ {
+ _head = _size = _pending = 0;
+ while (_pending<_queue.length)
+ {
+ _executor.execute(new ReservedThread());
+ _pending++;
+ }
+ }
+ }
+
+ @Override
+ public void doStop() throws Exception
+ {
+ try (Locker.Lock lock = _locker.lock())
+ {
+ while (_size>0)
+ {
+ ReservedThread thread = _queue[_head];
+ _queue[_head] = null;
+ _head = (_head+1)%_queue.length;
+ _size--;
+ thread._wakeup.signal();
+ }
+ }
+ }
+
+ @Override
+ public void execute(Runnable task) throws RejectedExecutionException
+ {
+ if (!tryExecute(task))
+ throw new RejectedExecutionException();
+ }
+
+ /**
+ * @param task The task to run
+ * @return True iff a reserved thread was available and has been assigned the task to run.
+ */
+ public boolean tryExecute(Runnable task)
+ {
+ if (task==null)
+ return false;
+
+ try (Locker.Lock lock = _locker.lock())
+ {
+ if (_size==0)
+ {
+ if (_pending<_queue.length)
+ {
+ _executor.execute(new ReservedThread());
+ _pending++;
+ }
+ return false;
+ }
+
+ ReservedThread thread = _queue[_head];
+ _queue[_head] = null;
+ _head = (_head+1)%_queue.length;
+ _size--;
+
+ if (_size==0 && _pending<_queue.length)
+ {
+ _executor.execute(new ReservedThread());
+ _pending++;
+ }
+
+ thread._task = task;
+ thread._wakeup.signal();
+
+ return true;
+ }
+ catch(RejectedExecutionException e)
+ {
+ LOG.ignore(e);
+ return false;
+ }
+ }
+
+ private class ReservedThread implements Runnable
+ {
+ private Condition _wakeup = null;
+ private Runnable _task = null;
+
+ private void reservedWait() throws InterruptedException
+ {
+ _wakeup.await();
+ }
+
+ @Override
+ public void run()
+ {
+ while (true)
+ {
+ Runnable task = null;
+
+ try (Locker.Lock lock = _locker.lock())
+ {
+ // if this is our first loop, decrement pending count
+ if (_wakeup==null)
+ {
+ _pending--;
+ _wakeup = _locker.newCondition();
+ }
+
+ // Exit if no longer running or there now too many preallocated threads
+ if (!isRunning() || _size>=_queue.length)
+ break;
+
+ // Insert ourselves in the queue
+ _queue[(_head+_size++)%_queue.length] = this;
+
+ // Wait for a task, ignoring spurious interrupts
+ do
+ {
+ try
+ {
+ reservedWait();
+ task = _task;
+ _task = null;
+ }
+ catch (InterruptedException e)
+ {
+ LOG.ignore(e);
+ }
+ }
+ while (isRunning() && task==null);
+ }
+
+ // Run any task
+ if (task!=null)
+ {
+ try
+ {
+ task.run();
+ }
+ catch (Exception e)
+ {
+ LOG.warn(e);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ try (Locker.Lock lock = _locker.lock())
+ {
+ return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending);
+ }
+ }
+}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
index 84898e4fecb..6e8854d3aba 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java
@@ -18,18 +18,19 @@
package org.eclipse.jetty.util.thread.strategy;
+import java.io.Closeable;
import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
+import java.util.concurrent.RejectedExecutionException;
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
-import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
+import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/**
*
A strategy where the thread that produces will run the resulting task if it
@@ -57,100 +58,61 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
*
*
*/
-public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrategy, Runnable
+public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
- enum State { IDLE, PRODUCING, REPRODUCING };
+ private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private State _state = State.IDLE;
private final Runnable _runProduce = new RunProduce();
private final Producer _producer;
- private final InvocableExecutor _executor;
- private int _pendingProducersMax;
- private int _pendingProducers;
- private int _pendingProducersDispatched;
- private int _pendingProducersSignalled;
- private Condition _produce = _locker.newCondition();
+ private final Executor _executor;
+ private final ReservedThreadExecutor _producers;
public EatWhatYouKill(Producer producer, Executor executor)
{
- this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
+ this(producer,executor,new ReservedThreadExecutor(executor,1));
}
- public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending )
+ public EatWhatYouKill(Producer producer, Executor executor, int maxProducersPending)
{
- this(producer,executor,InvocationType.NON_BLOCKING,InvocationType.BLOCKING);
+ this(producer,executor,new ReservedThreadExecutor(executor,maxProducersPending));
}
-
- public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC)
- {
- this(producer,executor,preferredInvocationPEC,preferredInvocationEPC,Integer.getInteger("org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.maxProducersPending",1));
- }
-
- public EatWhatYouKill(Producer producer, Executor executor, InvocationType preferredInvocationPEC, InvocationType preferredInvocationEPC, int maxProducersPending )
+
+ public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
_producer = producer;
- _pendingProducersMax = maxProducersPending;
- _executor = new InvocableExecutor(executor,preferredInvocationPEC,preferredInvocationEPC);
- }
-
- @Override
- public void produce()
- {
- boolean produce;
- try (Lock locked = _locker.lock())
- {
- switch(_state)
- {
- case IDLE:
- _state = State.PRODUCING;
- produce = true;
- break;
-
- case PRODUCING:
- _state = State.REPRODUCING;
- produce = false;
- break;
-
- default:
- produce = false;
- }
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("{} execute {}", this, produce);
-
- if (produce)
- doProduce();
+ _executor = executor;
+ _producers = producers;
+ addBean(_producer);
}
@Override
public void dispatch()
{
- boolean dispatch = false;
+ boolean execute = false;
try (Lock locked = _locker.lock())
{
switch(_state)
{
case IDLE:
- dispatch = true;
+ execute = true;
break;
case PRODUCING:
_state = State.REPRODUCING;
- dispatch = false;
break;
- default:
- dispatch = false;
+ default:
+ break;
}
}
if (LOG.isDebugEnabled())
- LOG.debug("{} dispatch {}", this, dispatch);
- if (dispatch)
- _executor.execute(_runProduce,InvocationType.BLOCKING);
+ LOG.debug("{} dispatch {}", this, execute);
+ if (execute)
+ _executor.execute(_runProduce);
}
@Override
@@ -158,160 +120,139 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
{
if (LOG.isDebugEnabled())
LOG.debug("{} run", this);
- if (!isRunning())
- return;
+ produce();
+ }
+
+ @Override
+ public void produce()
+ {
+ boolean reproduce = true;
+ while(isRunning() && tryProduce(reproduce) && doProduce())
+ reproduce = false;
+ }
+
+ public boolean tryProduce(boolean reproduce)
+ {
boolean producing = false;
try (Lock locked = _locker.lock())
{
- _pendingProducersDispatched--;
- _pendingProducers++;
-
- loop: while (isRunning())
+ switch (_state)
{
- try
- {
- _produce.await();
-
- if (_pendingProducersSignalled==0)
- {
- // spurious wakeup!
- continue loop;
- }
-
- _pendingProducersSignalled--;
- if (_state == State.IDLE)
- {
- _state = State.PRODUCING;
- producing = true;
- }
- }
- catch (InterruptedException e)
- {
- LOG.debug(e);
- _pendingProducers--;
- }
-
- break loop;
- }
+ case IDLE:
+ // Enter PRODUCING
+ _state = State.PRODUCING;
+ producing = true;
+ break;
+
+ case PRODUCING:
+ // Keep other Thread producing
+ if (reproduce)
+ _state = State.REPRODUCING;
+ break;
+
+ default:
+ break;
+ }
}
-
- if (producing)
- doProduce();
+ return producing;
}
- private void doProduce()
+ public boolean doProduce()
{
- boolean may_block_caller = !Invocable.isNonBlockingInvocation();
- if (LOG.isDebugEnabled())
- LOG.debug("{} produce {}", this,may_block_caller?"non-blocking":"blocking");
-
- producing: while (isRunning())
+ boolean producing = true;
+ while (isRunning() && producing)
{
// If we got here, then we are the thread that is producing.
- Runnable task = _producer.produce();
-
- boolean produce;
- boolean consume;
- boolean execute_producer;
-
- StringBuilder state = null;
-
- try (Lock locked = _locker.lock())
+ Runnable task = null;
+ try
{
- if (LOG.isDebugEnabled())
+ task = _producer.produce();
+ }
+ catch(Throwable e)
+ {
+ LOG.warn(e);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task));
+
+ if (task==null)
+ {
+ try (Lock locked = _locker.lock())
{
- state = new StringBuilder();
- getString(state);
- getState(state);
- state.append("->");
- }
-
- // Did we produced a task?
- if (task == null)
- {
- // There is no task.
// Could another one just have been queued with a produce call?
if (_state==State.REPRODUCING)
- {
_state = State.PRODUCING;
- continue producing;
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} IDLE",toStringLocked());
+ _state = State.IDLE;
+ producing = false;
}
-
- // ... and no additional calls to execute, so we are idle
- _state = State.IDLE;
- break producing;
}
-
- // Will we eat our own kill - ie consume the task we just produced?
- if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
- {
- // ProduceConsume
- produce = true;
- consume = true;
- execute_producer = false;
- }
- else if (may_block_caller && (_pendingProducers>0 || _pendingProducersMax==0))
- {
- // ExecuteProduceConsume (eat what we kill!)
- produce = false;
- consume = true;
- execute_producer = true;
- _pendingProducersDispatched++;
- _state = State.IDLE;
- _pendingProducers--;
- _pendingProducersSignalled++;
- _produce.signal();
- }
- else
- {
- // ProduceExecuteConsume
- produce = true;
- consume = false;
- execute_producer = (_pendingProducersDispatched + _pendingProducers)<_pendingProducersMax;
- if (execute_producer)
- _pendingProducersDispatched++;
- }
-
+ }
+ else if (Invocable.getInvocationType(task)==InvocationType.NON_BLOCKING)
+ {
+ // PRODUCE CONSUME (EWYK!)
if (LOG.isDebugEnabled())
- getState(state);
-
+ LOG.debug("{} PC t={}",this,task);
+ task.run();
}
-
- if (LOG.isDebugEnabled())
- {
- LOG.debug("{} {} {}",
- state,
- consume?(execute_producer?"EPC!":"PC"):"PEC",
- task);
- }
-
- if (execute_producer)
- // Spawn a new thread to continue production by running the produce loop.
- _executor.execute(this);
-
- // Run or execute the task.
- if (consume)
- _executor.invoke(task);
else
- _executor.execute(task);
-
- // Once we have run the task, we can try producing again.
- if (produce)
- continue producing;
-
- try (Lock locked = _locker.lock())
{
- if (_state==State.IDLE)
+ boolean consume;
+ try (Lock locked = _locker.lock())
{
- _state = State.PRODUCING;
- continue producing;
+ if (_producers.tryExecute(this))
+ {
+ // EXECUTE PRODUCE CONSUME!
+ // We have executed a new Producer, so we can EWYK consume
+ _state = State.IDLE;
+ producing = false;
+ consume = true;
+ }
+ else
+ {
+ // PRODUCE EXECUTE CONSUME!
+ consume = false;
+ }
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} {} t={}",this,consume?"EPC":"PEC",task);
+
+ // Consume or execute task
+ try
+ {
+ if (consume)
+ task.run();
+ else
+ _executor.execute(task);
+ }
+ catch(RejectedExecutionException e)
+ {
+ LOG.warn(e);
+ if (task instanceof Closeable)
+ {
+ try
+ {
+ ((Closeable)task).close();
+ }
+ catch(Throwable e2)
+ {
+ LOG.ignore(e2);
+ }
+ }
+ }
+ catch(Throwable e)
+ {
+ LOG.warn(e);
}
}
-
- break producing;
}
- if (LOG.isDebugEnabled())
- LOG.debug("{} produce exit",this);
+
+ return producing;
}
public Boolean isIdle()
@@ -322,25 +263,19 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
}
}
- @Override
- protected void doStop() throws Exception
+ public String toString()
{
try (Lock locked = _locker.lock())
{
- _pendingProducersSignalled=_pendingProducers+_pendingProducersDispatched;
- _pendingProducers=0;
- _produce.signalAll();
+ return toStringLocked();
}
}
- public String toString()
+ public String toStringLocked()
{
StringBuilder builder = new StringBuilder();
getString(builder);
- try (Lock locked = _locker.lock())
- {
- getState(builder);
- }
+ getState(builder);
return builder.toString();
}
@@ -358,9 +293,7 @@ public class EatWhatYouKill extends AbstractLifeCycle implements ExecutionStrate
{
builder.append(_state);
builder.append('/');
- builder.append(_pendingProducers);
- builder.append('/');
- builder.append(_pendingProducersMax);
+ builder.append(_producers);
}
private class RunProduce implements Runnable
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index 1a8d3666df6..8d21b7f37c6 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable;
-import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@@ -49,7 +48,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
private final Locker _locker = new Locker();
private final Runnable _runProduce = new RunProduce();
private final Producer _producer;
- private final InvocableExecutor _executor;
+ private final Executor _executor;
private boolean _idle = true;
private boolean _execute;
private boolean _producing;
@@ -57,14 +56,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
public ExecuteProduceConsume(Producer producer, Executor executor)
- {
- this(producer,executor,InvocationType.BLOCKING);
- }
-
- public ExecuteProduceConsume(Producer producer, Executor executor, InvocationType preferred )
{
this._producer = producer;
- _executor = new InvocableExecutor(executor,preferred);
+ _executor = executor;
}
@Override
@@ -192,15 +186,14 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled())
LOG.debug("{} dispatch", this);
- if (!_executor.tryExecute(this))
- task = null;
+ _executor.execute(this);
}
// Run the task.
if (LOG.isDebugEnabled())
LOG.debug("{} run {}", this, task);
if (task != null)
- _executor.invoke(task);
+ task.run();
if (LOG.isDebugEnabled())
LOG.debug("{} ran {}", this, task);
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
index 4e14596bc45..6e5e1783d35 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
@@ -24,7 +24,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
-import org.eclipse.jetty.util.thread.Invocable.InvocableExecutor;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
@@ -38,18 +37,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
private final Locker _locker = new Locker();
private final Producer _producer;
- private final InvocableExecutor _executor;
+ private final Executor _executor;
private State _state = State.IDLE;
public ProduceExecuteConsume(Producer producer, Executor executor)
- {
- this(producer,executor,InvocationType.NON_BLOCKING);
- }
-
- public ProduceExecuteConsume(Producer producer, Executor executor, InvocationType preferred)
{
_producer = producer;
- _executor = new InvocableExecutor(executor,preferred);
+ _executor = executor;
}
@Override
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
new file mode 100644
index 00000000000..f6209683a59
--- /dev/null
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
@@ -0,0 +1,204 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.util.thread;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class ReservedThreadExecutorTest
+{
+ final static int SIZE = 2;
+ TestExecutor _executor;
+ ReservedThreadExecutor _pae;
+
+ @Before
+ public void before() throws Exception
+ {
+ _executor = new TestExecutor();
+ _pae = new ReservedThreadExecutor(_executor,SIZE);
+ _pae.start();
+ }
+
+
+ @After
+ public void after() throws Exception
+ {
+ _pae.stop();
+ }
+
+ @Test
+ public void testStarted() throws Exception
+ {
+ assertThat(_executor._queue.size(),is(SIZE));
+ while(!_executor._queue.isEmpty())
+ _executor.execute();
+
+ assertThat(_pae.getCapacity(),is(SIZE));
+
+ long started = System.nanoTime();
+ while (_pae.getPreallocated()10)
+ break;
+ Thread.sleep(100);
+ }
+ assertThat(_pae.getPreallocated(),is(SIZE));
+ }
+
+ @Test
+ public void testPending() throws Exception
+ {
+ assertThat(_executor._queue.size(),is(SIZE));
+ assertThat(_pae.tryExecute(new NOOP()),is(false));
+ assertThat(_executor._queue.size(),is(SIZE));
+
+ _executor.execute();
+ assertThat(_executor._queue.size(),is(SIZE-1));
+ while (!_executor._queue.isEmpty())
+ _executor.execute();
+
+ long started = System.nanoTime();
+ while (_pae.getPreallocated()10)
+ break;
+ Thread.sleep(100);
+ }
+ assertThat(_executor._queue.size(),is(0));
+ assertThat(_pae.getPreallocated(),is(SIZE));
+
+ for (int i=SIZE;i-->0;)
+ assertThat(_pae.tryExecute(new Task()),is(true));
+ assertThat(_executor._queue.size(),is(1));
+ assertThat(_pae.getPreallocated(),is(0));
+
+ for (int i=SIZE;i-->0;)
+ assertThat(_pae.tryExecute(new NOOP()),is(false));
+ assertThat(_executor._queue.size(),is(SIZE));
+ assertThat(_pae.getPreallocated(),is(0));
+
+ assertThat(_pae.tryExecute(new NOOP()),is(false));
+ assertThat(_executor._queue.size(),is(SIZE));
+ assertThat(_pae.getPreallocated(),is(0));
+ }
+
+ @Test
+ public void testExecuted() throws Exception
+ {
+ while(!_executor._queue.isEmpty())
+ _executor.execute();
+ long started = System.nanoTime();
+ while (_pae.getPreallocated()10)
+ break;
+ Thread.sleep(100);
+ }
+ assertThat(_pae.getPreallocated(),is(SIZE));
+
+ Task[] task = new Task[SIZE];
+ for (int i=SIZE;i-->0;)
+ {
+ task[i] = new Task();
+ assertThat(_pae.tryExecute(task[i]),is(true));
+ }
+
+ for (int i=SIZE;i-->0;)
+ {
+ task[i]._ran.await(10,TimeUnit.SECONDS);
+ }
+
+ assertThat(_executor._queue.size(),is(1));
+ Task extra = new Task();
+ assertThat(_pae.tryExecute(extra),is(false));
+ assertThat(_executor._queue.size(),is(2));
+ Thread.sleep(100);
+ assertThat(extra._ran.getCount(),is(1L));
+
+ for (int i=SIZE;i-->0;)
+ {
+ task[i]._complete.countDown();
+ }
+
+ started = System.nanoTime();
+ while (_pae.getPreallocated()10)
+ break;
+ Thread.sleep(100);
+ }
+ assertThat(_pae.getPreallocated(),is(SIZE));
+
+
+ }
+
+
+ private static class TestExecutor implements Executor
+ {
+ Deque _queue = new ArrayDeque<>();
+
+ @Override
+ public void execute(Runnable task)
+ {
+ _queue.addLast(task);
+ }
+
+ public void execute()
+ {
+ Runnable task = _queue.pollFirst();
+ if (task!=null)
+ new Thread(task).start();
+ }
+ }
+
+ private static class NOOP implements Runnable
+ {
+ @Override
+ public void run() {}
+ }
+
+ private static class Task implements Runnable
+ {
+ private CountDownLatch _ran = new CountDownLatch(1);
+ private CountDownLatch _complete = new CountDownLatch(1);
+ @Override
+ public void run()
+ {
+ _ran.countDown();
+ try
+ {
+ _complete.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
index 23981455096..f5156598cd4 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java
@@ -158,7 +158,8 @@ public class ExecutionStrategyTest
@Override
public Runnable produce()
{
- if (tasks-->0)
+ final int id = --tasks;
+ if (id>=0)
{
try
{
@@ -171,6 +172,7 @@ public class ExecutionStrategyTest
@Override
public void run()
{
+ // System.err.println("RUN "+id);
latch.countDown();
}
};