Fixes #514 - Allow ExecutionStrategy to be configurable.

Introduced setters and constructor parameters to components that use
ExecutionStrategy.
This commit is contained in:
Simone Bordet 2016-04-14 12:20:31 +02:00
parent e1b0967e59
commit 6827c5b045
10 changed files with 137 additions and 32 deletions

View File

@ -46,9 +46,11 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.ProduceConsume;
/**
* <p>{@link HTTP2Client} provides an asynchronous, non-blocking implementation
@ -126,6 +128,7 @@ public class HTTP2Client extends ContainerLifeCycle
private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory();
@Override
protected void doStart() throws Exception
@ -224,6 +227,16 @@ public class HTTP2Client extends ContainerLifeCycle
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
@ManagedAttribute("The number of selectors")
public int getSelectors()
{

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@ -70,7 +71,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener);
connection.addListener(connectionListener);
return connection;
}
@ -108,9 +110,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
super(byteBufferPool, executor, endpoint, parser, session, bufferSize, executionFactory);
this.client = client;
this.promise = promise;
this.listener = listener;

View File

@ -46,14 +46,14 @@ public class HTTP2Connection extends AbstractConnection
private final HTTP2Producer producer = new HTTP2Producer();
private final ExecutionStrategy executionStrategy;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor);
this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor);
}
public ISession getSession()

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
@ManagedObject
public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory
@ -46,6 +47,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxConcurrentStreams = -1;
private int maxHeaderBlockFragment = 0;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private ExecutionStrategy.Factory executionStrategyFactory = ExecutionStrategy.Factory.getDefault();
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
{
@ -112,6 +114,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.flowControlStrategyFactory = flowControlStrategyFactory;
}
public ExecutionStrategy.Factory getExecutionStrategyFactory()
{
return executionStrategyFactory;
}
public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory)
{
this.executionStrategyFactory = executionStrategyFactory;
}
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@ -137,7 +149,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
ServerParser parser = newServerParser(connector, session);
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
endPoint, httpConfiguration, parser, session, getInputBufferSize(), getExecutionStrategyFactory(), listener);
connection.addListener(connectionListener);
return configure(connection, connector, endPoint);
}

View File

@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
@ -61,7 +62,12 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this(byteBufferPool, executor, endPoint, httpConfig, parser, session, inputBufferSize, ExecutionStrategy.Factory.getDefault(), listener);
}
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory);
this.listener = listener;
this.httpConfig = httpConfig;
}

View File

@ -67,10 +67,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
{
this(selectorManager, id, ExecutionStrategy.Factory.getDefault());
}
public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
{
_selectorManager = selectorManager;
_id = id;
_strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor());
_strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
@ -559,7 +564,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
LOG.debug("rejected accept {}",channel);
closeNoExceptions(channel);
}
@Override
public void run()
{

View File

@ -54,7 +54,6 @@ public interface ExecutionStrategy
*/
public void execute();
/**
* A task that can handle {@link RejectedExecutionException}
*/
@ -62,7 +61,7 @@ public interface ExecutionStrategy
{
public void reject();
}
/**
* <p>A producer of {@link Runnable} tasks to run.</p>
* <p>The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until
@ -81,30 +80,63 @@ public interface ExecutionStrategy
Runnable produce();
}
public static class Factory
/**
* <p>A factory for {@link ExecutionStrategy}.</p>
*/
public static interface Factory
{
private static final Logger LOG = Log.getLogger(Factory.class);
/**
* <p>Creates a new {@link ExecutionStrategy}.</p>
*
* @param producer the execution strategy producer
* @param executor the execution strategy executor
* @return a new {@link ExecutionStrategy}
*/
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor);
/**
* @return the default {@link ExecutionStrategy}
*/
public static Factory getDefault()
{
return DefaultExecutionStrategyFactory.INSTANCE;
}
/**
* @deprecated use {@code getDefault().newExecutionStrategy(Producer, Executor)} instead
*/
@Deprecated
public static ExecutionStrategy instanceFor(Producer producer, Executor executor)
{
// TODO remove this mechanism before release
String strategy = System.getProperty(producer.getClass().getName()+".ExecutionStrategy");
if (strategy!=null)
return getDefault().newExecutionStrategy(producer, executor);
}
}
public static class DefaultExecutionStrategyFactory implements Factory
{
private static final Logger LOG = Log.getLogger(Factory.class);
private static final Factory INSTANCE = new DefaultExecutionStrategyFactory();
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
String strategy = System.getProperty(producer.getClass().getName() + ".ExecutionStrategy");
if (strategy != null)
{
try
{
Class<? extends ExecutionStrategy> c = Loader.loadClass(producer.getClass(),strategy);
Constructor<? extends ExecutionStrategy> m = c.getConstructor(Producer.class,Executor.class);
LOG.info("Use {} for {}",c.getSimpleName(),producer.getClass().getName());
return m.newInstance(producer,executor);
Class<? extends ExecutionStrategy> c = Loader.loadClass(producer.getClass(), strategy);
Constructor<? extends ExecutionStrategy> m = c.getConstructor(Producer.class, Executor.class);
LOG.info("Use {} for {}", c.getSimpleName(), producer.getClass().getName());
return m.newInstance(producer, executor);
}
catch(Exception e)
catch (Exception e)
{
LOG.warn(e);
}
}
return new ExecuteProduceConsume(producer,executor);
return new ExecuteProduceConsume(producer, executor);
}
}
}

View File

@ -46,6 +46,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute();
private final Producer _producer;
@ -61,7 +62,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null);
}
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
{
this._producer = producer;
@ -75,7 +76,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
{
if (LOG.isDebugEnabled())
LOG.debug("{} execute",this);
boolean produce=false;
try (Lock locked = _locker.lock())
{
@ -140,7 +141,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// suffer as badly from thread starvation
while (_threadpool!=null && _threadpool.isLowOnThreads())
{
LOG.debug("EWYK low resources {}",this);
LOG.debug("EPC low resources {}",this);
try
{
_lowresources.execute();
@ -151,7 +152,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
LOG.warn(e);
}
}
// no longer low resources so produceAndRun normally
produceAndRun();
}
@ -272,7 +273,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("EPR ");
builder.append("EPC ");
try (Lock locked = _locker.lock())
{
builder.append(_idle?"Idle/":"");
@ -292,4 +293,13 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
execute();
}
}
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ExecuteProduceConsume(producer, executor);
}
}
}

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
/**
@ -28,6 +30,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/
public class ProduceConsume implements ExecutionStrategy, Runnable
{
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Producer _producer;
private final Executor _executor;
@ -45,11 +49,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", this, task);
if (task == null)
break;
// run the task.
// Run the task.
task.run();
}
}
@ -65,4 +71,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable
{
execute();
}
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ProduceConsume(producer, executor);
}
}
}

View File

@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
*/
public class ProduceExecuteConsume implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutionStrategy.class);
private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
private final Producer _producer;
private final Executor _executor;
@ -50,7 +51,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
// Produce a task.
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} PER produced {}",_producer,task);
LOG.debug("{} produced {}", _producer, task);
if (task == null)
break;
@ -60,7 +61,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{
_executor.execute(task);
}
catch(RejectedExecutionException e)
catch (RejectedExecutionException e)
{
// Discard/reject tasks that cannot be executed
if (task instanceof Rejectable)
@ -84,4 +85,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy
{
execute();
}
public static class Factory implements ExecutionStrategy.Factory
{
@Override
public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
{
return new ProduceExecuteConsume(producer, executor);
}
}
}