diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java index 1f3c30f8110..8f8a01a9065 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Locale; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,17 +41,14 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; -import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -75,25 +71,6 @@ public class SmallThreadPoolLoadTest extends AbstractTest public void testConcurrentWithSmallServerThreadPool() throws Exception { start(new LoadServlet()); - AbstractHTTP2ServerConnectionFactory factory = connector.getBean(AbstractHTTP2ServerConnectionFactory.class); - factory.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory() - { - @Override - public ExecutionStrategy newExecutionStrategy(ExecutionStrategy.Producer producer, Executor executor) - { - return new ExecuteProduceConsume(producer, executor) - { - @Override - protected void executeTask(Runnable task) - { - if (task instanceof Rejectable) - ((Rejectable)task).reject(); - else - super.executeTask(task); - } - }; - } - }); // Only one connection to the server. Session session = newClient(new Session.Listener.Adapter()); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 52bcef0f5f2..68dcc3b5b2d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.io; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; @@ -28,7 +29,6 @@ import java.nio.channels.SelectionKey; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -61,8 +61,9 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage private abstract class RunnableTask implements Runnable { - final String _operation; - RunnableTask(String op) + private final String _operation; + + protected RunnableTask(String op) { _operation=op; } @@ -74,19 +75,19 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } } - private abstract class RejectableRunnable extends RunnableTask implements Rejectable + private abstract class RunnableCloseable extends RunnableTask implements Closeable { - RejectableRunnable(String op) + protected RunnableCloseable(String op) { super(op); } @Override - public void reject() + public void close() { try { - close(); + ChannelEndPoint.this.close(); } catch (Throwable x) { @@ -104,7 +105,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runFillable = new RejectableRunnable("runFillable") + private final Runnable _runFillable = new RunnableCloseable("runFillable") { @Override public void run() @@ -113,7 +114,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runCompleteWrite = new RejectableRunnable("runCompleteWrite") + private final Runnable _runCompleteWrite = new RunnableCloseable("runCompleteWrite") { @Override public void run() @@ -122,7 +123,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage } }; - private final Runnable _runCompleteWriteFillable = new RejectableRunnable("runCompleteWriteFillable") + private final Runnable _runCompleteWriteFillable = new RunnableCloseable("runCompleteWriteFillable") { @Override public void run() diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index cec1447e632..0327db6cbf3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -43,7 +43,6 @@ import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable; import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @@ -548,7 +547,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - class Accept implements Runnable, Rejectable + class Accept implements Runnable, Closeable { private final SelectableChannel channel; private final Object attachment; @@ -560,9 +559,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } @Override - public void reject() + public void close() { - LOG.debug("rejected accept {}",channel); + LOG.debug("closed accept of {}", channel); closeNoExceptions(channel); } @@ -582,7 +581,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - private class CreateEndPoint implements Product, Rejectable + private class CreateEndPoint implements Product, Closeable { private final SelectableChannel channel; private final SelectionKey key; @@ -608,9 +607,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } @Override - public void reject() + public void close() { - LOG.debug("rejected create {}",channel); + LOG.debug("closed creation of {}", channel); closeNoExceptions(channel); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index 11ca41feb32..8c13b919e94 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -26,13 +26,11 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; /** - * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. + * @deprecated use {@link SocketChannelEndPoint} instead */ @Deprecated -public class SelectChannelEndPoint extends SocketChannelEndPoint implements ManagedSelector.Selectable +public class SelectChannelEndPoint extends SocketChannelEndPoint { - public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - public SelectChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) { super(channel,selector,key,scheduler); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index 92e42ae087b..2f688492990 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.util.thread.strategy; +import java.io.Closeable; import java.util.concurrent.Executor; import org.eclipse.jetty.util.log.Log; @@ -28,9 +29,8 @@ import org.eclipse.jetty.util.thread.Locker.Lock; import org.eclipse.jetty.util.thread.ThreadPool; /** - *
A strategy where the thread calls produce will always run the resulting task - * itself. The strategy may dispatches another thread to continue production. - *
+ *A strategy where the thread that produces will always run the resulting task.
+ *The strategy may then dispatch another thread to continue production.
*The strategy is also known by the nickname 'eat what you kill', which comes from * the hunting ethic that says a person should not kill anything he or she does not * plan on eating. In this case, the phrase is used to mean that a thread should @@ -39,8 +39,7 @@ import org.eclipse.jetty.util.thread.ThreadPool; * down by running the task in the same core, with good chances of having a hot CPU * cache. It also avoids the creation of a queue of produced tasks that the system * does not yet have capacity to consume, which can save memory and exert back - * pressure on producers. - *
+ * pressure on producers. */ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable { @@ -191,7 +190,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements } // Execute the task. - executeTask(task); + executeProduct(task); } return !idle; } @@ -203,9 +202,37 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements } } - protected void executeTask(Runnable task) + /** + *Only called when in {@link #isLowOnThreads() low threads mode} + * to execute the task produced by the producer.
+ *Because
+ *If the task implements {@link Rejectable}, then {@link Rejectable#reject()} + * is immediately called on the task object. If the task also implements + * {@link Closeable}, then {@link Closeable#close()} is called on the task object.
+ *If the task does not implement {@link Rejectable}, then it is + * {@link #execute(Runnable) executed}.
+ * + * @param task the produced task to execute + */ + protected void executeProduct(Runnable task) { - execute(task); + if (task instanceof Rejectable) + { + try + { + ((Rejectable)task).reject(); + if (task instanceof Closeable) + ((Closeable)task).close(); + } + catch (Throwable x) + { + LOG.debug(x); + } + } + else + { + execute(task); + } } private void executeProduceConsume() diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java index 29372697a90..8797e188aff 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.util.thread.strategy; +import java.io.Closeable; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -25,6 +26,12 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; +/** + *Base class for strategies that need to execute a task by submitting it to an {@link Executor}.
+ *If the submission to the {@code Executor} is rejected (via a {@link RejectedExecutionException}), + * the task is tested whether it implements {@link Closeable}; if it does, then {@link Closeable#close()} + * is called on the task object.
+ */ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy { private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class); @@ -45,13 +52,13 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy } catch(RejectedExecutionException e) { - // If we cannot execute, then discard/reject the task and keep producing + // If we cannot execute, then close the task and keep producing. LOG.debug(e); - LOG.warn("RejectedExecution {}",task); + LOG.warn("Rejected execution of {}",task); try { - if (task instanceof Rejectable) - ((Rejectable)task).reject(); + if (task instanceof Closeable) + ((Closeable)task).close(); } catch (Exception x) {