From 6827c5b0458f850fa7687533d2e01f67ecf6488a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 14 Apr 2016 12:20:31 +0200 Subject: [PATCH] Fixes #514 - Allow ExecutionStrategy to be configurable. Introduced setters and constructor parameters to components that use ExecutionStrategy. --- .../jetty/http2/client/HTTP2Client.java | 13 ++++ .../client/HTTP2ClientConnectionFactory.java | 8 ++- .../eclipse/jetty/http2/HTTP2Connection.java | 4 +- .../AbstractHTTP2ServerConnectionFactory.java | 14 ++++- .../http2/server/HTTP2ServerConnection.java | 8 ++- .../org/eclipse/jetty/io/ManagedSelector.java | 9 ++- .../jetty/util/thread/ExecutionStrategy.java | 60 ++++++++++++++----- .../strategy/ExecuteProduceConsume.java | 20 +++++-- .../util/thread/strategy/ProduceConsume.java | 17 +++++- .../strategy/ProduceExecuteConsume.java | 16 ++++- 10 files changed, 137 insertions(+), 32 deletions(-) diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 05a8621ab8c..c5cac53e8a8 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -46,9 +46,11 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.ProduceConsume; /** *

{@link HTTP2Client} provides an asynchronous, non-blocking implementation @@ -126,6 +128,7 @@ public class HTTP2Client extends ContainerLifeCycle private int initialSessionRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private int initialStreamRecvWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); + private ExecutionStrategy.Factory executionStrategyFactory = new ProduceConsume.Factory(); @Override protected void doStart() throws Exception @@ -224,6 +227,16 @@ public class HTTP2Client extends ContainerLifeCycle this.flowControlStrategyFactory = flowControlStrategyFactory; } + public ExecutionStrategy.Factory getExecutionStrategyFactory() + { + return executionStrategyFactory; + } + + public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory) + { + this.executionStrategyFactory = executionStrategyFactory; + } + @ManagedAttribute("The number of selectors") public int getSelectors() { diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 28aad7e4e47..fbae47ee42e 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; public class HTTP2ClientConnectionFactory implements ClientConnectionFactory @@ -70,7 +71,8 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy(); HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl); Parser parser = new Parser(byteBufferPool, session, 4096, 8192); - HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener); + HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, + parser, session, client.getInputBufferSize(), client.getExecutionStrategyFactory(), promise, listener); connection.addListener(connectionListener); return connection; } @@ -108,9 +110,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory private final Promise promise; private final Session.Listener listener; - public HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) + private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory, Promise promise, Session.Listener listener) { - super(byteBufferPool, executor, endpoint, parser, session, bufferSize); + super(byteBufferPool, executor, endpoint, parser, session, bufferSize, executionFactory); this.client = client; this.promise = promise; this.listener = listener; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 034922f9255..e966923e04b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -46,14 +46,14 @@ public class HTTP2Connection extends AbstractConnection private final HTTP2Producer producer = new HTTP2Producer(); private final ExecutionStrategy executionStrategy; - public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) + public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize, ExecutionStrategy.Factory executionFactory) { super(endPoint, executor); this.byteBufferPool = byteBufferPool; this.parser = parser; this.session = session; this.bufferSize = bufferSize; - this.executionStrategy = ExecutionStrategy.Factory.instanceFor(producer, executor); + this.executionStrategy = executionFactory.newExecutionStrategy(producer, executor); } public ISession getSession() diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index 8b6debe4f82..80b2584f3fb 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.ExecutionStrategy; @ManagedObject public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConnectionFactory @@ -46,6 +47,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxConcurrentStreams = -1; private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); + private ExecutionStrategy.Factory executionStrategyFactory = ExecutionStrategy.Factory.getDefault(); public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -112,6 +114,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne this.flowControlStrategyFactory = flowControlStrategyFactory; } + public ExecutionStrategy.Factory getExecutionStrategyFactory() + { + return executionStrategyFactory; + } + + public void setExecutionStrategyFactory(ExecutionStrategy.Factory executionStrategyFactory) + { + this.executionStrategyFactory = executionStrategyFactory; + } + public HttpConfiguration getHttpConfiguration() { return httpConfiguration; @@ -137,7 +149,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne ServerParser parser = newServerParser(connector, session); HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(), - endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener); + endPoint, httpConfiguration, parser, session, getInputBufferSize(), getExecutionStrategyFactory(), listener); connection.addListener(connectionListener); return configure(connection, connector, endPoint); } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 0b9755122ef..ad3ef6cd988 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -51,6 +51,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.thread.ExecutionStrategy; public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo { @@ -61,7 +62,12 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { - super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize); + this(byteBufferPool, executor, endPoint, httpConfig, parser, session, inputBufferSize, ExecutionStrategy.Factory.getDefault(), listener); + } + + public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener) + { + super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory); this.listener = listener; this.httpConfig = httpConfig; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 0af682740f3..46a1cffbd07 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -67,10 +67,15 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump private Selector _selector; public ManagedSelector(SelectorManager selectorManager, int id) + { + this(selectorManager, id, ExecutionStrategy.Factory.getDefault()); + } + + public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory) { _selectorManager = selectorManager; _id = id; - _strategy = ExecutionStrategy.Factory.instanceFor(new SelectorProducer(), selectorManager.getExecutor()); + _strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor()); setStopTimeout(5000); } @@ -559,7 +564,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump LOG.debug("rejected accept {}",channel); closeNoExceptions(channel); } - + @Override public void run() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java index 31a17c82cb5..0c7557e894e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutionStrategy.java @@ -54,7 +54,6 @@ public interface ExecutionStrategy */ public void execute(); - /** * A task that can handle {@link RejectedExecutionException} */ @@ -62,7 +61,7 @@ public interface ExecutionStrategy { public void reject(); } - + /** *

A producer of {@link Runnable} tasks to run.

*

The {@link ExecutionStrategy} will repeatedly invoke {@link #produce()} until @@ -81,30 +80,63 @@ public interface ExecutionStrategy Runnable produce(); } - public static class Factory + /** + *

A factory for {@link ExecutionStrategy}.

+ */ + public static interface Factory { - private static final Logger LOG = Log.getLogger(Factory.class); + /** + *

Creates a new {@link ExecutionStrategy}.

+ * + * @param producer the execution strategy producer + * @param executor the execution strategy executor + * @return a new {@link ExecutionStrategy} + */ + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor); + /** + * @return the default {@link ExecutionStrategy} + */ + public static Factory getDefault() + { + return DefaultExecutionStrategyFactory.INSTANCE; + } + + /** + * @deprecated use {@code getDefault().newExecutionStrategy(Producer, Executor)} instead + */ + @Deprecated public static ExecutionStrategy instanceFor(Producer producer, Executor executor) { - // TODO remove this mechanism before release - String strategy = System.getProperty(producer.getClass().getName()+".ExecutionStrategy"); - if (strategy!=null) + return getDefault().newExecutionStrategy(producer, executor); + } + } + + public static class DefaultExecutionStrategyFactory implements Factory + { + private static final Logger LOG = Log.getLogger(Factory.class); + private static final Factory INSTANCE = new DefaultExecutionStrategyFactory(); + + @Override + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor) + { + String strategy = System.getProperty(producer.getClass().getName() + ".ExecutionStrategy"); + if (strategy != null) { try { - Class c = Loader.loadClass(producer.getClass(),strategy); - Constructor m = c.getConstructor(Producer.class,Executor.class); - LOG.info("Use {} for {}",c.getSimpleName(),producer.getClass().getName()); - return m.newInstance(producer,executor); + Class c = Loader.loadClass(producer.getClass(), strategy); + Constructor m = c.getConstructor(Producer.class, Executor.class); + LOG.info("Use {} for {}", c.getSimpleName(), producer.getClass().getName()); + return m.newInstance(producer, executor); } - catch(Exception e) + catch (Exception e) { LOG.warn(e); } } - - return new ExecuteProduceConsume(producer,executor); + + return new ExecuteProduceConsume(producer, executor); } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index 4643775e6a9..854aa648e2a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.util.thread.ThreadPool; public class ExecuteProduceConsume implements ExecutionStrategy, Runnable { private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); + private final Locker _locker = new Locker(); private final Runnable _runExecute = new RunExecute(); private final Producer _producer; @@ -61,7 +62,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable { this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null); } - + public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy) { this._producer = producer; @@ -75,7 +76,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable { if (LOG.isDebugEnabled()) LOG.debug("{} execute",this); - + boolean produce=false; try (Lock locked = _locker.lock()) { @@ -140,7 +141,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable // suffer as badly from thread starvation while (_threadpool!=null && _threadpool.isLowOnThreads()) { - LOG.debug("EWYK low resources {}",this); + LOG.debug("EPC low resources {}",this); try { _lowresources.execute(); @@ -151,7 +152,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable LOG.warn(e); } } - + // no longer low resources so produceAndRun normally produceAndRun(); } @@ -272,7 +273,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("EPR "); + builder.append("EPC "); try (Lock locked = _locker.lock()) { builder.append(_idle?"Idle/":""); @@ -292,4 +293,13 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable execute(); } } + + public static class Factory implements ExecutionStrategy.Factory + { + @Override + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor) + { + return new ExecuteProduceConsume(producer, executor); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java index d15d6f0b965..0971944df8b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceConsume.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.util.thread.strategy; import java.util.concurrent.Executor; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; /** @@ -28,6 +30,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; */ public class ProduceConsume implements ExecutionStrategy, Runnable { + private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); + private final Producer _producer; private final Executor _executor; @@ -45,11 +49,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable { // Produce a task. Runnable task = _producer.produce(); + if (LOG.isDebugEnabled()) + LOG.debug("{} produced {}", this, task); if (task == null) break; - // run the task. + // Run the task. task.run(); } } @@ -65,4 +71,13 @@ public class ProduceConsume implements ExecutionStrategy, Runnable { execute(); } + + public static class Factory implements ExecutionStrategy.Factory + { + @Override + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor) + { + return new ProduceConsume(producer, executor); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java index 9310c05d5f9..ab7ddc171b7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ProduceExecuteConsume.java @@ -31,7 +31,8 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; */ public class ProduceExecuteConsume implements ExecutionStrategy { - private static final Logger LOG = Log.getLogger(ExecutionStrategy.class); + private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class); + private final Producer _producer; private final Executor _executor; @@ -50,7 +51,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy // Produce a task. Runnable task = _producer.produce(); if (LOG.isDebugEnabled()) - LOG.debug("{} PER produced {}",_producer,task); + LOG.debug("{} produced {}", _producer, task); if (task == null) break; @@ -60,7 +61,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy { _executor.execute(task); } - catch(RejectedExecutionException e) + catch (RejectedExecutionException e) { // Discard/reject tasks that cannot be executed if (task instanceof Rejectable) @@ -84,4 +85,13 @@ public class ProduceExecuteConsume implements ExecutionStrategy { execute(); } + + public static class Factory implements ExecutionStrategy.Factory + { + @Override + public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor) + { + return new ProduceExecuteConsume(producer, executor); + } + } }