From 0c61ec3e4dcbb24e098ce8b515ff641909c1c6cc Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 29 May 2019 17:24:01 +0200 Subject: [PATCH] Fix idle race by allowing idle count to go negative (#3694) * Fix idle race by allowing idle count to go negative * Fixed flakey dump test * don't exit Runner on exceptions * cleanup after pair programming with sbordet * longer benchmark runs * optimized by removing need to check isRunning Signed-off-by: Greg Wilkins * Code reformatting. * Fixed stop logic. Signed-off-by: Simone Bordet --- .../util/thread/jmh/ThreadPoolBenchmark.java | 43 +- .../eclipse/jetty/util/AtomicBiInteger.java | 17 +- .../eclipse/jetty/util/AtomicTriInteger.java | 175 ------ .../jetty/util/BlockingArrayQueue.java | 48 +- .../jetty/util/thread/ExecutorThreadPool.java | 8 +- .../jetty/util/thread/QueuedThreadPool.java | 514 ++++++++++-------- .../thread/ScheduledExecutorScheduler.java | 27 +- .../jetty/util/AtomicTriIntegerTest.java | 103 ---- .../jetty/util/BlockingArrayQueueTest.java | 39 +- .../util/thread/QueuedThreadPoolTest.java | 69 ++- 10 files changed, 494 insertions(+), 549 deletions(-) delete mode 100644 jetty-util/src/main/java/org/eclipse/jetty/util/AtomicTriInteger.java mode change 100755 => 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java mode change 100755 => 100644 jetty-util/src/main/java/org/eclipse/jetty/util/thread/ScheduledExecutorScheduler.java delete mode 100644 jetty-util/src/test/java/org/eclipse/jetty/util/AtomicTriIntegerTest.java diff --git a/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java b/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java index 0796286afbc..396dec710eb 100644 --- a/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java +++ b/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/jmh/ThreadPoolBenchmark.java @@ -18,9 +18,12 @@ package org.eclipse.jetty.util.thread.jmh; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool; @@ -42,16 +45,16 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @State(Scope.Benchmark) -@Warmup(iterations = 5, time = 10000, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 3, time = 10000, timeUnit = TimeUnit.MILLISECONDS) +@Warmup(iterations = 8, time = 1000, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 3, time = 1000, timeUnit = TimeUnit.MILLISECONDS) public class ThreadPoolBenchmark { public enum Type { - QTP, ETP; + QTP, ETP, LQTP, LETP, AQTP, AETP; } - @Param({ "QTP", "ETP"}) + @Param({ "QTP", "ETP" /*, "LQTP", "LETP", "AQTP", "AETP" */ }) Type type; @Param({ "200" }) @@ -65,11 +68,39 @@ public class ThreadPoolBenchmark switch(type) { case QTP: - pool = new QueuedThreadPool(size,size); + { + QueuedThreadPool qtp = new QueuedThreadPool(size, size, new BlockingArrayQueue<>(32768, 32768)); + qtp.setReservedThreads(0); + pool = qtp; break; + } case ETP: - pool = new ExecutorThreadPool(size,size); + pool = new ExecutorThreadPool(size, size, new BlockingArrayQueue<>(32768, 32768)); + break; + + case LQTP: + { + QueuedThreadPool qtp = new QueuedThreadPool(size, size, new LinkedBlockingQueue<>()); + qtp.setReservedThreads(0); + pool = qtp; + break; + } + + case LETP: + pool = new ExecutorThreadPool(size, size, new LinkedBlockingQueue<>()); + break; + + case AQTP: + { + QueuedThreadPool qtp = new QueuedThreadPool(size, size, new ArrayBlockingQueue<>(32768)); + qtp.setReservedThreads(0); + pool = qtp; + break; + } + + case AETP: + pool = new ExecutorThreadPool(size, size, new ArrayBlockingQueue<>(32768)); break; } LifeCycle.start(pool); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java index de4e26718e0..e9f9f294a98 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicBiInteger.java @@ -24,7 +24,22 @@ import java.util.concurrent.atomic.AtomicLong; * An AtomicLong with additional methods to treat it as two hi/lo integers. */ public class AtomicBiInteger extends AtomicLong -{ +{ + + public AtomicBiInteger() + { + } + + public AtomicBiInteger(long encoded) + { + super(encoded); + } + + public AtomicBiInteger(int hi, int lo) + { + super(encode(hi, lo)); + } + /** * @return the hi value */ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicTriInteger.java b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicTriInteger.java deleted file mode 100644 index 6ecec73d382..00000000000 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicTriInteger.java +++ /dev/null @@ -1,175 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * An AtomicLong with additional methods to treat it as three 21 bit unsigned words. - */ -public class AtomicTriInteger extends AtomicLong -{ - public static int MAX_VALUE = 0x1FFFFF; - public static int MIN_VALUE = 0; - - /** - * Sets the hi and lo values. - * - * @param w0 the 0th word - * @param w1 the 1st word - * @param w2 the 2nd word - */ - public void set(int w0, int w1, int w2) - { - set(encode(w0, w1, w2)); - } - - /** - * Atomically sets the word values to the given updated values only if - * the current encoded value is as expected. - * - * @param expectEncoded the expected encoded value - * @param w0 the 0th word - * @param w1 the 1st word - * @param w2 the 2nd word - * @return {@code true} if successful. - */ - public boolean compareAndSet(long expectEncoded, int w0, int w1, int w2) - { - return compareAndSet(expectEncoded, encode(w0, w1, w2)); - } - - /** - * Atomically adds the given deltas to the current hi and lo values. - * - * @param delta0 the delta to apply to the 0th word value - * @param delta1 the delta to apply to the 1st word value - * @param delta2 the delta to apply to the 2nd word value - */ - public void add(int delta0, int delta1, int delta2) - { - while (true) - { - long encoded = get(); - long update = encode( - getWord0(encoded) + delta0, - getWord1(encoded) + delta1, - getWord2(encoded) + delta2); - if (compareAndSet(encoded, update)) - return; - } - } - - /** - * Gets word 0 value - * - * @return the 16 bit value as an int - */ - public int getWord0() - { - return getWord0(get()); - } - - - /** - * Gets word 1 value - * - * @return the 16 bit value as an int - */ - public int getWord1() - { - return getWord1(get()); - } - - /** - * Gets word 2 value - * - * @return the 16 bit value as an int - */ - public int getWord2() - { - return getWord2(get()); - } - - /** - * Gets word 0 value from the given encoded value. - * - * @param encoded the encoded value - * @return the 16 bit value as an int - */ - public static int getWord0(long encoded) - { - return (int)((encoded >> 42) & MAX_VALUE); - } - - /** - * Gets word 0 value from the given encoded value. - * - * @param encoded the encoded value - * @return the 16 bit value as an int - */ - public static int getWord1(long encoded) - { - return (int)((encoded >> 21) & MAX_VALUE); - } - - /** - * Gets word 0 value from the given encoded value. - * - * @param encoded the encoded value - * @return the 16 bit value as an int - */ - public static int getWord2(long encoded) - { - return (int)(encoded & MAX_VALUE); - } - - /** - * Encodes 4 16 bit words values into a long. - * - * @param w0 the 0th word - * @param w1 the 1st word - * @param w2 the 2nd word - * @return the encoded value - */ - public static long encode(int w0, int w1, int w2) - { - if (w0 < MIN_VALUE - || w0 > MAX_VALUE - || w1 < MIN_VALUE - || w1 > MAX_VALUE - || w2 < MIN_VALUE - || w2 > MAX_VALUE) - throw new IllegalArgumentException(String.format("Words must be %d <= word <= %d: %d, %d, %d", MIN_VALUE, MAX_VALUE, w0, w1, w2)); - long wl0 = ((long)w0) & MAX_VALUE; - long wl1 = ((long)w1) & MAX_VALUE; - long wl2 = ((long)w2) & MAX_VALUE; - return (wl0 << 42) + (wl1 << 21) + (wl2); - } - - @Override - public String toString() - { - long encoded = get(); - int w0 = getWord0(encoded); - int w1 = getWord1(encoded); - int w2 = getWord2(encoded); - return String.format("{%d,%d,%d}", w0, w1, w2); - } -} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java index 4658e88a27f..50596c8fac3 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java @@ -484,13 +484,56 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu @Override public int drainTo(Collection c) { - throw new UnsupportedOperationException(); + return drainTo(c, Integer.MAX_VALUE); } @Override public int drainTo(Collection c, int maxElements) { - throw new UnsupportedOperationException(); + int elements = 0; + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + final int head = _indexes[HEAD_OFFSET]; + final int tail = _indexes[TAIL_OFFSET]; + final int capacity = _elements.length; + + int i = head; + while (i!=tail && elements extends AbstractList implements BlockingQu _tailLock.lock(); try { - _headLock.lock(); try { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java index 7b7253c9e29..c9da93cdb64 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ExecutorThreadPool.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -63,7 +64,12 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool public ExecutorThreadPool(int maxThreads, int minThreads) { - this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()), minThreads, -1, null); + this(maxThreads, minThreads, new LinkedBlockingQueue<>()); + } + + public ExecutorThreadPool(int maxThreads, int minThreads, BlockingQueue queue) + { + this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, queue), minThreads, -1, null); } public ExecutorThreadPool(ThreadPoolExecutor executor) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java old mode 100755 new mode 100644 index 7d11e8e19e5..94dab54c6db --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -29,7 +29,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.eclipse.jetty.util.AtomicTriInteger; +import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -48,15 +48,16 @@ import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); + private static Runnable NOOP = () -> {}; /** - * Encodes thread counts:
- *
Word0
Total thread count (including starting and idle)
- *
Word1
Starting threads
- *
Word2
Idle threads
+ * Encodes thread counts: + *
+ *
Hi
Total thread count or Integer.MIN_VALUE if stopping
+ *
Lo
Net idle threads == idle threads - job queue size
*
*/ - private final AtomicTriInteger _counts = new AtomicTriInteger(); + private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0); private final AtomicLong _lastShrink = new AtomicLong(); private final Set _threads = ConcurrentHashMap.newKeySet(); private final Object _joinLock = new Object(); @@ -84,12 +85,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP this(maxThreads, Math.min(8, maxThreads)); } - public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads) + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads) { this(maxThreads, minThreads, 60000); } - public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout")int idleTimeout) + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("queue") BlockingQueue queue) + { + this(maxThreads, minThreads, 60000, -1, queue, null); + } + + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout) { this(maxThreads, minThreads, idleTimeout, null); } @@ -103,26 +109,24 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP { this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup); } - + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup) { - if (maxThreads < minThreads) { - throw new IllegalArgumentException("max threads ("+maxThreads+") less than min threads (" - +minThreads+")"); - } - + if (maxThreads < minThreads) + throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + + minThreads + ")"); setMinThreads(minThreads); setMaxThreads(maxThreads); setIdleTimeout(idleTimeout); setStopTimeout(5000); setReservedThreads(reservedThreads); - if (queue==null) + if (queue == null) { - int capacity=Math.max(_minThreads, 8); - queue=new BlockingArrayQueue<>(capacity, capacity); + int capacity = Math.max(_minThreads, 8) * 1024; + queue = new BlockingArrayQueue<>(capacity, capacity); } - _jobs=queue; - _threadGroup=threadGroup; + _jobs = queue; + _threadGroup = threadGroup; setThreadPoolBudget(new ThreadPoolBudget(this)); } @@ -134,7 +138,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP public void setThreadPoolBudget(ThreadPoolBudget budget) { - if (budget!=null && budget.getSizedThreadPool()!=this) + if (budget != null && budget.getSizedThreadPool() != this) throw new IllegalArgumentException(); _budget = budget; } @@ -142,21 +146,21 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override protected void doStart() throws Exception { - if (_reservedThreads==0) + if (_reservedThreads == 0) { _tryExecutor = NO_TRY; } else { - ReservedThreadExecutor reserved = new ReservedThreadExecutor(this,_reservedThreads); - reserved.setIdleTimeout(_idleTimeout,TimeUnit.MILLISECONDS); + ReservedThreadExecutor reserved = new ReservedThreadExecutor(this, _reservedThreads); + reserved.setIdleTimeout(_idleTimeout, TimeUnit.MILLISECONDS); _tryExecutor = reserved; } addBean(_tryExecutor); - - super.doStart(); - _counts.set(0,0,0); // threads, starting, idle + super.doStart(); + // The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped. + _counts.set(0, 0); // threads, idle ensureThreads(); } @@ -168,54 +172,59 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP removeBean(_tryExecutor); _tryExecutor = TryExecutor.NO_TRY; - + super.doStop(); + // Signal the Runner threads that we are stopping + int threads = _counts.getAndSetHi(Integer.MIN_VALUE); + + // If stop timeout try to gracefully stop long timeout = getStopTimeout(); BlockingQueue jobs = getQueue(); - - // If no stop timeout, clear job queue - if (timeout <= 0) - jobs.clear(); - - // Fill job Q with noop jobs to wakeup idle - Runnable noop = () -> {}; - for (int i = getThreads(); i-- > 0; ) - jobs.offer(noop); - - // try to let jobs complete naturally for half our stop time - joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); - - // If we still have threads running, get a bit more aggressive - - // interrupt remaining threads - for (Thread thread : _threads) + if (timeout > 0) { - if (LOG.isDebugEnabled()) - LOG.debug("Interrupting {}", thread); - thread.interrupt(); - } - - // wait again for the other half of our stop time - joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); - - Thread.yield(); - if (LOG.isDebugEnabled()) - { - for (Thread unstopped : _threads) + // Fill the job queue with noop jobs to wakeup idle threads. + for (int i = 0; i < threads; ++i) { - StringBuilder dmp = new StringBuilder(); - for (StackTraceElement element : unstopped.getStackTrace()) - { - dmp.append(System.lineSeparator()).append("\tat ").append(element); - } - LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString()); + jobs.offer(NOOP); + } + + // try to let jobs complete naturally for half our stop time + joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); + + // If we still have threads running, get a bit more aggressive + + // interrupt remaining threads + for (Thread thread : _threads) + { + if (LOG.isDebugEnabled()) + LOG.debug("Interrupting {}", thread); + thread.interrupt(); + } + + // wait again for the other half of our stop time + joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); + + Thread.yield(); + if (LOG.isDebugEnabled()) + { + for (Thread unstopped : _threads) + { + StringBuilder dmp = new StringBuilder(); + for (StackTraceElement element : unstopped.getStackTrace()) + { + dmp.append(System.lineSeparator()).append("\tat ").append(element); + } + LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString()); + } + } + else + { + for (Thread unstopped : _threads) + { + LOG.warn("{} Couldn't stop {}", this, unstopped); + } } - } - else - { - for (Thread unstopped : _threads) - LOG.warn("{} Couldn't stop {}",this,unstopped); } // Close any un-executed jobs @@ -233,11 +242,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP LOG.warn(t); } } - else if (job != noop) + else if (job != NOOP) LOG.warn("Stopped without executing or closing {}", job); } - if (_budget!=null) + if (_budget != null) _budget.reset(); synchronized (_joinLock) @@ -259,7 +268,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } /** - * Thread Pool should use Daemon Threading. + * Thread Pool should use Daemon Threading. * * @param daemon true to enable delegation * @see Thread#setDaemon(boolean) @@ -291,9 +300,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override public void setMaxThreads(int maxThreads) { - if (maxThreadsAtomicTriInteger.MAX_VALUE) - throw new IllegalArgumentException("maxThreads="+maxThreads); - if (_budget!=null) + if (_budget != null) _budget.check(maxThreads); _maxThreads = maxThreads; if (_minThreads > _maxThreads) @@ -317,11 +324,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (isStarted()) ensureThreads(); } - + /** * Set the number of reserved threads. * - * @param reservedThreads number of reserved threads or -1 for heuristically determined + * @param reservedThreads number of reserved threads or -1 for heuristically determined * @see #getReservedThreads */ public void setReservedThreads(int reservedThreads) @@ -426,10 +433,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP { return _priority; } - + /** * Get the size of the job queue. - * + * * @return Number of jobs queued waiting for a thread */ @ManagedAttribute("size of the job queue") @@ -458,7 +465,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP { _detailedDump = detailedDump; } - + @ManagedAttribute("threshold at which the pool is low on threads") public int getLowThreadsThreshold() { @@ -473,22 +480,55 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override public void execute(Runnable job) { - if (!isRunning() || !_jobs.offer(job)) + // Determine if we need to start a thread, use and idle thread or just queue this job + boolean startThread; + while (true) { - LOG.warn("{} rejected {}", this, job); + // Get the atomic counts + long counts = _counts.get(); + + // Get the number of threads started (might not yet be running) + int threads = AtomicBiInteger.getHi(counts); + if (threads == Integer.MIN_VALUE) + throw new RejectedExecutionException(job.toString()); + + // Get the number of truly idle threads. This count is reduced by the + // job queue size so that any threads that are idle but are about to take + // a job from the queue are not counted. + int idle = AtomicBiInteger.getLo(counts); + + // Start a thread if we have insufficient idle threads to meet demand + // and we are not at max threads. + startThread = (idle <= 0 && threads < _maxThreads); + + // The job will be run by an idle thread when available + if (!_counts.compareAndSet(counts, threads + (startThread ? 1 : 0), idle - 1)) + continue; + + break; + } + + if (!_jobs.offer(job)) + { + // reverse our changes to _counts. + if (addCounts(startThread ? -1 : 0, 1)) + LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } + if (LOG.isDebugEnabled()) - LOG.debug("queue {}",job); - // Make sure there is at least one thread executing the job. - ensureThreads(); + LOG.debug("queue {} startThread={}", job, startThread); + + // Start a thread if one was needed + if (startThread) + startThread(); } @Override public boolean tryExecute(Runnable task) { TryExecutor tryExecutor = _tryExecutor; - return tryExecutor!=null && tryExecutor.tryExecute(task); + return tryExecutor != null && tryExecutor.tryExecute(task); } /** @@ -500,11 +540,15 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP synchronized (_joinLock) { while (isRunning()) + { _joinLock.wait(); + } } while (isStopping()) + { Thread.sleep(1); + } } /** @@ -514,7 +558,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @ManagedAttribute("number of threads in the pool") public int getThreads() { - return _counts.getWord0(); + int threads = _counts.getHi(); + return Math.max(0, threads); } /** @@ -524,7 +569,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @ManagedAttribute("number of idle threads in the pool") public int getIdleThreads() { - return _counts.getWord2(); + int idle = _counts.getLo(); + return Math.max(0, idle); } /** @@ -536,7 +582,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP int reserved = _tryExecutor instanceof ReservedThreadExecutor ? ((ReservedThreadExecutor)_tryExecutor).getAvailable() : 0; return getThreads() - getIdleThreads() - reserved; } - + /** *

Returns whether this thread pool is low on threads.

*

The current formula is:

@@ -556,43 +602,63 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP private void ensureThreads() { - while (isRunning()) + while (true) { long counts = _counts.get(); - int threads = AtomicTriInteger.getWord0(counts); - int starting = AtomicTriInteger.getWord1(counts); - int idle = AtomicTriInteger.getWord2(counts); - int queue = getQueueSize(); + int threads = AtomicBiInteger.getHi(counts); + if (threads == Integer.MIN_VALUE) + break; - if (threads >= _maxThreads) - break; - if (threads >= _minThreads && (starting + idle) >= queue) - break; - if (!_counts.compareAndSet(counts, threads + 1, starting + 1, idle)) + // If we have less than min threads + // OR insufficient idle threads to meet demand + int idle = AtomicBiInteger.getLo(counts); + if (threads < _minThreads || (idle < 0 && threads < _maxThreads)) + { + // Then try to start a thread. + if (_counts.compareAndSet(counts, threads + 1, idle)) + startThread(); + // Otherwise continue to check state again. continue; - - boolean started = false; - try - { - if (LOG.isDebugEnabled()) - LOG.debug("Starting thread {}",this); - - Thread thread = newThread(_runnable); - thread.setDaemon(isDaemon()); - thread.setPriority(getThreadsPriority()); - thread.setName(_name + "-" + thread.getId()); - if (LOG.isDebugEnabled()) - LOG.debug("Starting {}", thread); - _threads.add(thread); - _lastShrink.set(System.nanoTime()); - thread.start(); - started = true; - } - finally - { - if (!started) - _counts.add(-1,-1,0); // threads, starting, idle } + break; + } + } + + protected void startThread() + { + boolean started = false; + try + { + Thread thread = newThread(_runnable); + thread.setDaemon(isDaemon()); + thread.setPriority(getThreadsPriority()); + thread.setName(_name + "-" + thread.getId()); + if (LOG.isDebugEnabled()) + LOG.debug("Starting {}", thread); + _threads.add(thread); + _lastShrink.set(System.nanoTime()); + thread.start(); + started = true; + } + finally + { + if (!started) + addCounts(-1, 0); // threads, idle + } + } + + private boolean addCounts(int deltaThreads, int deltaIdle) + { + while (true) + { + long encoded = _counts.get(); + int threads = AtomicBiInteger.getHi(encoded); + int idle = AtomicBiInteger.getLo(encoded); + if (threads == Integer.MIN_VALUE) // This is a marker that the pool is stopped. + return false; + long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle); + if (_counts.compareAndSet(encoded, update)) + return true; } } @@ -621,19 +687,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP knownMethod = "IDLE "; break; } - + if ("reservedWait".equals(t.getMethodName()) && t.getClassName().endsWith("ReservedThread")) { knownMethod = "RESERVED "; break; } - + if ("select".equals(t.getMethodName()) && t.getClassName().endsWith("SelectorProducer")) { knownMethod = "SELECTING "; break; } - + if ("accept".equals(t.getMethodName()) && t.getClassName().contains("ServerConnector")) { knownMethod = "ACCEPTING "; @@ -664,8 +730,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } else { - int p=thread.getPriority(); - threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p==Thread.NORM_PRIORITY?"":(" prio="+p))); + int p = thread.getPriority(); + threads.add(thread.getId() + " " + thread.getName() + " " + known + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (p == Thread.NORM_PRIORITY ? "" : (" prio=" + p))); } } @@ -684,12 +750,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP public String toString() { long count = _counts.get(); - int threads = AtomicTriInteger.getWord0(count); - int starting = AtomicTriInteger.getWord1(count); - int idle = AtomicTriInteger.getWord2(count); + int threads = Math.max(0, AtomicBiInteger.getHi(count)); + int idle = Math.max(0, AtomicBiInteger.getLo(count)); int queue = getQueueSize(); - return String.format("%s[%s]@%x{%s,%d<=%d<=%d,s=%d,i=%d,r=%d,q=%d}[%s]", + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", getClass().getSimpleName(), _name, hashCode(), @@ -697,7 +762,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP getMinThreads(), threads, getMaxThreads(), - starting, idle, getReservedThreads(), queue, @@ -768,7 +832,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP buf.append(thread.getId()).append(" ").append(thread.getName()).append(" "); buf.append(thread.getState()).append(":").append(System.lineSeparator()); for (StackTraceElement element : thread.getStackTrace()) + { buf.append(" at ").append(element.toString()).append(System.lineSeparator()); + } return buf.toString(); } } @@ -777,102 +843,110 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP private class Runner implements Runnable { - @Override - public void run() - { - boolean idle = false; - Runnable job = null; - - try - { - job = _jobs.poll(); - idle = job==null; - _counts.add(0,-1,idle?1:0); // threads, starting, idle - - if (LOG.isDebugEnabled()) - LOG.debug("Runner started with {} for {}", job, QueuedThreadPool.this); - - while (true) - { - if (job == null) - { - if (!idle) - { - idle = true; - _counts.add(0,0,1); // threads, starting, idle - } - - long idleTimeout = getIdleTimeout(); - job = idleJobPoll(idleTimeout); - - // maybe we should shrink? - if (job == null && getThreads() > _minThreads && idleTimeout > 0) - { - long last = _lastShrink.get(); - long now = System.nanoTime(); - if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout)) - { - if (_lastShrink.compareAndSet(last, now)) - { - if (LOG.isDebugEnabled()) - LOG.debug("shrinking {}", QueuedThreadPool.this); - break; - } - } - } - } - - // run job - if (job != null) - { - if (idle) - { - idle = false; - _counts.add(0,0,-1); // threads, starting, idle - } - - if (LOG.isDebugEnabled()) - LOG.debug("run {} in {}", job, QueuedThreadPool.this); - runJob(job); - if (LOG.isDebugEnabled()) - LOG.debug("ran {} in {}", job, QueuedThreadPool.this); - - // Clear interrupted status - Thread.interrupted(); - } - - if (!isRunning()) - break; - - job = _jobs.poll(); - } - } - catch (InterruptedException e) - { - if (LOG.isDebugEnabled()) - LOG.debug("interrupted {} in {}", job, QueuedThreadPool.this); - LOG.ignore(e); - } - catch (Throwable e) - { - LOG.warn(String.format("Unexpected thread death: %s in %s", job, QueuedThreadPool.this), e); - } - finally - { - _counts.add(-1,0,idle?-1:0); // threads, starting, idle - removeThread(Thread.currentThread()); - ensureThreads(); - - if (LOG.isDebugEnabled()) - LOG.debug("Runner exited for {}", QueuedThreadPool.this); - } - } - private Runnable idleJobPoll(long idleTimeout) throws InterruptedException { if (idleTimeout <= 0) return _jobs.take(); return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS); } + + @Override + public void run() + { + if (LOG.isDebugEnabled()) + LOG.debug("Runner started for {}", QueuedThreadPool.this); + + Runnable job = null; + + try + { + // All threads start idle (not yet taken a job) + if (!addCounts(0, 1)) + return; + + while (true) + { + // If we had a job, signal that we are idle again + if (job != null) + { + if (!addCounts(0, 1)) + break; + } + // else check we are still running + else if (_counts.getHi() == Integer.MIN_VALUE) + { + break; + } + + try + { + // Look for an immediately available job + job = _jobs.poll(); + if (job == null) + { + // Wait for a job + long idleTimeout = getIdleTimeout(); + job = idleJobPoll(idleTimeout); + + // If still no job? + if (job == null) + { + // maybe we should shrink + if (getThreads() > _minThreads && idleTimeout > 0) + { + long last = _lastShrink.get(); + long now = System.nanoTime(); + if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout)) + { + if (_lastShrink.compareAndSet(last, now)) + { + if (LOG.isDebugEnabled()) + LOG.debug("shrinking {}", QueuedThreadPool.this); + break; + } + } + } + // continue to try again + continue; + } + } + + // run job + if (LOG.isDebugEnabled()) + LOG.debug("run {} in {}", job, QueuedThreadPool.this); + runJob(job); + if (LOG.isDebugEnabled()) + LOG.debug("ran {} in {}", job, QueuedThreadPool.this); + + // Clear any interrupted status + Thread.interrupted(); + } + catch (InterruptedException e) + { + if (LOG.isDebugEnabled()) + LOG.debug("interrupted {} in {}", job, QueuedThreadPool.this); + LOG.ignore(e); + } + catch (Throwable e) + { + LOG.warn(e); + } + } + } + finally + { + Thread thread = Thread.currentThread(); + removeThread(thread); + + // Decrement the total thread count and the idle count if we had no job + addCounts(-1, job == null ? -1 : 0); + if (LOG.isDebugEnabled()) + LOG.debug("{} exited for {}", thread, QueuedThreadPool.this); + + // There is a chance that we shrunk just as a job was queued for us, so + // check again if we have sufficient threads to meet demand + ensureThreads(); + } + } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ScheduledExecutorScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ScheduledExecutorScheduler.java old mode 100755 new mode 100644 index 0ad6b76bb86..a0c889d162c --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ScheduledExecutorScheduler.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ScheduledExecutorScheduler.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.util.thread; import java.io.IOException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -47,13 +46,13 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch public ScheduledExecutorScheduler() { this(null, false); - } + } public ScheduledExecutorScheduler(String name, boolean daemon) { - this (name,daemon, Thread.currentThread().getContextClassLoader()); + this(name, daemon, Thread.currentThread().getContextClassLoader()); } - + public ScheduledExecutorScheduler(String name, boolean daemon, ClassLoader threadFactoryClassLoader) { this(name, daemon, threadFactoryClassLoader, null); @@ -70,16 +69,12 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch @Override protected void doStart() throws Exception { - scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory() + scheduler = new ScheduledThreadPoolExecutor(1, r -> { - @Override - public Thread newThread(Runnable r) - { - Thread thread = ScheduledExecutorScheduler.this.thread = new Thread(threadGroup, r, name); - thread.setDaemon(daemon); - thread.setContextClassLoader(classloader); - return thread; - } + Thread thread = ScheduledExecutorScheduler.this.thread = new Thread(threadGroup, r, name); + thread.setDaemon(daemon); + thread.setContextClassLoader(classloader); + return thread; }); scheduler.setRemoveOnCancelPolicy(true); super.doStart(); @@ -97,8 +92,8 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch public Task schedule(Runnable task, long delay, TimeUnit unit) { ScheduledThreadPoolExecutor s = scheduler; - if (s==null) - return ()->false; + if (s == null) + return () -> false; ScheduledFuture result = s.schedule(task, delay, unit); return new ScheduledFutureTask(result); } @@ -116,7 +111,7 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch if (thread == null) Dumpable.dumpObject(out, this); else - Dumpable.dumpObjects(out,indent,this, (Object[])thread.getStackTrace()); + Dumpable.dumpObjects(out, indent, this, (Object[])thread.getStackTrace()); } private static class ScheduledFutureTask implements Task diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicTriIntegerTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicTriIntegerTest.java deleted file mode 100644 index cd1bd5ade8c..00000000000 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/AtomicTriIntegerTest.java +++ /dev/null @@ -1,103 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.util; - -import org.junit.jupiter.api.Test; - -import static org.eclipse.jetty.util.AtomicTriInteger.MAX_VALUE; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class AtomicTriIntegerTest -{ - - @Test - public void testBitOperations() - { - long encoded; - - encoded = AtomicTriInteger.encode(0,0,0); - assertThat(AtomicTriInteger.getWord0(encoded),is(0)); - assertThat(AtomicTriInteger.getWord1(encoded),is(0)); - assertThat(AtomicTriInteger.getWord2(encoded),is(0)); - - encoded = AtomicTriInteger.encode(1,2,3); - assertThat(AtomicTriInteger.getWord0(encoded),is(1)); - assertThat(AtomicTriInteger.getWord1(encoded),is(2)); - assertThat(AtomicTriInteger.getWord2(encoded),is(3)); - - encoded = AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, MAX_VALUE); - assertThat(AtomicTriInteger.getWord0(encoded),is(MAX_VALUE)); - assertThat(AtomicTriInteger.getWord1(encoded),is(MAX_VALUE)); - assertThat(AtomicTriInteger.getWord2(encoded),is(MAX_VALUE)); - - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(-1, MAX_VALUE, MAX_VALUE)); - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, -1, MAX_VALUE)); - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, -1)); - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE+1, MAX_VALUE, MAX_VALUE)); - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE+1, MAX_VALUE)); - assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, MAX_VALUE+1)); - } - - @Test - public void testSetGet() - { - AtomicTriInteger ati = new AtomicTriInteger(); - ati.set(1,2,3); - assertThat(ati.getWord0(),is(1)); - assertThat(ati.getWord1(),is(2)); - assertThat(ati.getWord2(),is(3)); - } - - @Test - public void testCompareAndSet() - { - AtomicTriInteger ati = new AtomicTriInteger(); - ati.set(1,2,3); - long value = ati.get(); - - ati.set(2,3,4); - assertFalse(ati.compareAndSet(value,5,6,7)); - assertThat(ati.getWord0(),is(2)); - assertThat(ati.getWord1(),is(3)); - assertThat(ati.getWord2(),is(4)); - - value = ati.get(); - assertTrue(ati.compareAndSet(value,6,7,8)); - assertThat(ati.getWord0(),is(6)); - assertThat(ati.getWord1(),is(7)); - assertThat(ati.getWord2(),is(8)); - } - - - @Test - public void testAdd() - { - AtomicTriInteger ati = new AtomicTriInteger(); - ati.set(1,2,3); - ati.add(-1,-2,4); - assertThat(ati.getWord0(),is(0)); - assertThat(ati.getWord1(),is(0)); - assertThat(ati.getWord2(),is(7)); - } - -} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java index 0d59a1ec4af..3a35ff6fb92 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java @@ -18,12 +18,9 @@ package org.eclipse.jetty.util; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.ListIterator; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; @@ -31,10 +28,16 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class BlockingArrayQueueTest { @Test @@ -494,4 +497,28 @@ public class BlockingArrayQueueTest assertTrue(iterator.hasNext()); assertFalse(iterator.hasPrevious()); } + + + @Test + public void testDrainTo() throws Exception + { + BlockingArrayQueue queue = new BlockingArrayQueue<>(); + queue.add("one"); + queue.add("two"); + queue.add("three"); + queue.add("four"); + queue.add("five"); + queue.add("six"); + + List to = new ArrayList<>(); + queue.drainTo(to,3); + assertThat(to, Matchers.contains("one", "two", "three")); + assertThat(queue.size(),Matchers.is(3)); + assertThat(queue, Matchers.contains("four", "five", "six")); + + queue.drainTo(to); + assertThat(to, Matchers.contains("one", "two", "three", "four", "five", "six")); + assertThat(queue.size(),Matchers.is(0)); + assertThat(queue, Matchers.empty()); + } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index 1f00ac25f9c..e03322d7b74 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -181,6 +181,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest RunningJob job4=new RunningJob("JOB4"); tp.execute(job4); assertFalse(job4._run.await(1,TimeUnit.SECONDS)); + assertThat(tp.getThreads(),is(4)); // finish job 0 job0._stopping.countDown(); @@ -214,19 +215,21 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest duration = System.nanoTime() - duration; assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout()/2L)); assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout()*2L)); + + tp.stop(); } @Test public void testThreadPoolFailingJobs() throws Exception { + QueuedThreadPool tp= new QueuedThreadPool(); + tp.setMinThreads(2); + tp.setMaxThreads(4); + tp.setIdleTimeout(900); + tp.setThreadsPriority(Thread.NORM_PRIORITY-1); + try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class)) { - QueuedThreadPool tp= new QueuedThreadPool(); - tp.setMinThreads(2); - tp.setMaxThreads(4); - tp.setIdleTimeout(900); - tp.setThreadsPriority(Thread.NORM_PRIORITY-1); - tp.start(); // min threads started @@ -297,8 +300,10 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job4._stopped.await(10,TimeUnit.SECONDS)); waitForIdle(tp,2); - assertThat(tp.getThreads(),is(2)); + waitForThreads(tp,2); } + + tp.stop(); } @Test @@ -340,6 +345,8 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest RunningJob job4 = new RunningJob(); tp.execute(job4); assertTrue(job4._run.await(5, TimeUnit.SECONDS)); + + tp.stop(); } @Test @@ -364,7 +371,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest tp.execute(job0); tp.execute(job1); - // Add a more jobs (which should not be run) + // Add more jobs (which should not be run) RunningJob job2=new RunningJob(); CloseableJob job3=new CloseableJob(); RunningJob job4=new RunningJob(); @@ -391,6 +398,8 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest // Verify ClosableJobs have not been run but have been closed assertThat(job4._run.await(200, TimeUnit.MILLISECONDS), is(false)); assertThat(job3._closed.await(200, TimeUnit.MILLISECONDS), is(true)); + + tp.stop(); } @@ -437,6 +446,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } waitForThreads(tp,2); waitForIdle(tp,2); + tp.stop(); } @Test @@ -484,6 +494,27 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertEquals(idle, tp.getIdleThreads()); } + + private void waitForReserved(QueuedThreadPool tp, int reserved) + { + long now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long start=now; + ReservedThreadExecutor reservedThreadExecutor = tp.getBean(ReservedThreadExecutor.class); + while (reservedThreadExecutor.getAvailable()!=reserved && (now-start)<10000) + { + try + { + Thread.sleep(50); + } + catch(InterruptedException ignored) + { + } + now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + } + assertEquals(reserved, reservedThreadExecutor.getAvailable()); + } + + private void waitForThreads(QueuedThreadPool tp, int threads) { long now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -520,6 +551,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest Thread.sleep(100); assertThat(tp.getThreads(),greaterThanOrEqualTo(5)); } + tp.stop(); } @Test @@ -548,24 +580,26 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest public void testDump() throws Exception { QueuedThreadPool pool = new QueuedThreadPool(4, 3); + pool.setIdleTimeout(10000); String dump = pool.dump(); // TODO use hamcrest 2.0 regex matcher assertThat(dump,containsString("STOPPED")); - assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=-1,q=0")); + assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0")); assertThat(dump,containsString("[NO_TRY]")); pool.setReservedThreads(2); dump = pool.dump(); assertThat(dump,containsString("STOPPED")); - assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=2,q=0")); + assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0")); assertThat(dump,containsString("[NO_TRY]")); pool.start(); waitForIdle(pool,3); + Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,s=0,i=3,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(3)); assertThat(count(dump," RESERVED "),is(0)); @@ -585,10 +619,10 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } }); started.await(); - + Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(2)); assertThat(count(dump," WAITING "),is(1)); @@ -598,7 +632,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest pool.setDetailedDump(true); dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); assertThat(dump,containsString("s=0/2")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(2)); @@ -607,12 +641,11 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1)); assertFalse(pool.tryExecute(()->{})); - while(pool.getIdleThreads()==2) - Thread.sleep(10); - + waitForReserved(pool,1); + Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,s=0,i=1,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0")); assertThat(dump,containsString("s=1/2")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(1));