Issue #4105 QueuedThreadPool

Shrink the QTP under steady load.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-10-02 15:23:48 +10:00
parent 6fc42d8ba2
commit 3ad6780b27
2 changed files with 87 additions and 27 deletions

View File

@ -159,6 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
addBean(_tryExecutor);
_lastShrink.set(System.nanoTime());
super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle
@ -290,6 +292,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
public void setIdleTimeout(int idleTimeout)
{
_idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
}
/**
@ -857,17 +862,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);
Runnable job = null;
boolean idle = true;
try
{
Runnable job = null;
while (true)
{
// If we had a job, signal that we are idle again
// If we had a job,
if (job != null)
{
// signal that we are idle again
if (!addCounts(0, 1))
break;
job = null;
idle = true;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
@ -881,19 +888,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
job = _jobs.poll();
if (job == null)
{
// Wait for a job
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
job = idleJobPoll(idleTimeout);
// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
{
@ -903,10 +904,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
}
}
// Wait for a job
job = idleJobPoll(idleTimeout);
// If still no job?
if (job == null)
// continue to try again
continue;
}
}
idle = false;
// run job
if (LOG.isDebugEnabled())
@ -936,7 +944,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
removeThread(thread);
// Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0);
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

View File

@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -251,7 +252,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(4));
waitForThreads(tp, 4);
// finish job 2,3,4
job2._stopping.countDown();
@ -261,12 +262,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 4);
assertThat(tp.getThreads(), is(4));
waitForIdle(tp, 3);
assertThat(tp.getThreads(), is(3));
long duration = System.nanoTime();
waitForThreads(tp, 3);
assertThat(tp.getIdleThreads(), is(3));
waitForThreads(tp, 2);
assertThat(tp.getIdleThreads(), is(2));
duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
@ -505,6 +506,57 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
tp.stop();
}
@Test
public void testSteadyShrink() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
{
try
{
latch.await();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
};
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(10);
tp.setIdleTimeout(500);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);
for (int i = 0; i < 10; i++)
tp.execute(job);
waitForThreads(tp, 10);
int threads = tp.getThreads();
// let the jobs run
latch.countDown();
for (int i = 5; i-- > 0; )
{
Thread.sleep(250);
tp.execute(job);
}
// Assert that steady rate of jobs doesn't prevent some idling out
assertThat(tp.getThreads(), lessThan(threads));
threads = tp.getThreads();
for (int i = 5; i-- > 0; )
{
Thread.sleep(250);
tp.execute(job);
}
assertThat(tp.getThreads(), lessThan(threads));
}
@Test
public void testEnsureThreads() throws Exception
{