Issue #4105 QueuedThreadPool

Shrink the QTP under steady load.

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-10-02 15:23:48 +10:00
parent 6fc42d8ba2
commit 3ad6780b27
2 changed files with 87 additions and 27 deletions

View File

@ -159,6 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
} }
addBean(_tryExecutor); addBean(_tryExecutor);
_lastShrink.set(System.nanoTime());
super.doStart(); super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped. // The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle _counts.set(0, 0); // threads, idle
@ -290,6 +292,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
public void setIdleTimeout(int idleTimeout) public void setIdleTimeout(int idleTimeout)
{ {
_idleTimeout = idleTimeout; _idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
} }
/** /**
@ -857,17 +862,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this); LOG.debug("Runner started for {}", QueuedThreadPool.this);
Runnable job = null; boolean idle = true;
try try
{ {
Runnable job = null;
while (true) while (true)
{ {
// If we had a job, signal that we are idle again // If we had a job,
if (job != null) if (job != null)
{ {
// signal that we are idle again
if (!addCounts(0, 1)) if (!addCounts(0, 1))
break; break;
job = null; idle = true;
} }
// else check we are still running // else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE) else if (_counts.getHi() == Integer.MIN_VALUE)
@ -881,19 +888,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
job = _jobs.poll(); job = _jobs.poll();
if (job == null) if (job == null)
{ {
// Wait for a job // No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout(); long idleTimeout = getIdleTimeout();
job = idleJobPoll(idleTimeout);
// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0) if (getThreads() > _minThreads && idleTimeout > 0)
{ {
long last = _lastShrink.get(); long last = _lastShrink.get();
long now = System.nanoTime(); long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout)) if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
{ {
if (_lastShrink.compareAndSet(last, now)) if (_lastShrink.compareAndSet(last, now))
{ {
@ -903,10 +904,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
} }
} }
} }
// Wait for a job
job = idleJobPoll(idleTimeout);
// If still no job?
if (job == null)
// continue to try again // continue to try again
continue; continue;
} }
}
idle = false;
// run job // run job
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -936,7 +944,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
removeThread(thread); removeThread(thread);
// Decrement the total thread count and the idle count if we had no job // Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0); addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this); LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

View File

@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString; import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -251,7 +252,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
job1._stopping.countDown(); job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS)); assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1); waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(4)); waitForThreads(tp, 4);
// finish job 2,3,4 // finish job 2,3,4
job2._stopping.countDown(); job2._stopping.countDown();
@ -261,12 +262,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS)); assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS)); assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 4); waitForIdle(tp, 3);
assertThat(tp.getThreads(), is(4)); assertThat(tp.getThreads(), is(3));
long duration = System.nanoTime(); long duration = System.nanoTime();
waitForThreads(tp, 3); waitForThreads(tp, 2);
assertThat(tp.getIdleThreads(), is(3)); assertThat(tp.getIdleThreads(), is(2));
duration = System.nanoTime() - duration; duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L)); assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L)); assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
@ -505,6 +506,57 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
tp.stop(); tp.stop();
} }
@Test
public void testSteadyShrink() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
{
try
{
latch.await();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
};
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(10);
tp.setIdleTimeout(500);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);
for (int i = 0; i < 10; i++)
tp.execute(job);
waitForThreads(tp, 10);
int threads = tp.getThreads();
// let the jobs run
latch.countDown();
for (int i = 5; i-- > 0; )
{
Thread.sleep(250);
tp.execute(job);
}
// Assert that steady rate of jobs doesn't prevent some idling out
assertThat(tp.getThreads(), lessThan(threads));
threads = tp.getThreads();
for (int i = 5; i-- > 0; )
{
Thread.sleep(250);
tp.execute(job);
}
assertThat(tp.getThreads(), lessThan(threads));
}
@Test @Test
public void testEnsureThreads() throws Exception public void testEnsureThreads() throws Exception
{ {