Fixes #514 - Allow ExecutionStrategy to be configurable.

Introduced setters and constructor parameters to components that use
ExecutionStrategy.
This commit is contained in:
Simone Bordet 2016-04-14 12:20:31 +02:00
parent 6163ee7293
commit a37fdcd0e2
10 changed files with 131 additions and 31 deletions

View File

@ -47,9 +47,11 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
/** /**
* <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation * <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation
@ -127,6 +129,7 @@ public class HTTP2Client extends ContainerLifeCycle
private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory();
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
@ -225,6 +228,16 @@ public class HTTP2Client extends ContainerLifeCycle
this.flowControlStrategyFactory = flowControlStrategyFactory; this.flowControlStrategyFactory = flowControlStrategyFactory;
} }
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
@ManagedAttribute("The number of selectors") @ManagedAttribute("The number of selectors")
public int getSelectors() public int getSelectors()
{ {

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@ -67,7 +68,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy(); FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192); Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener); HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
connection.addListener(connectionListener); connection.addListener(connectionListener);
return connection; return connection;
} }
@ -78,9 +80,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise; private final Promise<Session> promise;
private final Session.Listener listener; private final Session.Listener listener;
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener) private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory, Promise<Session> promise, Session.Listener listener)
{ {
super(byteBufferPool, executor, endpoint, parser, session, bufferSize); super(byteBufferPool, executor, endpoint, parser, session, bufferSize, executionFactory);
this.client = client; this.client = client;
this.promise = promise; this.promise = promise;
this.listener = listener; this.listener = listener;

View File

@ -46,14 +46,14 @@ public class HTTP2Connection extends AbstractConnection
private final HTTP2Producer producer = new HTTP2Producer(); private final HTTP2Producer producer = new HTTP2Producer();
private final ExecutionStrategy executionStrategy; private final ExecutionStrategy executionStrategy;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
{ {
super(endPoint, executor); super(endPoint, executor);
this.byteBufferPool = byteBufferPool; this.byteBufferPool = byteBufferPool;
this.parser = parser; this.parser = parser;
this.session = session; this.session = session;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor); this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
} }
public ISession getSession() public ISession getSession()

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ManagedObject @ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@ -46,6 +47,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxConcurrentStreams = -1; private int maxConcurrentStreams = -1;
private int maxHeaderBlockFragment = 0; private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = ExecutionStrategy.Factory.getDefault();
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{ {
@ -112,6 +114,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.flowControlStrategyFactory = flowControlStrategyFactory; this.flowControlStrategyFactory = flowControlStrategyFactory;
} }
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
{ {
return httpConfiguration; return httpConfiguration;
@ -135,7 +147,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerParser parser = newServerParser(connector, session); ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(), HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener); endPoint, httpConfiguration, parser, session, getInputBufferSize(), getExecutionStrategyFactory(), listener);
connection.addListener(connectionListener); connection.addListener(connectionListener);
return configure(connection, connector, endPoint); return configure(connection, connector, endPoint);
} }

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{ {
@ -59,9 +60,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final HttpConfiguration httpConfig; private final HttpConfiguration httpConfig;
private final List<Frame> upgradeFrames = new ArrayList<>(); private final List<Frame> upgradeFrames = new ArrayList<>();
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener)
{ {
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize); super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory);
this.listener = listener; this.listener = listener;
this.httpConfig = httpConfig; this.httpConfig = httpConfig;
} }

View File

@ -66,10 +66,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private Selector _selector; private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id) public ManagedSelector(SelectorManager selectorManager, int id)
{
this(selectorManager, id, ExecutionStrategy.Factory.getDefault());
}
public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
{ {
_selectorManager = selectorManager; _selectorManager = selectorManager;
_id = id; _id = id;
_strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor()); _strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000); setStopTimeout(5000);
} }
@ -554,7 +559,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
LOG.debug("rejected accept {}",channel); LOG.debug("rejected accept {}",channel);
closeNoExceptions(channel); closeNoExceptions(channel);
} }
@Override @Override
public void run() public void run()
{ {

View File

@ -54,7 +54,6 @@ public interface ExecutionStrategy
*/ */
public void execute(); public void execute();
/** /**
* A task that can handle {@link RejectedExecutionException} * A task that can handle {@link RejectedExecutionException}
*/ */
@ -81,30 +80,63 @@ public interface ExecutionStrategy
Runnable produce(); Runnable produce();
} }
public static class Factory /**
* <p>A factory for {@link ExecutionStrategy}.</p>
*/
public static interface Factory
{ {
private static final Logger LOG = Log.getLogger(Factory.class); /**
* <p>Creates a new {@link ExecutionStrategy}.</p>
*
* @param producer the execution strategy producer
* @param executor the execution strategy executor
* @return a new {@link ExecutionStrategy}
*/
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor);
/**
* @return the default {@link ExecutionStrategy}
*/
public static Factory getDefault()
{
return DefaultExecutionStrategyFactory.INSTANCE;
}
/**
* @deprecated use {@code getDefault().newExecutionStrategy(Producer, Executor)} instead
*/
@Deprecated
public static ExecutionStrategy instanceFor(Producer producer, Executor executor) public static ExecutionStrategy instanceFor(Producer producer, Executor executor)
{ {
// TODO remove this mechanism before release return getDefault().newExecutionStrategy(producer, executor);
String strategy = System.getProperty(producer.getClass().getName()+".ExecutionStrategy"); }
if (strategy!=null) }
public static class DefaultExecutionStrategyFactory implements Factory
{
private static final Logger LOG = Log.getLogger(Factory.class);
private static final Factory INSTANCE = new DefaultExecutionStrategyFactory();
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
String strategy = System.getProperty(producer.getClass().getName() + ".ExecutionStrategy");
if (strategy != null)
{ {
try try
{ {
Class<? extends ExecutionStrategy> c = Loader.loadClass(strategy); Class<? extends ExecutionStrategy> c = Loader.loadClass(strategy);
Constructor<? extends ExecutionStrategy> m = c.getConstructor(Producer.class,Executor.class); Constructor<? extends ExecutionStrategy> m = c.getConstructor(Producer.class, Executor.class);
LOG.info("Use {} for {}",c.getSimpleName(),producer.getClass().getName()); LOG.info("Use {} for {}", c.getSimpleName(), producer.getClass().getName());
return m.newInstance(producer,executor); return m.newInstance(producer, executor);
} }
catch(Exception e) catch (Exception e)
{ {
LOG.warn(e); LOG.warn(e);
} }
} }
return new ExecuteProduceConsume(producer,executor); return new ExecuteProduceConsume(producer, executor);
} }
} }
} }

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{ {
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Locker _locker = new Locker(); private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute(); private final Runnable _runExecute = new RunExecute();
private final Producer _producer; private final Producer _producer;
@ -61,7 +62,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{ {
this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null); this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
} }
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy) public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
{ {
this._producer = producer; this._producer = producer;
@ -75,7 +76,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} execute",this); LOG.debug("{} execute",this);
boolean produce=false; boolean produce=false;
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
@ -140,7 +141,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// suffer as badly from thread starvation // suffer as badly from thread starvation
while (_threadpool!=null && _threadpool.isLowOnThreads()) while (_threadpool!=null && _threadpool.isLowOnThreads())
{ {
LOG.debug("EWYK low resources {}",this); LOG.debug("EPC low resources {}",this);
try try
{ {
_lowresources.execute(); _lowresources.execute();
@ -151,7 +152,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
LOG.warn(e); LOG.warn(e);
} }
} }
// no longer low resources so produceAndRun normally // no longer low resources so produceAndRun normally
produceAndRun(); produceAndRun();
} }
@ -272,7 +273,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
public String toString() public String toString()
{ {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("EPR "); builder.append("EPC ");
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())
{ {
builder.append(_idle?"Idle/":""); builder.append(_idle?"Idle/":"");
@ -292,4 +293,13 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
execute(); execute();
} }
} }
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ExecuteProduceConsume(producer, executor);
}
}
} }

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
/** /**
@ -28,6 +30,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/ */
public class ProduceConsume implements ExecutionStrategy, Runnable public class ProduceConsume implements ExecutionStrategy, Runnable
{ {
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
@ -45,11 +49,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{ {
// Produce a task. // Produce a task.
Runnable task = _producer.produce(); Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);
if (task == null) if (task == null)
break; break;
// run the task. // Run the task.
task.run(); task.run();
} }
} }
@ -65,4 +71,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{ {
execute(); execute();
} }
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ProduceConsume(producer, executor);
}
}
} }

View File

@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/ */
public class ProduceExecuteConsume implements ExecutionStrategy public class ProduceExecuteConsume implements ExecutionStrategy
{ {
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class); private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
private final Producer _producer; private final Producer _producer;
private final Executor _executor; private final Executor _executor;
@ -50,7 +51,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
// Produce a task. // Produce a task.
Runnable task = _producer.produce(); Runnable task = _producer.produce();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} PER produced {}",_producer,task); LOG.debug("{} produced {}", _producer, task);
if (task == null) if (task == null)
break; break;
@ -60,7 +61,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{ {
_executor.execute(task); _executor.execute(task);
} }
catch(RejectedExecutionException e) catch (RejectedExecutionException e)
{ {
// Discard/reject tasks that cannot be executed // Discard/reject tasks that cannot be executed
if (task instanceof Rejectable) if (task instanceof Rejectable)
@ -84,4 +85,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{ {
execute(); execute();
} }
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ProduceExecuteConsume(producer, executor);
}
}
} }