Improve #3550 fix with a single Atomic
Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
c4d51b09df
commit
787380a91e
|
@ -0,0 +1,182 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* An AtomicLong with additional methods to treat it as two hi/lo integers.
|
||||
*/
|
||||
public class AtomicWords extends AtomicLong
|
||||
{
|
||||
/**
|
||||
* Sets the hi and lo values.
|
||||
*
|
||||
* @param w0 the 0th word
|
||||
* @param w1 the 1st word
|
||||
* @param w2 the 2nd word
|
||||
* @param w3 the 3rd word
|
||||
*/
|
||||
public void set(int w0, int w1, int w2, int w3)
|
||||
{
|
||||
set(encode(w0, w1, w2, w3));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically sets the word values to the given updated values only if
|
||||
* the current encoded value is as expected.
|
||||
*
|
||||
* @param expectEncoded the expected encoded value
|
||||
* @param w0 the 0th word
|
||||
* @param w1 the 1st word
|
||||
* @param w2 the 2nd word
|
||||
* @param w3 the 3rd word
|
||||
* @return {@code true} if successful.
|
||||
*/
|
||||
public boolean compareAndSet(long expectEncoded, int w0, int w1, int w2, int w3)
|
||||
{
|
||||
return compareAndSet(expectEncoded,encode(w0, w1, w2, w3));
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically adds the given deltas to the current hi and lo values.
|
||||
*
|
||||
* @param delta0 the delta to apply to the 0th word value
|
||||
* @param delta1 the delta to apply to the 1st word value
|
||||
* @param delta2 the delta to apply to the 2nd word value
|
||||
* @param delta3 the delta to apply to the 3rd word value
|
||||
*/
|
||||
public void add(int delta0, int delta1, int delta2, int delta3)
|
||||
{
|
||||
while(true)
|
||||
{
|
||||
long encoded = get();
|
||||
long update = encode(getWord0(encoded)+delta0,
|
||||
getWord1(encoded)+delta1,
|
||||
getWord2(encoded)+delta2,
|
||||
getWord3(encoded)+delta3);
|
||||
if (compareAndSet(encoded,update))
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets word 0 value
|
||||
*
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public int getWord0()
|
||||
{
|
||||
return getWord0(get());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets word 1 value
|
||||
*
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public int getWord1()
|
||||
{
|
||||
return getWord1(get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 2 value
|
||||
*
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public int getWord2()
|
||||
{
|
||||
return getWord2(get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 3 value
|
||||
*
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public int getWord3()
|
||||
{
|
||||
return getWord3(get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 0 value from the given encoded value.
|
||||
*
|
||||
* @param encoded the encoded value
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public static int getWord0(long encoded)
|
||||
{
|
||||
return (int) ((encoded>>48)&0xFFFFL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 0 value from the given encoded value.
|
||||
*
|
||||
* @param encoded the encoded value
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public static int getWord1(long encoded)
|
||||
{
|
||||
return (int) ((encoded>>32)&0xFFFFL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 0 value from the given encoded value.
|
||||
*
|
||||
* @param encoded the encoded value
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public static int getWord2(long encoded)
|
||||
{
|
||||
return (int) ((encoded>>16)&0xFFFFL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets word 0 value from the given encoded value.
|
||||
*
|
||||
* @param encoded the encoded value
|
||||
* @return the 16 bit value as an int
|
||||
*/
|
||||
public static int getWord3(long encoded)
|
||||
{
|
||||
return (int) (encoded&0xFFFFL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes 4 16 bit words values into a long.
|
||||
*
|
||||
* @param w0 the 0th word
|
||||
* @param w1 the 1st word
|
||||
* @param w2 the 2nd word
|
||||
* @param w3 the 3rd word
|
||||
* @return the encoded value
|
||||
*/
|
||||
public static long encode(int w0, int w1, int w2, int w3)
|
||||
{
|
||||
long wl0 = ((long)w0)&0xFFFFL;
|
||||
long wl1 = ((long)w1)&0xFFFFL;
|
||||
long wl2 = ((long)w2)&0xFFFFL;
|
||||
long wl3 = ((long)w3)&0xFFFFL;
|
||||
return (wl0<<48)+(wl1<<32)+(wl2<<16)+wl3;
|
||||
}
|
||||
}
|
|
@ -27,9 +27,9 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.eclipse.jetty.util.AtomicWords;
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
|
@ -49,8 +49,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
{
|
||||
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
||||
|
||||
private final AtomicInteger _threadsStarted = new AtomicInteger();
|
||||
private final AtomicInteger _threadsIdle = new AtomicInteger();
|
||||
private final AtomicWords _counts = new AtomicWords();
|
||||
private final AtomicLong _lastShrink = new AtomicLong();
|
||||
private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
|
||||
private final Object _joinLock = new Object();
|
||||
|
@ -140,9 +139,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
addBean(_tryExecutor);
|
||||
|
||||
super.doStart();
|
||||
_threadsStarted.set(0);
|
||||
|
||||
startThreads(_minThreads);
|
||||
_counts.set(0,0,0,0);
|
||||
ensureThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,42 +163,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
|
||||
// Fill job Q with noop jobs to wakeup idle
|
||||
Runnable noop = () -> {};
|
||||
for (int i = _threadsStarted.get(); i-- > 0; )
|
||||
for (int i = _counts.getWord0(); i-- > 0; )
|
||||
jobs.offer(noop);
|
||||
|
||||
// try to let jobs complete naturally for half our stop time
|
||||
long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
|
||||
for (Thread thread : _threads)
|
||||
{
|
||||
long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Waiting for {} for {}", thread, canwait);
|
||||
if (canwait > 0)
|
||||
thread.join(canwait);
|
||||
}
|
||||
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
|
||||
|
||||
// If we still have threads running, get a bit more aggressive
|
||||
|
||||
// interrupt remaining threads
|
||||
if (_threadsStarted.get() > 0)
|
||||
for (Thread thread : _threads)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Interrupting {}", thread);
|
||||
thread.interrupt();
|
||||
}
|
||||
|
||||
// wait again for the other half of our stop time
|
||||
stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
|
||||
for (Thread thread : _threads)
|
||||
{
|
||||
long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Waiting for {} for {}", thread, canwait);
|
||||
if (canwait > 0)
|
||||
thread.join(canwait);
|
||||
LOG.debug("Interrupting {}", thread);
|
||||
thread.interrupt();
|
||||
}
|
||||
|
||||
// wait again for the other half of our stop time
|
||||
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
|
||||
|
||||
Thread.yield();
|
||||
int size = _threads.size();
|
||||
if (size > 0)
|
||||
|
@ -254,6 +235,18 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
}
|
||||
}
|
||||
|
||||
private void joinThreads(long stopByNanos) throws InterruptedException
|
||||
{
|
||||
for (Thread thread : _threads)
|
||||
{
|
||||
long canWait = TimeUnit.NANOSECONDS.toMillis(stopByNanos - System.nanoTime());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Waiting for {} for {}", thread, canWait);
|
||||
if (canWait > 0)
|
||||
thread.join(canWait);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread Pool should use Daemon Threading.
|
||||
*
|
||||
|
@ -308,9 +301,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (_minThreads > _maxThreads)
|
||||
_maxThreads = _minThreads;
|
||||
|
||||
int threads = _threadsStarted.get();
|
||||
if (isStarted() && threads < _minThreads)
|
||||
startThreads(_minThreads - threads);
|
||||
if (isStarted())
|
||||
ensureThreads();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -471,12 +463,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
LOG.warn("{} rejected {}", this, job);
|
||||
throw new RejectedExecutionException(job.toString());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Make sure there is at least one thread executing the job.
|
||||
if (getQueueSize() > 0 && getIdleThreads() == 0)
|
||||
startThreads(1);
|
||||
}
|
||||
|
||||
// Make sure there is at least one thread executing the job.
|
||||
ensureThreads();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -509,7 +498,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
@ManagedAttribute("number of threads in the pool")
|
||||
public int getThreads()
|
||||
{
|
||||
return _threadsStarted.get();
|
||||
return _counts.getWord0();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -519,7 +508,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
@ManagedAttribute("number of idle threads in the pool")
|
||||
public int getIdleThreads()
|
||||
{
|
||||
return _threadsIdle.get();
|
||||
return _counts.getWord3();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -549,20 +538,29 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold();
|
||||
}
|
||||
|
||||
private boolean startThreads(int threadsToStart)
|
||||
private void ensureThreads()
|
||||
{
|
||||
while (threadsToStart > 0 && isRunning())
|
||||
while (isRunning())
|
||||
{
|
||||
int threads = _threadsStarted.get();
|
||||
if (threads >= _maxThreads)
|
||||
return false;
|
||||
long count = _counts.get();
|
||||
int threads = AtomicWords.getWord0(count);
|
||||
int starting = AtomicWords.getWord1(count);
|
||||
int idle = AtomicWords.getWord3(count);
|
||||
int queue = getQueueSize();
|
||||
|
||||
if (!_threadsStarted.compareAndSet(threads, threads + 1))
|
||||
if (threads >= _maxThreads)
|
||||
break;
|
||||
if (threads >= _minThreads && (starting + idle) >= queue)
|
||||
break;
|
||||
if (!_counts.compareAndSet(count, threads + 1, starting + 1, 0, idle))
|
||||
continue;
|
||||
|
||||
boolean started = false;
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Starting thread {}",this);
|
||||
|
||||
Thread thread = newThread(_runnable);
|
||||
thread.setDaemon(isDaemon());
|
||||
thread.setPriority(getThreadsPriority());
|
||||
|
@ -573,15 +571,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
_lastShrink.set(System.nanoTime());
|
||||
thread.start();
|
||||
started = true;
|
||||
--threadsToStart;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (!started)
|
||||
_threadsStarted.decrementAndGet();
|
||||
_counts.add(-1,-1,0,0);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected Thread newThread(Runnable runnable)
|
||||
|
@ -671,17 +667,24 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
|
||||
long count = _counts.get();
|
||||
int threads = AtomicWords.getWord0(count);
|
||||
int starting = AtomicWords.getWord1(count);
|
||||
int idle = AtomicWords.getWord3(count);
|
||||
int queue = getQueueSize();
|
||||
|
||||
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,s=%d,i=%d,r=%d,q=%d}[%s]",
|
||||
getClass().getSimpleName(),
|
||||
_name,
|
||||
hashCode(),
|
||||
getState(),
|
||||
getMinThreads(),
|
||||
getThreads(),
|
||||
threads,
|
||||
getMaxThreads(),
|
||||
getIdleThreads(),
|
||||
starting,
|
||||
idle,
|
||||
getReservedThreads(),
|
||||
_jobs.size(),
|
||||
queue,
|
||||
_tryExecutor);
|
||||
}
|
||||
|
||||
|
@ -756,19 +759,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
return null;
|
||||
}
|
||||
|
||||
private static Runnable SHRINK = ()->{};
|
||||
private class Runner implements Runnable
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
boolean idle = false;
|
||||
Runnable job = null;
|
||||
|
||||
try
|
||||
{
|
||||
Runnable job = _jobs.poll();
|
||||
if (job != null && _threadsIdle.get() == 0)
|
||||
startThreads(1);
|
||||
job = _jobs.poll();
|
||||
idle = job==null;
|
||||
_counts.add(0,-1,0,idle?1:0);
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -777,15 +780,29 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (!idle)
|
||||
{
|
||||
idle = true;
|
||||
_threadsIdle.incrementAndGet();
|
||||
_counts.add(0,0,0,1);
|
||||
}
|
||||
|
||||
job = idleJobPoll();
|
||||
if (job == SHRINK)
|
||||
|
||||
if (job == null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", this);
|
||||
break;
|
||||
// maybe we should shrink?
|
||||
int size = getThreads();
|
||||
if (size > _minThreads)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
|
||||
{
|
||||
if (_lastShrink.compareAndSet(last, now))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", QueuedThreadPool.this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -795,15 +812,14 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (idle)
|
||||
{
|
||||
idle = false;
|
||||
if (_threadsIdle.decrementAndGet() == 0)
|
||||
startThreads(1);
|
||||
_counts.add(0,0,0,-1);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("run {}", job);
|
||||
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
|
||||
runJob(job);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ran {}", job);
|
||||
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
|
||||
|
||||
// Clear interrupted status
|
||||
Thread.interrupted();
|
||||
|
@ -821,19 +837,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e);
|
||||
LOG.warn(String.format("Unexpected thread death: %s in %s", job, QueuedThreadPool.this), e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (idle)
|
||||
_threadsIdle.decrementAndGet();
|
||||
|
||||
_counts.add(-1,0,0,idle?-1:0);
|
||||
removeThread(Thread.currentThread());
|
||||
|
||||
int threads = _threadsStarted.decrementAndGet();
|
||||
// We should start a new thread if threads are now less than min threads or we have queued jobs
|
||||
if (threads < getMinThreads() || getQueueSize()>0)
|
||||
startThreads(1);
|
||||
ensureThreads();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -841,20 +851,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
{
|
||||
if (_idleTimeout <= 0)
|
||||
return _jobs.take();
|
||||
|
||||
// maybe we should shrink?
|
||||
int size = _threadsStarted.get();
|
||||
if (size > _minThreads)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
|
||||
{
|
||||
if (_lastShrink.compareAndSet(last, now))
|
||||
return SHRINK;
|
||||
}
|
||||
}
|
||||
|
||||
return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
@ -50,14 +51,26 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
private final CountDownLatch _run = new CountDownLatch(1);
|
||||
private final CountDownLatch _stopping = new CountDownLatch(1);
|
||||
private final CountDownLatch _stopped = new CountDownLatch(1);
|
||||
private final String _name;
|
||||
private final boolean _fail;
|
||||
RunningJob()
|
||||
{
|
||||
this(false);
|
||||
this(null, false);
|
||||
}
|
||||
|
||||
public RunningJob(String name)
|
||||
{
|
||||
this(name, false);
|
||||
}
|
||||
|
||||
public RunningJob(boolean fail)
|
||||
{
|
||||
this(null, fail);
|
||||
}
|
||||
|
||||
public RunningJob(String name, boolean fail)
|
||||
{
|
||||
_name = name;
|
||||
_fail = fail;
|
||||
}
|
||||
|
||||
|
@ -93,6 +106,14 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
if (!_stopped.await(10,TimeUnit.SECONDS))
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
if (_name==null)
|
||||
return super.toString();
|
||||
return String.format("%s@%x",_name,hashCode());
|
||||
}
|
||||
}
|
||||
|
||||
private class CloseableJob extends RunningJob implements Closeable
|
||||
|
@ -121,42 +142,43 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,2);
|
||||
|
||||
// Doesn't shrink less than 1
|
||||
Thread.sleep(1100);
|
||||
// Doesn't shrink to less than min threads
|
||||
Thread.sleep(3*tp.getIdleTimeout()/2);
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,2);
|
||||
|
||||
// Run job0
|
||||
RunningJob job0=new RunningJob();
|
||||
RunningJob job0=new RunningJob("JOB0");
|
||||
tp.execute(job0);
|
||||
assertTrue(job0._run.await(10,TimeUnit.SECONDS));
|
||||
waitForIdle(tp,1);
|
||||
|
||||
// Run job1
|
||||
RunningJob job1=new RunningJob();
|
||||
RunningJob job1=new RunningJob("JOB1");
|
||||
tp.execute(job1);
|
||||
assertTrue(job1._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,3);
|
||||
waitForIdle(tp,1);
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,0);
|
||||
|
||||
// Run job2
|
||||
RunningJob job2=new RunningJob();
|
||||
RunningJob job2=new RunningJob("JOB2");
|
||||
tp.execute(job2);
|
||||
assertTrue(job2._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,4);
|
||||
waitForIdle(tp,1);
|
||||
waitForThreads(tp,3);
|
||||
waitForIdle(tp,0);
|
||||
|
||||
// Run job3
|
||||
RunningJob job3=new RunningJob();
|
||||
RunningJob job3=new RunningJob("JOB3");
|
||||
tp.execute(job3);
|
||||
assertTrue(job3._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,4);
|
||||
waitForIdle(tp,0);
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
Thread.sleep(100);
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
// Run job4. will be queued
|
||||
RunningJob job4=new RunningJob();
|
||||
RunningJob job4=new RunningJob("JOB4");
|
||||
tp.execute(job4);
|
||||
assertFalse(job4._run.await(1,TimeUnit.SECONDS));
|
||||
|
||||
|
@ -166,21 +188,32 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
|
||||
// job4 should now run
|
||||
assertTrue(job4._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,4);
|
||||
waitForIdle(tp,0);
|
||||
|
||||
// finish job 1,2,3,4
|
||||
assertThat(tp.getThreads(),is(4));
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
// finish job 1
|
||||
job1._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10,TimeUnit.SECONDS));
|
||||
waitForIdle(tp,1);
|
||||
assertThat(tp.getThreads(),is(4));
|
||||
|
||||
// finish job 2,3,4
|
||||
job2._stopping.countDown();
|
||||
job3._stopping.countDown();
|
||||
job4._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10,TimeUnit.SECONDS));
|
||||
assertTrue(job2._stopped.await(10,TimeUnit.SECONDS));
|
||||
assertTrue(job3._stopped.await(10,TimeUnit.SECONDS));
|
||||
assertTrue(job4._stopped.await(10,TimeUnit.SECONDS));
|
||||
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,2);
|
||||
waitForIdle(tp,4);
|
||||
assertThat(tp.getThreads(),is(4));
|
||||
|
||||
long duration = System.nanoTime();
|
||||
waitForThreads(tp,3);
|
||||
assertThat(tp.getIdleThreads(),is(3));
|
||||
duration = System.nanoTime() - duration;
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout()/2L));
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout()*2L));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -188,78 +221,83 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
{
|
||||
try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class))
|
||||
{
|
||||
QueuedThreadPool tp = new QueuedThreadPool();
|
||||
QueuedThreadPool tp= new QueuedThreadPool();
|
||||
tp.setMinThreads(2);
|
||||
tp.setMaxThreads(4);
|
||||
tp.setIdleTimeout(900);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
|
||||
|
||||
tp.start();
|
||||
|
||||
// min threads started
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 2);
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,2);
|
||||
|
||||
// Doesn't shrink less than 1
|
||||
Thread.sleep(1100);
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 2);
|
||||
// Doesn't shrink to less than min threads
|
||||
Thread.sleep(3*tp.getIdleTimeout()/2);
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,2);
|
||||
|
||||
// Run job0
|
||||
RunningJob job0 = new RunningJob(true);
|
||||
RunningJob job0=new RunningJob("JOB0", true);
|
||||
tp.execute(job0);
|
||||
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
|
||||
waitForIdle(tp, 1);
|
||||
assertTrue(job0._run.await(10,TimeUnit.SECONDS));
|
||||
waitForIdle(tp,1);
|
||||
|
||||
// Run job1
|
||||
RunningJob job1 = new RunningJob(true);
|
||||
RunningJob job1=new RunningJob("JOB1", true);
|
||||
tp.execute(job1);
|
||||
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 3);
|
||||
waitForIdle(tp, 1);
|
||||
assertTrue(job1._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,2);
|
||||
waitForIdle(tp,0);
|
||||
|
||||
// Run job2
|
||||
RunningJob job2 = new RunningJob(true);
|
||||
RunningJob job2=new RunningJob("JOB2", true);
|
||||
tp.execute(job2);
|
||||
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 4);
|
||||
waitForIdle(tp, 1);
|
||||
assertTrue(job2._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,3);
|
||||
waitForIdle(tp,0);
|
||||
|
||||
// Run job3
|
||||
RunningJob job3 = new RunningJob(true);
|
||||
RunningJob job3=new RunningJob("JOB3", true);
|
||||
tp.execute(job3);
|
||||
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 4);
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
assertTrue(job3._run.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,4);
|
||||
waitForIdle(tp,0);
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
Thread.sleep(100);
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
// Run job4. will be queued
|
||||
RunningJob job4 = new RunningJob(true);
|
||||
RunningJob job4=new RunningJob("JOB4", true);
|
||||
tp.execute(job4);
|
||||
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
|
||||
assertFalse(job4._run.await(1,TimeUnit.SECONDS));
|
||||
|
||||
// finish job 0
|
||||
job0._stopping.countDown();
|
||||
assertTrue(job0._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job0._stopped.await(10,TimeUnit.SECONDS));
|
||||
|
||||
// job4 should now run
|
||||
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 4);
|
||||
waitForIdle(tp, 0);
|
||||
assertTrue(job4._run.await(10,TimeUnit.SECONDS));
|
||||
assertThat(tp.getThreads(),is(4));
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
// finish job 1,2,3,4
|
||||
// finish job 1
|
||||
job1._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10,TimeUnit.SECONDS));
|
||||
waitForThreads(tp,3);
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
// finish job 2,3,4
|
||||
job2._stopping.countDown();
|
||||
job3._stopping.countDown();
|
||||
job4._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job2._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job2._stopped.await(10,TimeUnit.SECONDS));
|
||||
assertTrue(job3._stopped.await(10,TimeUnit.SECONDS));
|
||||
assertTrue(job4._stopped.await(10,TimeUnit.SECONDS));
|
||||
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 2);
|
||||
waitForIdle(tp,2);
|
||||
assertThat(tp.getThreads(),is(2));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -268,7 +306,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
{
|
||||
QueuedThreadPool tp= new QueuedThreadPool();
|
||||
tp.setDetailedDump(true);
|
||||
tp.setMinThreads(3);
|
||||
tp.setMinThreads(1);
|
||||
tp.setMaxThreads(10);
|
||||
tp.setIdleTimeout(500);
|
||||
|
||||
|
@ -288,8 +326,16 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
assertTrue(job2._run.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(job3._run.await(5, TimeUnit.SECONDS));
|
||||
|
||||
waitForThreads(tp, 4);
|
||||
waitForThreads(tp, 3);
|
||||
assertThat(tp.getIdleThreads(),is(0));
|
||||
|
||||
job1._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10,TimeUnit.SECONDS));
|
||||
waitForIdle(tp, 1);
|
||||
assertThat(tp.getThreads(),is(3));
|
||||
|
||||
waitForIdle(tp, 0);
|
||||
assertThat(tp.getThreads(),is(2));
|
||||
|
||||
RunningJob job4 = new RunningJob();
|
||||
tp.execute(job4);
|
||||
|
@ -506,20 +552,20 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
String dump = pool.dump();
|
||||
// TODO use hamcrest 2.0 regex matcher
|
||||
assertThat(dump,containsString("STOPPED"));
|
||||
assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0"));
|
||||
assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=-1,q=0"));
|
||||
assertThat(dump,containsString("[NO_TRY]"));
|
||||
|
||||
pool.setReservedThreads(2);
|
||||
dump = pool.dump();
|
||||
assertThat(dump,containsString("STOPPED"));
|
||||
assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0"));
|
||||
assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=2,q=0"));
|
||||
assertThat(dump,containsString("[NO_TRY]"));
|
||||
|
||||
pool.start();
|
||||
waitForIdle(pool,3);
|
||||
dump = pool.dump();
|
||||
assertThat(count(dump," - STARTED"),is(2));
|
||||
assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0"));
|
||||
assertThat(dump,containsString(",3<=3<=4,s=0,i=3,r=2,q=0"));
|
||||
assertThat(dump,containsString("[ReservedThreadExecutor@"));
|
||||
assertThat(count(dump," IDLE "),is(3));
|
||||
assertThat(count(dump," RESERVED "),is(0));
|
||||
|
@ -542,7 +588,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
|
||||
dump = pool.dump();
|
||||
assertThat(count(dump," - STARTED"),is(2));
|
||||
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
|
||||
assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0"));
|
||||
assertThat(dump,containsString("[ReservedThreadExecutor@"));
|
||||
assertThat(count(dump," IDLE "),is(2));
|
||||
assertThat(count(dump," WAITING "),is(1));
|
||||
|
@ -552,7 +598,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
pool.setDetailedDump(true);
|
||||
dump = pool.dump();
|
||||
assertThat(count(dump," - STARTED"),is(2));
|
||||
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
|
||||
assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0"));
|
||||
assertThat(dump,containsString("s=0/2"));
|
||||
assertThat(dump,containsString("[ReservedThreadExecutor@"));
|
||||
assertThat(count(dump," IDLE "),is(2));
|
||||
|
@ -566,7 +612,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
|
||||
dump = pool.dump();
|
||||
assertThat(count(dump," - STARTED"),is(2));
|
||||
assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0"));
|
||||
assertThat(dump,containsString(",3<=3<=4,s=0,i=1,r=2,q=0"));
|
||||
assertThat(dump,containsString("s=1/2"));
|
||||
assertThat(dump,containsString("[ReservedThreadExecutor@"));
|
||||
assertThat(count(dump," IDLE "),is(1));
|
||||
|
|
Loading…
Reference in New Issue