Issue #4105 QueuedThreadPool
Shrink the QTP under steady load. Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
6fc42d8ba2
commit
3ad6780b27
|
@ -159,6 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
}
|
||||
addBean(_tryExecutor);
|
||||
|
||||
_lastShrink.set(System.nanoTime());
|
||||
|
||||
super.doStart();
|
||||
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
|
||||
_counts.set(0, 0); // threads, idle
|
||||
|
@ -290,6 +292,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
public void setIdleTimeout(int idleTimeout)
|
||||
{
|
||||
_idleTimeout = idleTimeout;
|
||||
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
|
||||
if (reserved != null)
|
||||
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -857,17 +862,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Runner started for {}", QueuedThreadPool.this);
|
||||
|
||||
Runnable job = null;
|
||||
boolean idle = true;
|
||||
try
|
||||
{
|
||||
Runnable job = null;
|
||||
while (true)
|
||||
{
|
||||
// If we had a job, signal that we are idle again
|
||||
// If we had a job,
|
||||
if (job != null)
|
||||
{
|
||||
// signal that we are idle again
|
||||
if (!addCounts(0, 1))
|
||||
break;
|
||||
job = null;
|
||||
idle = true;
|
||||
}
|
||||
// else check we are still running
|
||||
else if (_counts.getHi() == Integer.MIN_VALUE)
|
||||
|
@ -881,33 +888,34 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
job = _jobs.poll();
|
||||
if (job == null)
|
||||
{
|
||||
// Wait for a job
|
||||
// No job immediately available maybe we should shrink?
|
||||
long idleTimeout = getIdleTimeout();
|
||||
if (getThreads() > _minThreads && idleTimeout > 0)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
|
||||
{
|
||||
if (_lastShrink.compareAndSet(last, now))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", QueuedThreadPool.this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for a job
|
||||
job = idleJobPoll(idleTimeout);
|
||||
|
||||
// If still no job?
|
||||
if (job == null)
|
||||
{
|
||||
// maybe we should shrink
|
||||
if (getThreads() > _minThreads && idleTimeout > 0)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
|
||||
{
|
||||
if (_lastShrink.compareAndSet(last, now))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", QueuedThreadPool.this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// continue to try again
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
idle = false;
|
||||
|
||||
// run job
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
|
||||
|
@ -936,7 +944,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
removeThread(thread);
|
||||
|
||||
// Decrement the total thread count and the idle count if we had no job
|
||||
addCounts(-1, job == null ? -1 : 0);
|
||||
addCounts(-1, idle ? -1 : 0);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
|
|||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -251,7 +252,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
job1._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
|
||||
waitForIdle(tp, 1);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
waitForThreads(tp, 4);
|
||||
|
||||
// finish job 2,3,4
|
||||
job2._stopping.countDown();
|
||||
|
@ -261,12 +262,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
|
||||
|
||||
waitForIdle(tp, 4);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
waitForIdle(tp, 3);
|
||||
assertThat(tp.getThreads(), is(3));
|
||||
|
||||
long duration = System.nanoTime();
|
||||
waitForThreads(tp, 3);
|
||||
assertThat(tp.getIdleThreads(), is(3));
|
||||
waitForThreads(tp, 2);
|
||||
assertThat(tp.getIdleThreads(), is(2));
|
||||
duration = System.nanoTime() - duration;
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
|
||||
|
@ -505,6 +506,57 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
tp.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSteadyShrink() throws Exception
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Runnable job = () ->
|
||||
{
|
||||
try
|
||||
{
|
||||
latch.await();
|
||||
}
|
||||
catch(InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
QueuedThreadPool tp = new QueuedThreadPool();
|
||||
tp.setMinThreads(2);
|
||||
tp.setMaxThreads(10);
|
||||
tp.setIdleTimeout(500);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||
|
||||
tp.start();
|
||||
waitForIdle(tp, 2);
|
||||
waitForThreads(tp, 2);
|
||||
|
||||
for (int i = 0; i < 10; i++)
|
||||
tp.execute(job);
|
||||
|
||||
waitForThreads(tp, 10);
|
||||
int threads = tp.getThreads();
|
||||
// let the jobs run
|
||||
latch.countDown();
|
||||
|
||||
for (int i = 5; i-- > 0; )
|
||||
{
|
||||
Thread.sleep(250);
|
||||
tp.execute(job);
|
||||
}
|
||||
|
||||
// Assert that steady rate of jobs doesn't prevent some idling out
|
||||
assertThat(tp.getThreads(), lessThan(threads));
|
||||
threads = tp.getThreads();
|
||||
for (int i = 5; i-- > 0; )
|
||||
{
|
||||
Thread.sleep(250);
|
||||
tp.execute(job);
|
||||
}
|
||||
assertThat(tp.getThreads(), lessThan(threads));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureThreads() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue