From 3ad6780b27614abeef985e18b77bf56f1ddea0f6 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 2 Oct 2019 15:23:48 +1000 Subject: [PATCH] Issue #4105 QueuedThreadPool Shrink the QTP under steady load. Signed-off-by: Greg Wilkins --- .../jetty/util/thread/QueuedThreadPool.java | 52 +++++++++------- .../util/thread/QueuedThreadPoolTest.java | 62 +++++++++++++++++-- 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 6a110badd5d..eda9cd8f1c0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -159,6 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } addBean(_tryExecutor); + _lastShrink.set(System.nanoTime()); + super.doStart(); // The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped. _counts.set(0, 0); // threads, idle @@ -290,6 +292,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP public void setIdleTimeout(int idleTimeout) { _idleTimeout = idleTimeout; + ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class); + if (reserved != null) + reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS); } /** @@ -857,17 +862,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP if (LOG.isDebugEnabled()) LOG.debug("Runner started for {}", QueuedThreadPool.this); - Runnable job = null; + boolean idle = true; try { + Runnable job = null; while (true) { - // If we had a job, signal that we are idle again + // If we had a job, if (job != null) { + // signal that we are idle again if (!addCounts(0, 1)) break; - job = null; + idle = true; } // else check we are still running else if (_counts.getHi() == Integer.MIN_VALUE) @@ -881,33 +888,34 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP job = _jobs.poll(); if (job == null) { - // Wait for a job + // No job immediately available maybe we should shrink? long idleTimeout = getIdleTimeout(); + if (getThreads() > _minThreads && idleTimeout > 0) + { + long last = _lastShrink.get(); + long now = System.nanoTime(); + if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout)) + { + if (_lastShrink.compareAndSet(last, now)) + { + if (LOG.isDebugEnabled()) + LOG.debug("shrinking {}", QueuedThreadPool.this); + break; + } + } + } + + // Wait for a job job = idleJobPoll(idleTimeout); // If still no job? if (job == null) - { - // maybe we should shrink - if (getThreads() > _minThreads && idleTimeout > 0) - { - long last = _lastShrink.get(); - long now = System.nanoTime(); - if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout)) - { - if (_lastShrink.compareAndSet(last, now)) - { - if (LOG.isDebugEnabled()) - LOG.debug("shrinking {}", QueuedThreadPool.this); - break; - } - } - } // continue to try again continue; - } } + idle = false; + // run job if (LOG.isDebugEnabled()) LOG.debug("run {} in {}", job, QueuedThreadPool.this); @@ -936,7 +944,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP removeThread(thread); // Decrement the total thread count and the idle count if we had no job - addCounts(-1, job == null ? -1 : 0); + addCounts(-1, idle ? -1 : 0); if (LOG.isDebugEnabled()) LOG.debug("{} exited for {}", thread, QueuedThreadPool.this); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index 892addbdeff..ad0fcc1a918 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.StringContains.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -251,7 +252,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest job1._stopping.countDown(); assertTrue(job1._stopped.await(10, TimeUnit.SECONDS)); waitForIdle(tp, 1); - assertThat(tp.getThreads(), is(4)); + waitForThreads(tp, 4); // finish job 2,3,4 job2._stopping.countDown(); @@ -261,12 +262,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest assertTrue(job3._stopped.await(10, TimeUnit.SECONDS)); assertTrue(job4._stopped.await(10, TimeUnit.SECONDS)); - waitForIdle(tp, 4); - assertThat(tp.getThreads(), is(4)); + waitForIdle(tp, 3); + assertThat(tp.getThreads(), is(3)); long duration = System.nanoTime(); - waitForThreads(tp, 3); - assertThat(tp.getIdleThreads(), is(3)); + waitForThreads(tp, 2); + assertThat(tp.getIdleThreads(), is(2)); duration = System.nanoTime() - duration; assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L)); assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L)); @@ -505,6 +506,57 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest tp.stop(); } + @Test + public void testSteadyShrink() throws Exception + { + CountDownLatch latch = new CountDownLatch(1); + Runnable job = () -> + { + try + { + latch.await(); + } + catch(InterruptedException e) + { + e.printStackTrace(); + } + }; + + QueuedThreadPool tp = new QueuedThreadPool(); + tp.setMinThreads(2); + tp.setMaxThreads(10); + tp.setIdleTimeout(500); + tp.setThreadsPriority(Thread.NORM_PRIORITY - 1); + + tp.start(); + waitForIdle(tp, 2); + waitForThreads(tp, 2); + + for (int i = 0; i < 10; i++) + tp.execute(job); + + waitForThreads(tp, 10); + int threads = tp.getThreads(); + // let the jobs run + latch.countDown(); + + for (int i = 5; i-- > 0; ) + { + Thread.sleep(250); + tp.execute(job); + } + + // Assert that steady rate of jobs doesn't prevent some idling out + assertThat(tp.getThreads(), lessThan(threads)); + threads = tp.getThreads(); + for (int i = 5; i-- > 0; ) + { + Thread.sleep(250); + tp.execute(job); + } + assertThat(tp.getThreads(), lessThan(threads)); + } + @Test public void testEnsureThreads() throws Exception {