* Issue #4105 - starting a thread in QTP now increments idle count Signed-off-by: Lachlan Roberts <lachlan@webtide.com> * Issue #4105 - improve comments in test Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
39ee316a7f
commit
ba728eee5d
|
@ -176,7 +176,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
removeBean(_tryExecutor);
|
||||
_tryExecutor = TryExecutor.NO_TRY;
|
||||
|
||||
|
||||
// Signal the Runner threads that we are stopping
|
||||
int threads = _counts.getAndSetHi(Integer.MIN_VALUE);
|
||||
|
||||
|
@ -483,7 +482,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
public void execute(Runnable job)
|
||||
{
|
||||
// Determine if we need to start a thread, use and idle thread or just queue this job
|
||||
boolean startThread;
|
||||
int startThread;
|
||||
while (true)
|
||||
{
|
||||
// Get the atomic counts
|
||||
|
@ -501,10 +500,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
|
||||
// Start a thread if we have insufficient idle threads to meet demand
|
||||
// and we are not at max threads.
|
||||
startThread = (idle <= 0 && threads < _maxThreads);
|
||||
startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;
|
||||
|
||||
// The job will be run by an idle thread when available
|
||||
if (!_counts.compareAndSet(counts, threads + (startThread ? 1 : 0), idle - 1))
|
||||
if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
|
||||
continue;
|
||||
|
||||
break;
|
||||
|
@ -513,7 +512,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (!_jobs.offer(job))
|
||||
{
|
||||
// reverse our changes to _counts.
|
||||
if (addCounts(startThread ? -1 : 0, 1))
|
||||
if (addCounts(-startThread, 1 - startThread))
|
||||
LOG.warn("{} rejected {}", this, job);
|
||||
throw new RejectedExecutionException(job.toString());
|
||||
}
|
||||
|
@ -522,7 +521,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
LOG.debug("queue {} startThread={}", job, startThread);
|
||||
|
||||
// Start a thread if one was needed
|
||||
if (startThread)
|
||||
while (startThread-- > 0)
|
||||
startThread();
|
||||
}
|
||||
|
||||
|
@ -617,7 +616,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (threads < _minThreads || (idle < 0 && threads < _maxThreads))
|
||||
{
|
||||
// Then try to start a thread.
|
||||
if (_counts.compareAndSet(counts, threads + 1, idle))
|
||||
if (_counts.compareAndSet(counts, threads + 1, idle + 1))
|
||||
startThread();
|
||||
// Otherwise continue to check state again.
|
||||
continue;
|
||||
|
@ -645,7 +644,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
finally
|
||||
{
|
||||
if (!started)
|
||||
addCounts(-1, 0); // threads, idle
|
||||
addCounts(-1, -1); // threads, idle
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -859,13 +858,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
LOG.debug("Runner started for {}", QueuedThreadPool.this);
|
||||
|
||||
Runnable job = null;
|
||||
|
||||
try
|
||||
{
|
||||
// All threads start idle (not yet taken a job)
|
||||
if (!addCounts(0, 1))
|
||||
return;
|
||||
|
||||
while (true)
|
||||
{
|
||||
// If we had a job, signal that we are idle again
|
||||
|
@ -873,6 +867,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
{
|
||||
if (!addCounts(0, 1))
|
||||
break;
|
||||
job = null;
|
||||
}
|
||||
// else check we are still running
|
||||
else if (_counts.getHi() == Integer.MIN_VALUE)
|
||||
|
|
|
@ -45,6 +45,61 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
private static final Logger LOG = Log.getLogger(QueuedThreadPoolTest.class);
|
||||
private final AtomicInteger _jobs = new AtomicInteger();
|
||||
|
||||
private static class TestQueuedThreadPool extends QueuedThreadPool
|
||||
{
|
||||
private final AtomicInteger _started;
|
||||
private final CountDownLatch _enteredRemoveThread;
|
||||
private final CountDownLatch _exitRemoveThread;
|
||||
|
||||
public TestQueuedThreadPool(AtomicInteger started, CountDownLatch enteredRemoveThread, CountDownLatch exitRemoveThread)
|
||||
{
|
||||
_started = started;
|
||||
_enteredRemoveThread = enteredRemoveThread;
|
||||
_exitRemoveThread = exitRemoveThread;
|
||||
}
|
||||
|
||||
public void superStartThread()
|
||||
{
|
||||
super.startThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void startThread()
|
||||
{
|
||||
switch (_started.incrementAndGet())
|
||||
{
|
||||
case 1:
|
||||
case 2:
|
||||
case 3:
|
||||
super.startThread();
|
||||
break;
|
||||
|
||||
case 4:
|
||||
// deliberately not start thread
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException("too many threads started");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeThread(Thread thread)
|
||||
{
|
||||
try
|
||||
{
|
||||
_enteredRemoveThread.countDown();
|
||||
_exitRemoveThread.await();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
super.removeThread(thread);
|
||||
}
|
||||
}
|
||||
|
||||
private class RunningJob implements Runnable
|
||||
{
|
||||
final CountDownLatch _run = new CountDownLatch(1);
|
||||
|
@ -450,6 +505,63 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
tp.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureThreads() throws Exception
|
||||
{
|
||||
AtomicInteger started = new AtomicInteger(0);
|
||||
|
||||
CountDownLatch enteredRemoveThread = new CountDownLatch(1);
|
||||
CountDownLatch exitRemoveThread = new CountDownLatch(1);
|
||||
TestQueuedThreadPool tp = new TestQueuedThreadPool(started, enteredRemoveThread, exitRemoveThread);
|
||||
|
||||
tp.setMinThreads(2);
|
||||
tp.setMaxThreads(10);
|
||||
tp.setIdleTimeout(400);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||
|
||||
tp.start();
|
||||
waitForIdle(tp, 2);
|
||||
waitForThreads(tp, 2);
|
||||
|
||||
RunningJob job1 = new RunningJob();
|
||||
RunningJob job2 = new RunningJob();
|
||||
RunningJob job3 = new RunningJob();
|
||||
tp.execute(job1);
|
||||
tp.execute(job2);
|
||||
tp.execute(job3);
|
||||
|
||||
waitForThreads(tp, 3);
|
||||
waitForIdle(tp, 0);
|
||||
|
||||
// We stop job3, the thread becomes idle, thread decides to shrink, and then blocks in removeThread().
|
||||
job3.stop();
|
||||
assertTrue(enteredRemoveThread.await(5, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 3);
|
||||
waitForIdle(tp, 1);
|
||||
|
||||
// Executing job4 will not start a new thread because we already have 1 idle thread.
|
||||
RunningJob job4 = new RunningJob();
|
||||
tp.execute(job4);
|
||||
|
||||
// Allow thread to exit from removeThread().
|
||||
// The 4th thread is not actually started in our startThread() until tp.superStartThread() is called.
|
||||
// Delay by 1000ms to check that ensureThreads is only starting one thread even though it is slow to start.
|
||||
assertThat(started.get(), is(3));
|
||||
exitRemoveThread.countDown();
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Now startThreads() should have been called 4 times.
|
||||
// Actually start the thread, and job4 should be run.
|
||||
assertThat(started.get(), is(4));
|
||||
tp.superStartThread();
|
||||
assertTrue(job4._run.await(5, TimeUnit.SECONDS));
|
||||
|
||||
job1.stop();
|
||||
job2.stop();
|
||||
job4.stop();
|
||||
tp.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxStopTime() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue