diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
index 55c3dca9df2..75006fba0ee 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java
@@ -47,9 +47,11 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
+import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
/**
*
{@link HTTP2Client} provides an asynchronous, non-blocking implementation
@@ -127,6 +129,7 @@ public class HTTP2Client extends ContainerLifeCycle
private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
+ private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory();
@Override
protected void doStart() throws Exception
@@ -225,6 +228,16 @@ public class HTTP2Client extends ContainerLifeCycle
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
+ public ExecutionStrategy.Factory getExecutionStrategyFactory()
+ {
+ return executionStrategyFactory;
+ }
+
+ public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
+ {
+ this.executionStrategyFactory = executionStrategyFactory;
+ }
+
@ManagedAttribute("The number of selectors")
public int getSelectors()
{
diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
index bf5688ce669..7ae380daafd 100644
--- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
+++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java
@@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@@ -67,7 +68,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
- HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener);
+ HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
+ parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
connection.addListener(connectionListener);
return connection;
}
@@ -78,9 +80,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise promise;
private final Session.Listener listener;
- public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener)
+ private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory, Promise promise, Session.Listener listener)
{
- super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
+ super(byteBufferPool, executor, endpoint, parser, session, bufferSize, executionFactory);
this.client = client;
this.promise = promise;
this.listener = listener;
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
index 034922f9255..e966923e04b 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
@@ -46,14 +46,14 @@ public class HTTP2Connection extends AbstractConnection
private final HTTP2Producer producer = new HTTP2Producer();
private final ExecutionStrategy executionStrategy;
- public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
+ public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
- this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
+ this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
}
public ISession getSession()
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
index 6bb9112f54c..0c70d362342 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java
@@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@@ -46,6 +47,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxConcurrentStreams = -1;
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
+ private ExecutionStrategy.Factory executionStrategyFactory = ExecutionStrategy.Factory.getDefault();
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@@ -112,6 +114,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
+ public ExecutionStrategy.Factory getExecutionStrategyFactory()
+ {
+ return executionStrategyFactory;
+ }
+
+ public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
+ {
+ this.executionStrategyFactory = executionStrategyFactory;
+ }
+
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@@ -135,7 +147,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
- endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
+ endPoint, httpConfiguration, parser, session, getInputBufferSize(), getExecutionStrategyFactory(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
}
diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
index 0b9755122ef..44cbff76a72 100644
--- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
+++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java
@@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
+import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
@@ -59,9 +60,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig;
private final List upgradeFrames = new ArrayList<>();
- public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
+ public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener)
{
- super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
+ super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory);
this.listener = listener;
this.httpConfig = httpConfig;
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 40d5a01a51e..1a406818bbf 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -66,10 +66,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
+ {
+ this(selectorManager, id, ExecutionStrategy.Factory.getDefault());
+ }
+
+ public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
{
_selectorManager = selectorManager;
_id = id;
- _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
+ _strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
@@ -554,7 +559,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
LOG.debug("rejected accept {}",channel);
closeNoExceptions(channel);
}
-
+
@Override
public void run()
{
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
index acdf703e18f..4b2cffa3c9f 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java
@@ -54,7 +54,6 @@ public interface ExecutionStrategy
*/
public void execute();
-
/**
* A task that can handle {@link RejectedExecutionException}
*/
@@ -81,30 +80,63 @@ public interface ExecutionStrategy
Runnable produce();
}
- public static class Factory
+ /**
+ * A factory for {@link ExecutionStrategy}.
+ */
+ public static interface Factory
{
- private static final Logger LOG = Log.getLogger(Factory.class);
+ /**
+ * Creates a new {@link ExecutionStrategy}.
+ *
+ * @param producer the execution strategy producer
+ * @param executor the execution strategy executor
+ * @return a new {@link ExecutionStrategy}
+ */
+ public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor);
+ /**
+ * @return the default {@link ExecutionStrategy}
+ */
+ public static Factory getDefault()
+ {
+ return DefaultExecutionStrategyFactory.INSTANCE;
+ }
+
+ /**
+ * @deprecated use {@code getDefault().newExecutionStrategy(Producer, Executor)} instead
+ */
+ @Deprecated
public static ExecutionStrategy instanceFor(Producer producer, Executor executor)
{
- // TODO remove this mechanism before release
- String strategy = System.getProperty(producer.getClass().getName()+".ExecutionStrategy");
- if (strategy!=null)
+ return getDefault().newExecutionStrategy(producer, executor);
+ }
+ }
+
+ public static class DefaultExecutionStrategyFactory implements Factory
+ {
+ private static final Logger LOG = Log.getLogger(Factory.class);
+ private static final Factory INSTANCE = new DefaultExecutionStrategyFactory();
+
+ @Override
+ public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
+ {
+ String strategy = System.getProperty(producer.getClass().getName() + ".ExecutionStrategy");
+ if (strategy != null)
{
try
{
Class extends ExecutionStrategy> c = Loader.loadClass(strategy);
- Constructor extends ExecutionStrategy> m = c.getConstructor(Producer.class,Executor.class);
- LOG.info("Use {} for {}",c.getSimpleName(),producer.getClass().getName());
- return m.newInstance(producer,executor);
+ Constructor extends ExecutionStrategy> m = c.getConstructor(Producer.class, Executor.class);
+ LOG.info("Use {} for {}", c.getSimpleName(), producer.getClass().getName());
+ return m.newInstance(producer, executor);
}
- catch(Exception e)
+ catch (Exception e)
{
LOG.warn(e);
}
}
-
- return new ExecuteProduceConsume(producer,executor);
+
+ return new ExecuteProduceConsume(producer, executor);
}
}
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index 4643775e6a9..854aa648e2a 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -46,6 +46,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
+
private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute();
private final Producer _producer;
@@ -61,7 +62,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
}
-
+
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
{
this._producer = producer;
@@ -75,7 +76,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
-
+
boolean produce=false;
try (Lock locked = _locker.lock())
{
@@ -140,7 +141,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// suffer as badly from thread starvation
while (_threadpool!=null && _threadpool.isLowOnThreads())
{
- LOG.debug("EWYK low resources {}",this);
+ LOG.debug("EPC low resources {}",this);
try
{
_lowresources.execute();
@@ -151,7 +152,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
LOG.warn(e);
}
}
-
+
// no longer low resources so produceAndRun normally
produceAndRun();
}
@@ -272,7 +273,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
public String toString()
{
StringBuilder builder = new StringBuilder();
- builder.append("EPR ");
+ builder.append("EPC ");
try (Lock locked = _locker.lock())
{
builder.append(_idle?"Idle/":"");
@@ -292,4 +293,13 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
execute();
}
}
+
+ public static class Factory implements ExecutionStrategy.Factory
+ {
+ @Override
+ public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
+ {
+ return new ExecuteProduceConsume(producer, executor);
+ }
+ }
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java
index d15d6f0b965..0971944df8b 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java
@@ -20,6 +20,8 @@ package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
/**
@@ -28,6 +30,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/
public class ProduceConsume implements ExecutionStrategy, Runnable
{
+ private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
+
private final Producer _producer;
private final Executor _executor;
@@ -45,11 +49,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{
// Produce a task.
Runnable task = _producer.produce();
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} produced {}", this, task);
if (task == null)
break;
- // run the task.
+ // Run the task.
task.run();
}
}
@@ -65,4 +71,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{
execute();
}
+
+ public static class Factory implements ExecutionStrategy.Factory
+ {
+ @Override
+ public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
+ {
+ return new ProduceConsume(producer, executor);
+ }
+ }
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
index 9310c05d5f9..ab7ddc171b7 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java
@@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/
public class ProduceExecuteConsume implements ExecutionStrategy
{
- private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
+ private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
+
private final Producer _producer;
private final Executor _executor;
@@ -50,7 +51,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
- LOG.debug("{} PER produced {}",_producer,task);
+ LOG.debug("{} produced {}", _producer, task);
if (task == null)
break;
@@ -60,7 +61,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{
_executor.execute(task);
}
- catch(RejectedExecutionException e)
+ catch (RejectedExecutionException e)
{
// Discard/reject tasks that cannot be executed
if (task instanceof Rejectable)
@@ -84,4 +85,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{
execute();
}
+
+ public static class Factory implements ExecutionStrategy.Factory
+ {
+ @Override
+ public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
+ {
+ return new ProduceExecuteConsume(producer, executor);
+ }
+ }
}