Issue #4105 QueuedThreadPool
Shrink the QTP under steady load. Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
6fc42d8ba2
commit
3ad6780b27
|
@ -159,6 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
}
|
}
|
||||||
addBean(_tryExecutor);
|
addBean(_tryExecutor);
|
||||||
|
|
||||||
|
_lastShrink.set(System.nanoTime());
|
||||||
|
|
||||||
super.doStart();
|
super.doStart();
|
||||||
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
|
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
|
||||||
_counts.set(0, 0); // threads, idle
|
_counts.set(0, 0); // threads, idle
|
||||||
|
@ -290,6 +292,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
public void setIdleTimeout(int idleTimeout)
|
public void setIdleTimeout(int idleTimeout)
|
||||||
{
|
{
|
||||||
_idleTimeout = idleTimeout;
|
_idleTimeout = idleTimeout;
|
||||||
|
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
|
||||||
|
if (reserved != null)
|
||||||
|
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -857,17 +862,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Runner started for {}", QueuedThreadPool.this);
|
LOG.debug("Runner started for {}", QueuedThreadPool.this);
|
||||||
|
|
||||||
Runnable job = null;
|
boolean idle = true;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
Runnable job = null;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
// If we had a job, signal that we are idle again
|
// If we had a job,
|
||||||
if (job != null)
|
if (job != null)
|
||||||
{
|
{
|
||||||
|
// signal that we are idle again
|
||||||
if (!addCounts(0, 1))
|
if (!addCounts(0, 1))
|
||||||
break;
|
break;
|
||||||
job = null;
|
idle = true;
|
||||||
}
|
}
|
||||||
// else check we are still running
|
// else check we are still running
|
||||||
else if (_counts.getHi() == Integer.MIN_VALUE)
|
else if (_counts.getHi() == Integer.MIN_VALUE)
|
||||||
|
@ -881,19 +888,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
job = _jobs.poll();
|
job = _jobs.poll();
|
||||||
if (job == null)
|
if (job == null)
|
||||||
{
|
{
|
||||||
// Wait for a job
|
// No job immediately available maybe we should shrink?
|
||||||
long idleTimeout = getIdleTimeout();
|
long idleTimeout = getIdleTimeout();
|
||||||
job = idleJobPoll(idleTimeout);
|
|
||||||
|
|
||||||
// If still no job?
|
|
||||||
if (job == null)
|
|
||||||
{
|
|
||||||
// maybe we should shrink
|
|
||||||
if (getThreads() > _minThreads && idleTimeout > 0)
|
if (getThreads() > _minThreads && idleTimeout > 0)
|
||||||
{
|
{
|
||||||
long last = _lastShrink.get();
|
long last = _lastShrink.get();
|
||||||
long now = System.nanoTime();
|
long now = System.nanoTime();
|
||||||
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
|
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
|
||||||
{
|
{
|
||||||
if (_lastShrink.compareAndSet(last, now))
|
if (_lastShrink.compareAndSet(last, now))
|
||||||
{
|
{
|
||||||
|
@ -903,10 +904,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for a job
|
||||||
|
job = idleJobPoll(idleTimeout);
|
||||||
|
|
||||||
|
// If still no job?
|
||||||
|
if (job == null)
|
||||||
// continue to try again
|
// continue to try again
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
idle = false;
|
||||||
|
|
||||||
// run job
|
// run job
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
|
@ -936,7 +944,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
removeThread(thread);
|
removeThread(thread);
|
||||||
|
|
||||||
// Decrement the total thread count and the idle count if we had no job
|
// Decrement the total thread count and the idle count if we had no job
|
||||||
addCounts(-1, job == null ? -1 : 0);
|
addCounts(-1, idle ? -1 : 0);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
|
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.lessThan;
|
||||||
import static org.hamcrest.core.StringContains.containsString;
|
import static org.hamcrest.core.StringContains.containsString;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
@ -251,7 +252,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
||||||
job1._stopping.countDown();
|
job1._stopping.countDown();
|
||||||
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
|
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
|
||||||
waitForIdle(tp, 1);
|
waitForIdle(tp, 1);
|
||||||
assertThat(tp.getThreads(), is(4));
|
waitForThreads(tp, 4);
|
||||||
|
|
||||||
// finish job 2,3,4
|
// finish job 2,3,4
|
||||||
job2._stopping.countDown();
|
job2._stopping.countDown();
|
||||||
|
@ -261,12 +262,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
||||||
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
|
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
|
||||||
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
|
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
|
||||||
|
|
||||||
waitForIdle(tp, 4);
|
waitForIdle(tp, 3);
|
||||||
assertThat(tp.getThreads(), is(4));
|
assertThat(tp.getThreads(), is(3));
|
||||||
|
|
||||||
long duration = System.nanoTime();
|
long duration = System.nanoTime();
|
||||||
waitForThreads(tp, 3);
|
waitForThreads(tp, 2);
|
||||||
assertThat(tp.getIdleThreads(), is(3));
|
assertThat(tp.getIdleThreads(), is(2));
|
||||||
duration = System.nanoTime() - duration;
|
duration = System.nanoTime() - duration;
|
||||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
|
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
|
||||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
|
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
|
||||||
|
@ -505,6 +506,57 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
||||||
tp.stop();
|
tp.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSteadyShrink() throws Exception
|
||||||
|
{
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
Runnable job = () ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
catch(InterruptedException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
QueuedThreadPool tp = new QueuedThreadPool();
|
||||||
|
tp.setMinThreads(2);
|
||||||
|
tp.setMaxThreads(10);
|
||||||
|
tp.setIdleTimeout(500);
|
||||||
|
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||||
|
|
||||||
|
tp.start();
|
||||||
|
waitForIdle(tp, 2);
|
||||||
|
waitForThreads(tp, 2);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++)
|
||||||
|
tp.execute(job);
|
||||||
|
|
||||||
|
waitForThreads(tp, 10);
|
||||||
|
int threads = tp.getThreads();
|
||||||
|
// let the jobs run
|
||||||
|
latch.countDown();
|
||||||
|
|
||||||
|
for (int i = 5; i-- > 0; )
|
||||||
|
{
|
||||||
|
Thread.sleep(250);
|
||||||
|
tp.execute(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert that steady rate of jobs doesn't prevent some idling out
|
||||||
|
assertThat(tp.getThreads(), lessThan(threads));
|
||||||
|
threads = tp.getThreads();
|
||||||
|
for (int i = 5; i-- > 0; )
|
||||||
|
{
|
||||||
|
Thread.sleep(250);
|
||||||
|
tp.execute(job);
|
||||||
|
}
|
||||||
|
assertThat(tp.getThreads(), lessThan(threads));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEnsureThreads() throws Exception
|
public void testEnsureThreads() throws Exception
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue