From 016fc6569fb32052c3833d5bfcaeb5b12d5fa432 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Thu, 12 May 2016 00:13:35 +0200
Subject: [PATCH] Fixes #558 - HTTP/2 server hangs when thread pool is low on
threads.
Modified ExecuteProduceConsume to test for Rejectable tasks when
trying to execute a task in low threads mode, and if so, reject them
immediately.
---
.../http2/client/SmallThreadPoolLoadTest.java | 23 ----------
.../org/eclipse/jetty/io/ManagedSelector.java | 13 +++---
.../jetty/io/SelectChannelEndPoint.java | 14 +++---
.../strategy/ExecuteProduceConsume.java | 43 +++++++++++++++----
.../strategy/ExecutingExecutionStrategy.java | 15 +++++--
5 files changed, 59 insertions(+), 49 deletions(-)
diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java
index 1f3c30f8110..8f8a01a9065 100644
--- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java
+++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,17 +41,14 @@ import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
-import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
@@ -75,25 +71,6 @@ public class SmallThreadPoolLoadTest extends AbstractTest
public void testConcurrentWithSmallServerThreadPool() throws Exception
{
start(new LoadServlet());
- AbstractHTTP2ServerConnectionFactory factory = connector.getBean(AbstractHTTP2ServerConnectionFactory.class);
- factory.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory()
- {
- @Override
- public ExecutionStrategy newExecutionStrategy(ExecutionStrategy.Producer producer, Executor executor)
- {
- return new ExecuteProduceConsume(producer, executor)
- {
- @Override
- protected void executeTask(Runnable task)
- {
- if (task instanceof Rejectable)
- ((Rejectable)task).reject();
- else
- super.executeTask(task);
- }
- };
- }
- });
// Only one connection to the server.
Session session = newClient(new Session.Listener.Adapter());
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
index 5b3375d9fa0..c05d70e0eae 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java
@@ -44,7 +44,6 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
-import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -553,7 +552,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- class Accept implements Runnable, Rejectable
+ class Accept implements Runnable, Closeable
{
private final SocketChannel channel;
private final Object attachment;
@@ -565,9 +564,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
@Override
- public void reject()
+ public void close()
{
- LOG.debug("rejected accept {}",channel);
+ LOG.debug("closed accept of {}", channel);
closeNoExceptions(channel);
}
@@ -587,7 +586,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
- private class CreateEndPoint implements Product, Rejectable
+ private class CreateEndPoint implements Product, Closeable
{
private final SocketChannel channel;
private final SelectionKey key;
@@ -613,9 +612,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
@Override
- public void reject()
+ public void close()
{
- LOG.debug("rejected create {}",channel);
+ LOG.debug("closed creation of {}", channel);
closeNoExceptions(channel);
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index 1f1d82af525..c63aba16394 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.io;
+import java.io.Closeable;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@@ -25,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.thread.ExecutionStrategy.Rejectable;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
@@ -69,14 +69,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
}
};
- private abstract class RejectableRunnable implements Runnable,Rejectable
+ private abstract class RunnableCloseable implements Runnable, Closeable
{
@Override
- public void reject()
+ public void close()
{
try
{
- close();
+ SelectChannelEndPoint.this.close();
}
catch (Throwable x)
{
@@ -85,7 +85,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
}
}
- private final Runnable _runFillable = new RejectableRunnable()
+ private final Runnable _runFillable = new RunnableCloseable()
{
@Override
public void run()
@@ -99,7 +99,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runFillable";
}
};
- private final Runnable _runCompleteWrite = new RejectableRunnable()
+ private final Runnable _runCompleteWrite = new RunnableCloseable()
{
@Override
public void run()
@@ -113,7 +113,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSel
return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
}
};
- private final Runnable _runCompleteWriteFillable = new RejectableRunnable()
+ private final Runnable _runCompleteWriteFillable = new RunnableCloseable()
{
@Override
public void run()
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
index 92e42ae087b..2f688492990 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.util.thread.strategy;
+import java.io.Closeable;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.log.Log;
@@ -28,9 +29,8 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool;
/**
- * A strategy where the thread calls produce will always run the resulting task
- * itself. The strategy may dispatches another thread to continue production.
- *
+ * A strategy where the thread that produces will always run the resulting task.
+ * The strategy may then dispatch another thread to continue production.
* The strategy is also known by the nickname 'eat what you kill', which comes from
* the hunting ethic that says a person should not kill anything he or she does not
* plan on eating. In this case, the phrase is used to mean that a thread should
@@ -39,8 +39,7 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* down by running the task in the same core, with good chances of having a hot CPU
* cache. It also avoids the creation of a queue of produced tasks that the system
* does not yet have capacity to consume, which can save memory and exert back
- * pressure on producers.
- *
+ * pressure on producers.
*/
public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
{
@@ -191,7 +190,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
}
// Execute the task.
- executeTask(task);
+ executeProduct(task);
}
return !idle;
}
@@ -203,9 +202,37 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements
}
}
- protected void executeTask(Runnable task)
+ /**
+ * Only called when in {@link #isLowOnThreads() low threads mode}
+ * to execute the task produced by the producer.
+ * Because
+ * If the task implements {@link Rejectable}, then {@link Rejectable#reject()}
+ * is immediately called on the task object. If the task also implements
+ * {@link Closeable}, then {@link Closeable#close()} is called on the task object.
+ * If the task does not implement {@link Rejectable}, then it is
+ * {@link #execute(Runnable) executed}.
+ *
+ * @param task the produced task to execute
+ */
+ protected void executeProduct(Runnable task)
{
- execute(task);
+ if (task instanceof Rejectable)
+ {
+ try
+ {
+ ((Rejectable)task).reject();
+ if (task instanceof Closeable)
+ ((Closeable)task).close();
+ }
+ catch (Throwable x)
+ {
+ LOG.debug(x);
+ }
+ }
+ else
+ {
+ execute(task);
+ }
}
private void executeProduceConsume()
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java
index 29372697a90..8797e188aff 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java
@@ -18,6 +18,7 @@
package org.eclipse.jetty.util.thread.strategy;
+import java.io.Closeable;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@@ -25,6 +26,12 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
+/**
+ * Base class for strategies that need to execute a task by submitting it to an {@link Executor}.
+ * If the submission to the {@code Executor} is rejected (via a {@link RejectedExecutionException}),
+ * the task is tested whether it implements {@link Closeable}; if it does, then {@link Closeable#close()}
+ * is called on the task object.
+ */
public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class);
@@ -45,13 +52,13 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
}
catch(RejectedExecutionException e)
{
- // If we cannot execute, then discard/reject the task and keep producing
+ // If we cannot execute, then close the task and keep producing.
LOG.debug(e);
- LOG.warn("RejectedExecution {}",task);
+ LOG.warn("Rejected execution of {}",task);
try
{
- if (task instanceof Rejectable)
- ((Rejectable)task).reject();
+ if (task instanceof Closeable)
+ ((Closeable)task).close();
}
catch (Exception x)
{