Ensure that new threads are started if a thread exits due to failure.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-04-24 11:42:23 +02:00
parent f69de3372e
commit c4d51b09df
2 changed files with 102 additions and 3 deletions

View File

@ -783,7 +783,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
job = idleJobPoll();
if (job == SHRINK)
{
LOG.warn("shrinking {}", this);
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", this);
break;
}
}
@ -829,7 +830,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
removeThread(Thread.currentThread());
if (_threadsStarted.decrementAndGet() < getMinThreads())
int threads = _threadsStarted.decrementAndGet();
// We should start a new thread if threads are now less than min threads or we have queued jobs
if (threads < getMinThreads() || getQueueSize()>0)
startThreads(1);
}
}

View File

@ -50,6 +50,16 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
private final CountDownLatch _run = new CountDownLatch(1);
private final CountDownLatch _stopping = new CountDownLatch(1);
private final CountDownLatch _stopped = new CountDownLatch(1);
private final boolean _fail;
RunningJob()
{
this(false);
}
public RunningJob(boolean fail)
{
_fail = fail;
}
@Override
public void run()
@ -58,6 +68,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
{
_run.countDown();
_stopping.await();
if (_fail)
throw new IllegalStateException("Testing!");
}
catch(IllegalStateException e)
{
throw e;
}
catch(Exception e)
{
@ -167,6 +183,86 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
waitForIdle(tp,2);
}
@Test
public void testThreadPoolFailingJobs() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class))
{
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(4);
tp.setIdleTimeout(900);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
// min threads started
waitForThreads(tp, 2);
waitForIdle(tp, 2);
// Doesn't shrink less than 1
Thread.sleep(1100);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
// Run job0
RunningJob job0 = new RunningJob(true);
tp.execute(job0);
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
// Run job1
RunningJob job1 = new RunningJob(true);
tp.execute(job1);
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 1);
// Run job2
RunningJob job2 = new RunningJob(true);
tp.execute(job2);
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 1);
// Run job3
RunningJob job3 = new RunningJob(true);
tp.execute(job3);
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
assertThat(tp.getIdleThreads(), is(0));
Thread.sleep(100);
assertThat(tp.getIdleThreads(), is(0));
// Run job4. will be queued
RunningJob job4 = new RunningJob(true);
tp.execute(job4);
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
// finish job 0
job0._stopping.countDown();
assertTrue(job0._stopped.await(10, TimeUnit.SECONDS));
// job4 should now run
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 0);
// finish job 1,2,3,4
job1._stopping.countDown();
job2._stopping.countDown();
job3._stopping.countDown();
job4._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job2._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 2);
waitForIdle(tp, 2);
}
}
@Test
public void testExecuteNoIdleThreads() throws Exception
{
@ -420,6 +516,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertThat(dump,containsString("[NO_TRY]"));
pool.start();
waitForIdle(pool,3);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0"));
@ -427,7 +524,6 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertThat(count(dump," IDLE "),is(3));
assertThat(count(dump," RESERVED "),is(0));
CountDownLatch started = new CountDownLatch(1);
CountDownLatch waiting = new CountDownLatch(1);
pool.execute(()->