diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicWords.java b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicWords.java new file mode 100644 index 00000000000..5a3e0c638f7 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/AtomicWords.java @@ -0,0 +1,182 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * An AtomicLong with additional methods to treat it as two hi/lo integers. + */ +public class AtomicWords extends AtomicLong +{ + /** + * Sets the hi and lo values. + * + * @param w0 the 0th word + * @param w1 the 1st word + * @param w2 the 2nd word + * @param w3 the 3rd word + */ + public void set(int w0, int w1, int w2, int w3) + { + set(encode(w0, w1, w2, w3)); + } + + /** + * Atomically sets the word values to the given updated values only if + * the current encoded value is as expected. + * + * @param expectEncoded the expected encoded value + * @param w0 the 0th word + * @param w1 the 1st word + * @param w2 the 2nd word + * @param w3 the 3rd word + * @return {@code true} if successful. + */ + public boolean compareAndSet(long expectEncoded, int w0, int w1, int w2, int w3) + { + return compareAndSet(expectEncoded,encode(w0, w1, w2, w3)); + } + + /** + * Atomically adds the given deltas to the current hi and lo values. + * + * @param delta0 the delta to apply to the 0th word value + * @param delta1 the delta to apply to the 1st word value + * @param delta2 the delta to apply to the 2nd word value + * @param delta3 the delta to apply to the 3rd word value + */ + public void add(int delta0, int delta1, int delta2, int delta3) + { + while(true) + { + long encoded = get(); + long update = encode(getWord0(encoded)+delta0, + getWord1(encoded)+delta1, + getWord2(encoded)+delta2, + getWord3(encoded)+delta3); + if (compareAndSet(encoded,update)) + return; + } + } + + + /** + * Gets word 0 value + * + * @return the 16 bit value as an int + */ + public int getWord0() + { + return getWord0(get()); + } + + + /** + * Gets word 1 value + * + * @return the 16 bit value as an int + */ + public int getWord1() + { + return getWord1(get()); + } + + /** + * Gets word 2 value + * + * @return the 16 bit value as an int + */ + public int getWord2() + { + return getWord2(get()); + } + + /** + * Gets word 3 value + * + * @return the 16 bit value as an int + */ + public int getWord3() + { + return getWord3(get()); + } + + /** + * Gets word 0 value from the given encoded value. + * + * @param encoded the encoded value + * @return the 16 bit value as an int + */ + public static int getWord0(long encoded) + { + return (int) ((encoded>>48)&0xFFFFL); + } + + /** + * Gets word 0 value from the given encoded value. + * + * @param encoded the encoded value + * @return the 16 bit value as an int + */ + public static int getWord1(long encoded) + { + return (int) ((encoded>>32)&0xFFFFL); + } + + /** + * Gets word 0 value from the given encoded value. + * + * @param encoded the encoded value + * @return the 16 bit value as an int + */ + public static int getWord2(long encoded) + { + return (int) ((encoded>>16)&0xFFFFL); + } + + /** + * Gets word 0 value from the given encoded value. + * + * @param encoded the encoded value + * @return the 16 bit value as an int + */ + public static int getWord3(long encoded) + { + return (int) (encoded&0xFFFFL); + } + + /** + * Encodes 4 16 bit words values into a long. + * + * @param w0 the 0th word + * @param w1 the 1st word + * @param w2 the 2nd word + * @param w3 the 3rd word + * @return the encoded value + */ + public static long encode(int w0, int w1, int w2, int w3) + { + long wl0 = ((long)w0)&0xFFFFL; + long wl1 = ((long)w1)&0xFFFFL; + long wl2 = ((long)w2)&0xFFFFL; + long wl3 = ((long)w3)&0xFFFFL; + return (wl0<<48)+(wl1<<32)+(wl2<<16)+wl3; + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 131bcd14f09..a2a5653a4b4 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -27,9 +27,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.jetty.util.AtomicWords; import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -49,8 +49,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); - private final AtomicInteger _threadsStarted = new AtomicInteger(); - private final AtomicInteger _threadsIdle = new AtomicInteger(); + private final AtomicWords _counts = new AtomicWords(); private final AtomicLong _lastShrink = new AtomicLong(); private final Set _threads = ConcurrentHashMap.newKeySet(); private final Object _joinLock = new Object(); @@ -140,9 +139,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP addBean(_tryExecutor); super.doStart(); - _threadsStarted.set(0); - - startThreads(_minThreads); + _counts.set(0,0,0,0); + ensureThreads(); } @Override @@ -165,42 +163,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP // Fill job Q with noop jobs to wakeup idle Runnable noop = () -> {}; - for (int i = _threadsStarted.get(); i-- > 0; ) + for (int i = _counts.getWord0(); i-- > 0; ) jobs.offer(noop); // try to let jobs complete naturally for half our stop time - long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2; - for (Thread thread : _threads) - { - long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime()); - if (LOG.isDebugEnabled()) - LOG.debug("Waiting for {} for {}", thread, canwait); - if (canwait > 0) - thread.join(canwait); - } + joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); // If we still have threads running, get a bit more aggressive // interrupt remaining threads - if (_threadsStarted.get() > 0) - for (Thread thread : _threads) - { - if (LOG.isDebugEnabled()) - LOG.debug("Interrupting {}", thread); - thread.interrupt(); - } - - // wait again for the other half of our stop time - stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2; for (Thread thread : _threads) { - long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime()); if (LOG.isDebugEnabled()) - LOG.debug("Waiting for {} for {}", thread, canwait); - if (canwait > 0) - thread.join(canwait); + LOG.debug("Interrupting {}", thread); + thread.interrupt(); } + // wait again for the other half of our stop time + joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2); + Thread.yield(); int size = _threads.size(); if (size > 0) @@ -254,6 +235,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } } + private void joinThreads(long stopByNanos) throws InterruptedException + { + for (Thread thread : _threads) + { + long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Waiting for {} for {}", thread, canWait); + if (canWait > 0) + thread.join(canWait); + } + } + /** * Thread Pool should use Daemon Threading. * @@ -308,9 +301,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (_minThreads > _maxThreads) _maxThreads = _minThreads; - int threads = _threadsStarted.get(); - if (isStarted() && threads < _minThreads) - startThreads(_minThreads - threads); + if (isStarted()) + ensureThreads(); } /** @@ -471,12 +463,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } - else - { - // Make sure there is at least one thread executing the job. - if (getQueueSize() > 0 && getIdleThreads() == 0) - startThreads(1); - } + + // Make sure there is at least one thread executing the job. + ensureThreads(); } @Override @@ -509,7 +498,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @ManagedAttribute("number of threads in the pool") public int getThreads() { - return _threadsStarted.get(); + return _counts.getWord0(); } /** @@ -519,7 +508,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @ManagedAttribute("number of idle threads in the pool") public int getIdleThreads() { - return _threadsIdle.get(); + return _counts.getWord3(); } /** @@ -549,20 +538,29 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold(); } - private boolean startThreads(int threadsToStart) + private void ensureThreads() { - while (threadsToStart > 0 && isRunning()) + while (isRunning()) { - int threads = _threadsStarted.get(); - if (threads >= _maxThreads) - return false; + long count = _counts.get(); + int threads = AtomicWords.getWord0(count); + int starting = AtomicWords.getWord1(count); + int idle = AtomicWords.getWord3(count); + int queue = getQueueSize(); - if (!_threadsStarted.compareAndSet(threads, threads + 1)) + if (threads >= _maxThreads) + break; + if (threads >= _minThreads && (starting + idle) >= queue) + break; + if (!_counts.compareAndSet(count, threads + 1, starting + 1, 0, idle)) continue; boolean started = false; try { + if (LOG.isDebugEnabled()) + LOG.debug("Starting thread {}",this); + Thread thread = newThread(_runnable); thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); @@ -573,15 +571,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP _lastShrink.set(System.nanoTime()); thread.start(); started = true; - --threadsToStart; } finally { if (!started) - _threadsStarted.decrementAndGet(); + _counts.add(-1,-1,0,0); } } - return true; } protected Thread newThread(Runnable runnable) @@ -671,17 +667,24 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override public String toString() { - return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", + long count = _counts.get(); + int threads = AtomicWords.getWord0(count); + int starting = AtomicWords.getWord1(count); + int idle = AtomicWords.getWord3(count); + int queue = getQueueSize(); + + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,s=%d,i=%d,r=%d,q=%d}[%s]", getClass().getSimpleName(), _name, hashCode(), getState(), getMinThreads(), - getThreads(), + threads, getMaxThreads(), - getIdleThreads(), + starting, + idle, getReservedThreads(), - _jobs.size(), + queue, _tryExecutor); } @@ -756,19 +759,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP return null; } - private static Runnable SHRINK = ()->{}; private class Runner implements Runnable { @Override public void run() { boolean idle = false; + Runnable job = null; try { - Runnable job = _jobs.poll(); - if (job != null && _threadsIdle.get() == 0) - startThreads(1); + job = _jobs.poll(); + idle = job==null; + _counts.add(0,-1,0,idle?1:0); while (true) { @@ -777,15 +780,29 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (!idle) { idle = true; - _threadsIdle.incrementAndGet(); + _counts.add(0,0,0,1); } job = idleJobPoll(); - if (job == SHRINK) + + if (job == null) { - if (LOG.isDebugEnabled()) - LOG.debug("shrinking {}", this); - break; + // maybe we should shrink? + int size = getThreads(); + if (size > _minThreads) + { + long last = _lastShrink.get(); + long now = System.nanoTime(); + if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) + { + if (_lastShrink.compareAndSet(last, now)) + { + if (LOG.isDebugEnabled()) + LOG.debug("shrinking {}", QueuedThreadPool.this); + break; + } + } + } } } @@ -795,15 +812,14 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (idle) { idle = false; - if (_threadsIdle.decrementAndGet() == 0) - startThreads(1); + _counts.add(0,0,0,-1); } if (LOG.isDebugEnabled()) - LOG.debug("run {}", job); + LOG.debug("run {} in {}", job, QueuedThreadPool.this); runJob(job); if (LOG.isDebugEnabled()) - LOG.debug("ran {}", job); + LOG.debug("ran {} in {}", job, QueuedThreadPool.this); // Clear interrupted status Thread.interrupted(); @@ -821,19 +837,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } catch (Throwable e) { - LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e); + LOG.warn(String.format("Unexpected thread death: %s in %s", job, QueuedThreadPool.this), e); } finally { - if (idle) - _threadsIdle.decrementAndGet(); - + _counts.add(-1,0,0,idle?-1:0); removeThread(Thread.currentThread()); - - int threads = _threadsStarted.decrementAndGet(); - // We should start a new thread if threads are now less than min threads or we have queued jobs - if (threads < getMinThreads() || getQueueSize()>0) - startThreads(1); + ensureThreads(); } } @@ -841,20 +851,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP { if (_idleTimeout <= 0) return _jobs.take(); - - // maybe we should shrink? - int size = _threadsStarted.get(); - if (size > _minThreads) - { - long last = _lastShrink.get(); - long now = System.nanoTime(); - if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) - { - if (_lastShrink.compareAndSet(last, now)) - return SHRINK; - } - } - return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS); } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index 06e28fc4ee9..1f00ac25f9c 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,14 +51,26 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest private final CountDownLatch _run = new CountDownLatch(1); private final CountDownLatch _stopping = new CountDownLatch(1); private final CountDownLatch _stopped = new CountDownLatch(1); + private final String _name; private final boolean _fail; RunningJob() { - this(false); + this(null, false); + } + + public RunningJob(String name) + { + this(name, false); } public RunningJob(boolean fail) { + this(null, fail); + } + + public RunningJob(String name, boolean fail) + { + _name = name; _fail = fail; } @@ -93,6 +106,14 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest if (!_stopped.await(10,TimeUnit.SECONDS)) throw new IllegalStateException(); } + + @Override + public String toString() + { + if (_name==null) + return super.toString(); + return String.format("%s@%x",_name,hashCode()); + } } private class CloseableJob extends RunningJob implements Closeable @@ -121,42 +142,43 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest waitForThreads(tp,2); waitForIdle(tp,2); - // Doesn't shrink less than 1 - Thread.sleep(1100); + // Doesn't shrink to less than min threads + Thread.sleep(3*tp.getIdleTimeout()/2); waitForThreads(tp,2); waitForIdle(tp,2); // Run job0 - RunningJob job0=new RunningJob(); + RunningJob job0=new RunningJob("JOB0"); tp.execute(job0); assertTrue(job0._run.await(10,TimeUnit.SECONDS)); waitForIdle(tp,1); // Run job1 - RunningJob job1=new RunningJob(); + RunningJob job1=new RunningJob("JOB1"); tp.execute(job1); assertTrue(job1._run.await(10,TimeUnit.SECONDS)); - waitForThreads(tp,3); - waitForIdle(tp,1); + waitForThreads(tp,2); + waitForIdle(tp,0); // Run job2 - RunningJob job2=new RunningJob(); + RunningJob job2=new RunningJob("JOB2"); tp.execute(job2); assertTrue(job2._run.await(10,TimeUnit.SECONDS)); - waitForThreads(tp,4); - waitForIdle(tp,1); + waitForThreads(tp,3); + waitForIdle(tp,0); // Run job3 - RunningJob job3=new RunningJob(); + RunningJob job3=new RunningJob("JOB3"); tp.execute(job3); assertTrue(job3._run.await(10,TimeUnit.SECONDS)); waitForThreads(tp,4); + waitForIdle(tp,0); assertThat(tp.getIdleThreads(),is(0)); Thread.sleep(100); assertThat(tp.getIdleThreads(),is(0)); // Run job4. will be queued - RunningJob job4=new RunningJob(); + RunningJob job4=new RunningJob("JOB4"); tp.execute(job4); assertFalse(job4._run.await(1,TimeUnit.SECONDS)); @@ -166,21 +188,32 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest // job4 should now run assertTrue(job4._run.await(10,TimeUnit.SECONDS)); - waitForThreads(tp,4); - waitForIdle(tp,0); - - // finish job 1,2,3,4 + assertThat(tp.getThreads(),is(4)); + assertThat(tp.getIdleThreads(),is(0)); + + // finish job 1 job1._stopping.countDown(); + assertTrue(job1._stopped.await(10,TimeUnit.SECONDS)); + waitForIdle(tp,1); + assertThat(tp.getThreads(),is(4)); + + // finish job 2,3,4 job2._stopping.countDown(); job3._stopping.countDown(); job4._stopping.countDown(); - assertTrue(job1._stopped.await(10,TimeUnit.SECONDS)); assertTrue(job2._stopped.await(10,TimeUnit.SECONDS)); assertTrue(job3._stopped.await(10,TimeUnit.SECONDS)); assertTrue(job4._stopped.await(10,TimeUnit.SECONDS)); - waitForThreads(tp,2); - waitForIdle(tp,2); + waitForIdle(tp,4); + assertThat(tp.getThreads(),is(4)); + + long duration = System.nanoTime(); + waitForThreads(tp,3); + assertThat(tp.getIdleThreads(),is(3)); + duration = System.nanoTime() - duration; + assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout()/2L)); + assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout()*2L)); } @Test @@ -188,78 +221,83 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest { try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class)) { - QueuedThreadPool tp = new QueuedThreadPool(); + QueuedThreadPool tp= new QueuedThreadPool(); tp.setMinThreads(2); tp.setMaxThreads(4); tp.setIdleTimeout(900); - tp.setThreadsPriority(Thread.NORM_PRIORITY - 1); + tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.start(); // min threads started - waitForThreads(tp, 2); - waitForIdle(tp, 2); + waitForThreads(tp,2); + waitForIdle(tp,2); - // Doesn't shrink less than 1 - Thread.sleep(1100); - waitForThreads(tp, 2); - waitForIdle(tp, 2); + // Doesn't shrink to less than min threads + Thread.sleep(3*tp.getIdleTimeout()/2); + waitForThreads(tp,2); + waitForIdle(tp,2); // Run job0 - RunningJob job0 = new RunningJob(true); + RunningJob job0=new RunningJob("JOB0", true); tp.execute(job0); - assertTrue(job0._run.await(10, TimeUnit.SECONDS)); - waitForIdle(tp, 1); + assertTrue(job0._run.await(10,TimeUnit.SECONDS)); + waitForIdle(tp,1); // Run job1 - RunningJob job1 = new RunningJob(true); + RunningJob job1=new RunningJob("JOB1", true); tp.execute(job1); - assertTrue(job1._run.await(10, TimeUnit.SECONDS)); - waitForThreads(tp, 3); - waitForIdle(tp, 1); + assertTrue(job1._run.await(10,TimeUnit.SECONDS)); + waitForThreads(tp,2); + waitForIdle(tp,0); // Run job2 - RunningJob job2 = new RunningJob(true); + RunningJob job2=new RunningJob("JOB2", true); tp.execute(job2); - assertTrue(job2._run.await(10, TimeUnit.SECONDS)); - waitForThreads(tp, 4); - waitForIdle(tp, 1); + assertTrue(job2._run.await(10,TimeUnit.SECONDS)); + waitForThreads(tp,3); + waitForIdle(tp,0); // Run job3 - RunningJob job3 = new RunningJob(true); + RunningJob job3=new RunningJob("JOB3", true); tp.execute(job3); - assertTrue(job3._run.await(10, TimeUnit.SECONDS)); - waitForThreads(tp, 4); - assertThat(tp.getIdleThreads(), is(0)); + assertTrue(job3._run.await(10,TimeUnit.SECONDS)); + waitForThreads(tp,4); + waitForIdle(tp,0); + assertThat(tp.getIdleThreads(),is(0)); Thread.sleep(100); - assertThat(tp.getIdleThreads(), is(0)); + assertThat(tp.getIdleThreads(),is(0)); // Run job4. will be queued - RunningJob job4 = new RunningJob(true); + RunningJob job4=new RunningJob("JOB4", true); tp.execute(job4); - assertFalse(job4._run.await(1, TimeUnit.SECONDS)); + assertFalse(job4._run.await(1,TimeUnit.SECONDS)); // finish job 0 job0._stopping.countDown(); - assertTrue(job0._stopped.await(10, TimeUnit.SECONDS)); + assertTrue(job0._stopped.await(10,TimeUnit.SECONDS)); // job4 should now run - assertTrue(job4._run.await(10, TimeUnit.SECONDS)); - waitForThreads(tp, 4); - waitForIdle(tp, 0); + assertTrue(job4._run.await(10,TimeUnit.SECONDS)); + assertThat(tp.getThreads(),is(4)); + assertThat(tp.getIdleThreads(),is(0)); - // finish job 1,2,3,4 + // finish job 1 job1._stopping.countDown(); + assertTrue(job1._stopped.await(10,TimeUnit.SECONDS)); + waitForThreads(tp,3); + assertThat(tp.getIdleThreads(),is(0)); + + // finish job 2,3,4 job2._stopping.countDown(); job3._stopping.countDown(); job4._stopping.countDown(); - assertTrue(job1._stopped.await(10, TimeUnit.SECONDS)); - assertTrue(job2._stopped.await(10, TimeUnit.SECONDS)); - assertTrue(job3._stopped.await(10, TimeUnit.SECONDS)); - assertTrue(job4._stopped.await(10, TimeUnit.SECONDS)); + assertTrue(job2._stopped.await(10,TimeUnit.SECONDS)); + assertTrue(job3._stopped.await(10,TimeUnit.SECONDS)); + assertTrue(job4._stopped.await(10,TimeUnit.SECONDS)); - waitForThreads(tp, 2); - waitForIdle(tp, 2); + waitForIdle(tp,2); + assertThat(tp.getThreads(),is(2)); } } @@ -268,7 +306,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest { QueuedThreadPool tp= new QueuedThreadPool(); tp.setDetailedDump(true); - tp.setMinThreads(3); + tp.setMinThreads(1); tp.setMaxThreads(10); tp.setIdleTimeout(500); @@ -288,8 +326,16 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job2._run.await(5, TimeUnit.SECONDS)); assertTrue(job3._run.await(5, TimeUnit.SECONDS)); - waitForThreads(tp, 4); waitForThreads(tp, 3); + assertThat(tp.getIdleThreads(),is(0)); + + job1._stopping.countDown(); + assertTrue(job1._stopped.await(10,TimeUnit.SECONDS)); + waitForIdle(tp, 1); + assertThat(tp.getThreads(),is(3)); + + waitForIdle(tp, 0); + assertThat(tp.getThreads(),is(2)); RunningJob job4 = new RunningJob(); tp.execute(job4); @@ -506,20 +552,20 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest String dump = pool.dump(); // TODO use hamcrest 2.0 regex matcher assertThat(dump,containsString("STOPPED")); - assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0")); + assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=-1,q=0")); assertThat(dump,containsString("[NO_TRY]")); pool.setReservedThreads(2); dump = pool.dump(); assertThat(dump,containsString("STOPPED")); - assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0")); + assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=2,q=0")); assertThat(dump,containsString("[NO_TRY]")); pool.start(); waitForIdle(pool,3); dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,s=0,i=3,r=2,q=0")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(3)); assertThat(count(dump," RESERVED "),is(0)); @@ -542,7 +588,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(2)); assertThat(count(dump," WAITING "),is(1)); @@ -552,7 +598,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest pool.setDetailedDump(true); dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0")); assertThat(dump,containsString("s=0/2")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(2)); @@ -566,7 +612,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest dump = pool.dump(); assertThat(count(dump," - STARTED"),is(2)); - assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0")); + assertThat(dump,containsString(",3<=3<=4,s=0,i=1,r=2,q=0")); assertThat(dump,containsString("s=1/2")); assertThat(dump,containsString("[ReservedThreadExecutor@")); assertThat(count(dump," IDLE "),is(1));