From 30fd8323f4f824ecd405333cb0770c628767759d Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 9 May 2016 15:52:16 +0200 Subject: [PATCH 1/8] Added jobs to detailed dump. --- .../jetty/util/thread/QueuedThreadPool.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index b52daa037f0..ba3d4060778 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -22,6 +22,7 @@ package org.eclipse.jetty.util.thread; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -51,7 +52,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private final AtomicInteger _threadsStarted = new AtomicInteger(); private final AtomicInteger _threadsIdle = new AtomicInteger(); private final AtomicLong _lastShrink = new AtomicLong(); - private final ConcurrentHashSet _threads=new ConcurrentHashSet(); + private final ConcurrentHashSet _threads=new ConcurrentHashSet<>(); private final Object _joinLock = new Object(); private final BlockingQueue _jobs; private final ThreadGroup _threadGroup; @@ -126,13 +127,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo jobs.clear(); // Fill job Q with noop jobs to wakeup idle - Runnable noop = new Runnable() - { - @Override - public void run() - { - } - }; + Runnable noop = () -> {}; for (int i = _threadsStarted.get(); i-- > 0; ) jobs.offer(noop); @@ -166,7 +161,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo if (size > 0) { Thread.yield(); - + if (LOG.isDebugEnabled()) { for (Thread unstopped : _threads) @@ -193,7 +188,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * Thread Pool should use Daemon Threading. + * Thread Pool should use Daemon Threading. * * @param daemon true to enable delegation * @see Thread#setDaemon(boolean) @@ -332,10 +327,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { return _priority; } - + /** * Get the size of the job queue. - * + * * @return Number of jobs queued waiting for a thread */ @ManagedAttribute("Size of the job queue") @@ -346,7 +341,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * Is thread pool using daemon threading - * + * * @return true if delegating to named or anonymous pool * @see Thread#setDaemon(boolean) */ @@ -365,7 +360,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { _detailedDump = detailedDump; } - + @Override public void execute(Runnable job) { @@ -428,7 +423,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { return getThreads() - getIdleThreads(); } - + /** * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs */ @@ -487,7 +482,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void dump(Appendable out, String indent) throws IOException { - List dump = new ArrayList<>(getMaxThreads()); + List threads = new ArrayList<>(getMaxThreads()); for (final Thread thread : _threads) { final StackTraceElement[] trace = thread.getStackTrace(); @@ -504,7 +499,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo if (isDetailedDump()) { - dump.add(new Dumpable() + threads.add(new Dumpable() { @Override public void dump(Appendable out, String indent) throws IOException @@ -527,12 +522,16 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo else { int p=thread.getPriority(); - dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p))); + threads.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")+ (p==Thread.NORM_PRIORITY?"":(" prio="+p))); } } + List jobs = Collections.emptyList(); + if (isDetailedDump()) + jobs = new ArrayList<>(getQueue()); + ContainerLifeCycle.dumpObject(out, this); - ContainerLifeCycle.dump(out, indent, dump); + ContainerLifeCycle.dump(out, indent, threads, jobs); } @Override @@ -664,7 +663,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo /** * @param queue the job queue + * @deprecated pass the queue to the constructor instead */ + @Deprecated public void setQueue(BlockingQueue queue) { throw new UnsupportedOperationException("Use constructor injection"); From 9b6d42317b03e8293a91e6a0ddf9661b3d47b2d7 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 May 2016 14:48:27 +0200 Subject: [PATCH 2/8] Improved logging. --- .../src/main/java/org/eclipse/jetty/server/HttpInput.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 4bb8f90db9a..293887cb77a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -147,9 +147,9 @@ public class HttpInput extends ServletInputStream implements Runnable Content item = nextContent(); if (item!=null) { - if (LOG.isDebugEnabled()) - LOG.debug("{} read {} from {}",this,len,item); int l = get(item, b, off, len); + if (LOG.isDebugEnabled()) + LOG.debug("{} read {} from {}",this,l,item); consumeNonContent(); @@ -357,7 +357,7 @@ public class HttpInput extends ServletInputStream implements Runnable } if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={} ...", this,timeout); + LOG.debug("{} blocking for content timeout={}", this,timeout); if (timeout>0) _inputQ.wait(timeout); else @@ -845,5 +845,4 @@ public class HttpInput extends ServletInputStream implements Runnable return "AEOF"; } }; - } From 8ac23d187abcf12305c90dbdb49a5426cfad1220 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 May 2016 14:57:57 +0200 Subject: [PATCH 3/8] Added tests to verify input data consumption. Verify that input data is consumed at the end of a request handling, either when input is not read and when an exception is thrown, to make sure that the session flow control is not stalled. --- .../jetty/http2/client/StreamResetTest.java | 90 ++++++++++++++++++- .../test/resources/jetty-logging.properties | 3 +- .../http2/server/HttpTransportOverHTTP2.java | 26 +++--- 3 files changed, 105 insertions(+), 14 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 776cb56bec8..87cb224f6d4 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -37,6 +37,8 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -47,6 +49,7 @@ import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -221,7 +224,7 @@ public class StreamResetTest extends AbstractTest response.setStatus(200); response.setContentType("text/plain;charset=" + charset.name()); - response.setContentLength(data.length*10); + response.setContentLength(data.length * 10); response.flushBuffer(); try @@ -238,7 +241,7 @@ public class StreamResetTest extends AbstractTest { // Write some content after the stream has // been reset, it should throw an exception. - for (int i=0;i<10;i++) + for (int i = 0; i < 10; i++) { Thread.sleep(500); response.getOutputStream().write(data); @@ -350,4 +353,87 @@ public class StreamResetTest extends AbstractTest Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testClientResetConsumesQueuedData() throws Exception + { + start(new EmptyHttpServlet()); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + CountDownLatch dataLatch = new CountDownLatch(1); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + dataLatch.countDown(); + } + }); + // The server does not read the data, so the flow control window should be zero. + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, ((ISession)client).updateSendWindow(0)); + + // Now reset the stream. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + + // Wait for the server to receive the reset and process + // it, and for the client to process the window updates. + Thread.sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } + + @Test + public void testServerExceptionConsumesQueuedData() throws Exception + { + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + try + { + // Wait to let the data sent by the client to be queued. + Thread.sleep(1000); + throw new IllegalStateException(); + } + catch (InterruptedException e) + { + throw new InterruptedIOException(); + } + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + CountDownLatch dataLatch = new CountDownLatch(1); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + dataLatch.countDown(); + } + }); + // The server does not read the data, so the flow control window should be zero. + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, ((ISession)client).updateSendWindow(0)); + + // Wait for the server process the exception, and + // for the client to process the window updates. + Thread.sleep(2000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } } diff --git a/jetty-http2/http2-client/src/test/resources/jetty-logging.properties b/jetty-http2/http2-client/src/test/resources/jetty-logging.properties index b7185f09f50..5304801a325 100644 --- a/jetty-http2/http2-client/src/test/resources/jetty-logging.properties +++ b/jetty-http2/http2-client/src/test/resources/jetty-logging.properties @@ -1,4 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.http2.hpack.LEVEL=INFO +#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.http2.LEVEL=DEBUG +org.eclipse.jetty.http2.hpack.LEVEL=INFO #org.eclipse.jetty.servlets.LEVEL=DEBUG diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 102734bdfeb..21d03ff0a67 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -62,7 +62,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport // copying we can defer to the endpoint return connection.getEndPoint().isOptimizedForDirectBuffers(); } - + public IStream getStream() { return stream; @@ -145,7 +145,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (LOG.isDebugEnabled()) LOG.debug("HTTP/2 Push {}",request); - + stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise() { @Override @@ -190,16 +190,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport @Override public void onCompleted() { + // If the stream is not closed, it is still reading the request content. + // Send a reset to the other end so that it stops sending data. if (!stream.isClosed()) - { - // If the stream is not closed, it is still reading the request content. - // Send a reset to the other end so that it stops sending data. stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - // Now that this stream is reset, in-flight data frames will be consumed and discarded. - // Consume the existing queued data frames to avoid stalling the flow control. - HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); - channel.getRequest().getHttpInput().consumeAll(); - } + + // Consume the existing queued data frames to + // avoid stalling the session flow control. + consumeInput(); + } + + protected void consumeInput() + { + HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); + channel.getRequest().getHttpInput().consumeAll(); } @Override @@ -213,7 +217,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport } private class CommitCallback implements Callback.NonBlocking - { + { @Override public void succeeded() { From fe8102a4304045417d6539a9a4cccaae060f60e9 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 May 2016 15:07:27 +0200 Subject: [PATCH 4/8] Cosmetics. --- .../util/thread/strategy/ExecutingExecutionStrategy.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java index 5985dc9594a..29372697a90 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecutingExecutionStrategy.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.util.thread.strategy; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; + import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; @@ -27,14 +28,13 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; public abstract class ExecutingExecutionStrategy implements ExecutionStrategy { private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class); - + private final Executor _executor; - + protected ExecutingExecutionStrategy(Executor executor) { _executor=executor; } - protected boolean execute(Runnable task) { @@ -61,5 +61,4 @@ public abstract class ExecutingExecutionStrategy implements ExecutionStrategy } return false; } - } From d9c9e4a39901828975558d49f57041344625e98e Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 May 2016 17:57:52 +0200 Subject: [PATCH 5/8] Issue #557 (Review ThreadPool.isLowOnThreads()) Updated isLowOnThreads() to take into account also the number of idle threads, so that it now returns true if the number of idle threads <= 1. --- .../org/eclipse/jetty/util/thread/QueuedThreadPool.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index ba3d4060778..24d25eae191 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -425,13 +425,16 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs + * @return whether the pool is at {@code maxThreads} and there are + * either one or less idle threads, or less idle threads than queued jobs */ @Override - @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs") + @ManagedAttribute("Whether the pools is at maxThreads and there are either one or less idle threads, or less idle threads than queued jobs") public boolean isLowOnThreads() { - return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get(); + int idleThreads = _threadsIdle.get(); + return _threadsStarted.get() == _maxThreads && + (idleThreads <= 1 || idleThreads <= _jobs.size()); } private boolean startThreads(int threadsToStart) From f47b6811b47a6616bcde58a8d9dffa83723c4f14 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 11 May 2016 12:21:08 +0200 Subject: [PATCH 6/8] Improved javadocs and JMX descriptions. --- .../jetty/util/thread/QueuedThreadPool.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 24d25eae191..a68f3da8de5 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; -@ManagedObject("A thread pool with no max bound by default") +@ManagedObject("A thread pool") public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); @@ -333,24 +333,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo * * @return Number of jobs queued waiting for a thread */ - @ManagedAttribute("Size of the job queue") + @ManagedAttribute("size of the job queue") public int getQueueSize() { return _jobs.size(); } /** - * Is thread pool using daemon threading - * - * @return true if delegating to named or anonymous pool + * @return whether this thread pool is using daemon threads * @see Thread#setDaemon(boolean) */ - @ManagedAttribute("thead pool using a daemon thread") + @ManagedAttribute("thread pool uses daemon threads") public boolean isDaemon() { return _daemon; } + @ManagedAttribute("reports additional details in the dump") public boolean isDetailedDump() { return _detailedDump; @@ -396,29 +395,29 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @return The total number of threads currently in the pool + * @return the total number of threads currently in the pool */ @Override - @ManagedAttribute("total number of threads currently in the pool") + @ManagedAttribute("number of threads in the pool") public int getThreads() { return _threadsStarted.get(); } /** - * @return The number of idle threads in the pool + * @return the number of idle threads in the pool */ @Override - @ManagedAttribute("total number of idle threads in the pool") + @ManagedAttribute("number of idle threads in the pool") public int getIdleThreads() { return _threadsIdle.get(); } /** - * @return The number of busy threads in the pool + * @return the number of busy threads in the pool */ - @ManagedAttribute("total number of busy threads in the pool") + @ManagedAttribute("number of busy threads in the pool") public int getBusyThreads() { return getThreads() - getIdleThreads(); @@ -476,7 +475,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } @Override - @ManagedOperation("dump thread state") + @ManagedOperation("dumps thread pool state") public String dump() { return ContainerLifeCycle.dump(this); @@ -675,10 +674,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @param id The thread ID to interrupt. + * @param id the thread ID to interrupt. * @return true if the thread was found and interrupted. */ - @ManagedOperation("interrupt a pool thread") + @ManagedOperation("interrupts a pool thread") public boolean interruptThread(@Name("id") long id) { for (Thread thread : _threads) @@ -693,10 +692,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @param id The thread ID to interrupt. - * @return true if the thread was found and interrupted. + * @param id the thread ID to interrupt. + * @return the stack frames dump */ - @ManagedOperation("dump a pool thread stack") + @ManagedOperation("dumps a pool thread stack") public String dumpThread(@Name("id") long id) { for (Thread thread : _threads) From 509e1f0f93fc6b8f883eeb28a351936ea0cd0fe2 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 11 May 2016 12:23:15 +0200 Subject: [PATCH 7/8] Fixes #557 (Review ThreadPool.isLowOnThreads()). Updated the isLowOnThreads() formula with a more conservative one, introducing lowThreadsThreshold parameter to tune it. --- .../jetty/util/thread/QueuedThreadPool.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index a68f3da8de5..b41fab09151 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -63,6 +63,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private int _priority = Thread.NORM_PRIORITY; private boolean _daemon = false; private boolean _detailedDump = false; + private int _lowThreadsThreshold = 1; public QueuedThreadPool() { @@ -360,6 +361,17 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo _detailedDump = detailedDump; } + @ManagedAttribute("threshold at which the pool is low on threads") + public int getLowThreadsThreshold() + { + return _lowThreadsThreshold; + } + + public void setLowThreadsThreshold(int lowThreadsThreshold) + { + _lowThreadsThreshold = lowThreadsThreshold; + } + @Override public void execute(Runnable job) { @@ -424,16 +436,20 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } /** - * @return whether the pool is at {@code maxThreads} and there are - * either one or less idle threads, or less idle threads than queued jobs + *

Returns whether this thread pool is low on threads.

+ *

The current formula is:

+ *
+     * maxThreads - threads + idleThreads - queueSize <= lowThreadsThreshold
+     * 
+ * + * @return whether the pool is low on threads + * @see #getLowThreadsThreshold() */ @Override - @ManagedAttribute("Whether the pools is at maxThreads and there are either one or less idle threads, or less idle threads than queued jobs") + @ManagedAttribute(value = "thread pool is low on threads", readonly = true) public boolean isLowOnThreads() { - int idleThreads = _threadsIdle.get(); - return _threadsStarted.get() == _maxThreads && - (idleThreads <= 1 || idleThreads <= _jobs.size()); + return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold(); } private boolean startThreads(int threadsToStart) From 1fc40864e180c2770c3cefc25f5128ae79677665 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 May 2016 17:59:51 +0200 Subject: [PATCH 8/8] Issue #558 (HTTP/2 server hangs when thread pool is low on threads). Verify that rejecting tasks avoids hanging the server when in low threads mode. --- .../http2/client/SmallThreadPoolLoadTest.java | 237 ++++++++++++++++++ .../eclipse/jetty/http2/HTTP2Connection.java | 2 +- .../http2/server/HTTP2ServerConnection.java | 23 +- .../http2/server/HttpChannelOverHTTP2.java | 2 +- .../strategy/ExecuteProduceConsume.java | 7 +- 5 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java new file mode 100644 index 00000000000..1f3c30f8110 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SmallThreadPoolLoadTest.java @@ -0,0 +1,237 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +public class SmallThreadPoolLoadTest extends AbstractTest +{ + private final Logger logger = Log.getLogger(SmallThreadPoolLoadTest.class); + private final AtomicLong requestIds = new AtomicLong(); + + @Override + protected void customizeContext(ServletContextHandler context) + { + QueuedThreadPool serverThreads = (QueuedThreadPool)context.getServer().getThreadPool(); + serverThreads.setDetailedDump(true); + serverThreads.setMaxThreads(5); + serverThreads.setLowThreadsThreshold(1); + } + + @Test + public void testConcurrentWithSmallServerThreadPool() throws Exception + { + start(new LoadServlet()); + AbstractHTTP2ServerConnectionFactory factory = connector.getBean(AbstractHTTP2ServerConnectionFactory.class); + factory.setExecutionStrategyFactory(new ExecuteProduceConsume.Factory() + { + @Override + public ExecutionStrategy newExecutionStrategy(ExecutionStrategy.Producer producer, Executor executor) + { + return new ExecuteProduceConsume(producer, executor) + { + @Override + protected void executeTask(Runnable task) + { + if (task instanceof Rejectable) + ((Rejectable)task).reject(); + else + super.executeTask(task); + } + }; + } + }); + + // Only one connection to the server. + Session session = newClient(new Session.Listener.Adapter()); + + int runs = 10; + int iterations = 512; + boolean result = IntStream.range(0, 16).parallel() + .mapToObj(i -> IntStream.range(0, runs) + .mapToObj(j -> run(session, iterations)) + .reduce(true, (acc, res) -> acc && res)) + .reduce(true, (acc, res) -> acc && res); + + Assert.assertTrue(result); + } + + private boolean run(Session session, int iterations) + { + try + { + CountDownLatch latch = new CountDownLatch(iterations); + int factor = (logger.isDebugEnabled() ? 25 : 1) * 100; + + // Dumps the state of the client if the test takes too long. + final Thread testThread = Thread.currentThread(); + Scheduler.Task task = client.getScheduler().schedule(() -> + { + logger.warn("Interrupting test, it is taking too long{}{}{}{}", + System.lineSeparator(), server.dump(), + System.lineSeparator(), client.dump()); + testThread.interrupt(); + }, iterations * factor, TimeUnit.MILLISECONDS); + + long successes = 0; + long begin = System.nanoTime(); + for (int i = 0; i < iterations; ++i) + { + boolean success = test(session, latch); + if (success) + ++successes; + } + + Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS)); + long end = System.nanoTime(); + Assert.assertThat(successes, Matchers.greaterThan(0L)); + task.cancel(); + long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin); + logger.info("{} requests in {} ms, {}/{} success/failure, {} req/s", + iterations, elapsed, + successes, iterations - successes, + elapsed > 0 ? iterations * 1000 / elapsed : -1); + return true; + } + catch (Exception x) + { + x.printStackTrace(); + return false; + } + } + + private boolean test(Session session, CountDownLatch latch) throws Exception + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + // Choose a random method + boolean download = random.nextBoolean(); + HttpMethod method = download ? HttpMethod.GET : HttpMethod.POST; + + int maxContentLength = 128 * 1024; + int contentLength = random.nextInt(maxContentLength) + 1; + + long requestId = requestIds.incrementAndGet(); + MetaData.Request request = newRequest(method.asString(), "/" + requestId, new HttpFields()); + if (download) + request.getFields().put("X-Download", String.valueOf(contentLength)); + HeadersFrame requestFrame = new HeadersFrame(request, null, download); + FuturePromise promise = new FuturePromise<>(); + CountDownLatch requestLatch = new CountDownLatch(1); + AtomicBoolean reset = new AtomicBoolean(); + session.newStream(requestFrame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + requestLatch.countDown(); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callback.succeeded(); + if (frame.isEndStream()) + requestLatch.countDown(); + } + + @Override + public void onReset(Stream stream, ResetFrame frame) + { + reset.set(true); + requestLatch.countDown(); + } + }); + if (!download) + { + Stream stream = promise.get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP); + } + + boolean success = requestLatch.await(5, TimeUnit.SECONDS); + if (success) + latch.countDown(); + else + logger.warn("Request {} took too long{}{}{}{}", requestId, + System.lineSeparator(), server.dump(), + System.lineSeparator(), client.dump()); + return !reset.get(); + } + + private static class LoadServlet extends HttpServlet + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + String method = request.getMethod().toUpperCase(Locale.ENGLISH); + switch (method) + { + case "GET": + { + int contentLength = request.getIntHeader("X-Download"); + if (contentLength > 0) + response.getOutputStream().write(new byte[contentLength]); + break; + } + case "POST": + { + IO.copy(request.getInputStream(), response.getOutputStream()); + break; + } + } + } + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index e966923e04b..8e89cf2c5b2 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -167,7 +167,7 @@ public class HTTP2Connection extends AbstractConnection task = tasks.poll(); if (LOG.isDebugEnabled()) - LOG.debug("Dequeued task {}", task); + LOG.debug("Dequeued new task {}", task); if (task != null) { release(); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index ad3ef6cd988..946d4b3babc 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData.Request; +import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.IStream; @@ -38,6 +39,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PrefaceFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.parser.ServerParser; import org.eclipse.jetty.http2.parser.SettingsBodyParser; @@ -192,19 +194,36 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection return true; } - private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 + private class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 implements ExecutionStrategy.Rejectable { public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport) { super(connector, configuration, endPoint, transport); } + @Override + public void recycle() + { + super.recycle(); + channels.offer(this); + } + @Override public void onCompleted() { super.onCompleted(); recycle(); - channels.offer(this); + } + + @Override + public void reject() + { + IStream stream = getStream(); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.ENHANCE_YOUR_CALM_ERROR.code), Callback.NOOP); + // Consume the existing queued data frames to + // avoid stalling the session flow control. + getHttpTransport().consumeInput(); + recycle(); } } } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index a9e054c079d..2751a76d883 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -59,7 +59,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel super(connector, configuration, endPoint, transport); } - private IStream getStream() + protected IStream getStream() { return getHttpTransport().getStream(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java index ae71e64ac77..92e42ae087b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsume.java @@ -191,7 +191,7 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements } // Execute the task. - execute(task); + executeTask(task); } return !idle; } @@ -203,6 +203,11 @@ public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements } } + protected void executeTask(Runnable task) + { + execute(task); + } + private void executeProduceConsume() { if (LOG.isDebugEnabled())