Jetty 9.4.x 4105 4121 4122 queued thread pool (#4146)
Several QTP fixes: * #4105 Threads without jobs now check if they should idle die before waiting rather than before, this allows idling under steady load.3ad6780
* #4121 ThreadFactory behaviour supported by doing thread config within newThread call.7b306d7
* #4122 Always clear the interrupted status.c37a4ff
task = queue.poll(timeout); Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
7810f2dec2
commit
813fcb79ab
|
@ -55,8 +55,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
/**
|
||||
* Encodes thread counts:
|
||||
* <dl>
|
||||
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if stopping</dd>
|
||||
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size</dd>
|
||||
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
|
||||
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
|
||||
* this represents the effective number of idle threads, and if negative it represents the
|
||||
* demand for more threads</dd>
|
||||
* </dl>
|
||||
*/
|
||||
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
|
||||
|
@ -159,6 +161,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
}
|
||||
addBean(_tryExecutor);
|
||||
|
||||
_lastShrink.set(System.nanoTime());
|
||||
|
||||
super.doStart();
|
||||
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
|
||||
_counts.set(0, 0); // threads, idle
|
||||
|
@ -290,6 +294,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
public void setIdleTimeout(int idleTimeout)
|
||||
{
|
||||
_idleTimeout = idleTimeout;
|
||||
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
|
||||
if (reserved != null)
|
||||
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -443,7 +450,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
@ManagedAttribute("size of the job queue")
|
||||
public int getQueueSize()
|
||||
{
|
||||
return _jobs.size();
|
||||
// The idle counter encodes demand, which is the effective queue size
|
||||
int idle = _counts.getLo();
|
||||
return Math.max(0, -idle);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -631,9 +640,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
try
|
||||
{
|
||||
Thread thread = newThread(_runnable);
|
||||
thread.setDaemon(isDaemon());
|
||||
thread.setPriority(getThreadsPriority());
|
||||
thread.setName(_name + "-" + thread.getId());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Starting {}", thread);
|
||||
_threads.add(thread);
|
||||
|
@ -665,7 +671,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
|
||||
protected Thread newThread(Runnable runnable)
|
||||
{
|
||||
return new Thread(_threadGroup, runnable);
|
||||
Thread thread = new Thread(_threadGroup, runnable);
|
||||
thread.setDaemon(isDaemon());
|
||||
thread.setPriority(getThreadsPriority());
|
||||
thread.setName(_name + "-" + thread.getId());
|
||||
return thread;
|
||||
}
|
||||
|
||||
protected void removeThread(Thread thread)
|
||||
|
@ -857,17 +867,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Runner started for {}", QueuedThreadPool.this);
|
||||
|
||||
Runnable job = null;
|
||||
boolean idle = true;
|
||||
try
|
||||
{
|
||||
Runnable job = null;
|
||||
while (true)
|
||||
{
|
||||
// If we had a job, signal that we are idle again
|
||||
// If we had a job,
|
||||
if (job != null)
|
||||
{
|
||||
// signal that we are idle again
|
||||
if (!addCounts(0, 1))
|
||||
break;
|
||||
job = null;
|
||||
idle = true;
|
||||
}
|
||||
// else check we are still running
|
||||
else if (_counts.getHi() == Integer.MIN_VALUE)
|
||||
|
@ -881,42 +893,37 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
job = _jobs.poll();
|
||||
if (job == null)
|
||||
{
|
||||
// Wait for a job
|
||||
// No job immediately available maybe we should shrink?
|
||||
long idleTimeout = getIdleTimeout();
|
||||
if (idleTimeout > 0 && getThreads() > _minThreads)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout) && _lastShrink.compareAndSet(last, now))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", QueuedThreadPool.this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for a job, only after we have checked if we should shrink
|
||||
job = idleJobPoll(idleTimeout);
|
||||
|
||||
// If still no job?
|
||||
if (job == null)
|
||||
{
|
||||
// maybe we should shrink
|
||||
if (getThreads() > _minThreads && idleTimeout > 0)
|
||||
{
|
||||
long last = _lastShrink.get();
|
||||
long now = System.nanoTime();
|
||||
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
|
||||
{
|
||||
if (_lastShrink.compareAndSet(last, now))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("shrinking {}", QueuedThreadPool.this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// continue to try again
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
idle = false;
|
||||
|
||||
// run job
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
|
||||
runJob(job);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
|
||||
|
||||
// Clear any interrupted status
|
||||
Thread.interrupted();
|
||||
}
|
||||
catch (InterruptedException e)
|
||||
{
|
||||
|
@ -928,6 +935,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Clear any interrupted status
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
@ -936,7 +948,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
|||
removeThread(thread);
|
||||
|
||||
// Decrement the total thread count and the idle count if we had no job
|
||||
addCounts(-1, job == null ? -1 : 0);
|
||||
addCounts(-1, idle ? -1 : 0);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
|
||||
|
||||
|
|
|
@ -28,14 +28,13 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
@ -188,7 +187,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
QueuedThreadPool tp = new QueuedThreadPool();
|
||||
tp.setMinThreads(2);
|
||||
tp.setMaxThreads(4);
|
||||
tp.setIdleTimeout(900);
|
||||
tp.setIdleTimeout(1000);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||
|
||||
tp.start();
|
||||
|
@ -199,44 +198,49 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
|
||||
// Doesn't shrink to less than min threads
|
||||
Thread.sleep(3 * tp.getIdleTimeout() / 2);
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 2);
|
||||
assertThat(tp.getThreads(), is(2));
|
||||
assertThat(tp.getIdleThreads(), is(2));
|
||||
|
||||
// Run job0
|
||||
RunningJob job0 = new RunningJob("JOB0");
|
||||
tp.execute(job0);
|
||||
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
|
||||
waitForIdle(tp, 1);
|
||||
assertThat(tp.getThreads(), is(2));
|
||||
assertThat(tp.getIdleThreads(), is(1));
|
||||
|
||||
// Run job1
|
||||
RunningJob job1 = new RunningJob("JOB1");
|
||||
tp.execute(job1);
|
||||
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 2);
|
||||
waitForIdle(tp, 0);
|
||||
assertThat(tp.getThreads(), is(2));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
|
||||
// Run job2
|
||||
RunningJob job2 = new RunningJob("JOB2");
|
||||
tp.execute(job2);
|
||||
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 3);
|
||||
waitForIdle(tp, 0);
|
||||
assertThat(tp.getThreads(), is(3));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
|
||||
// Run job3
|
||||
RunningJob job3 = new RunningJob("JOB3");
|
||||
tp.execute(job3);
|
||||
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
|
||||
waitForThreads(tp, 4);
|
||||
waitForIdle(tp, 0);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
|
||||
// Check no short term change
|
||||
Thread.sleep(100);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
|
||||
// Run job4. will be queued
|
||||
RunningJob job4 = new RunningJob("JOB4");
|
||||
tp.execute(job4);
|
||||
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
|
||||
assertFalse(job4._run.await(250, TimeUnit.MILLISECONDS));
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
assertThat(tp.getQueueSize(), is(1));
|
||||
|
||||
// finish job 0
|
||||
job0._stopping.countDown();
|
||||
|
@ -246,12 +250,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
assertThat(tp.getIdleThreads(), is(0));
|
||||
assertThat(tp.getQueueSize(), is(0));
|
||||
|
||||
// finish job 1
|
||||
// finish job 1, and it's thread will become idle
|
||||
job1._stopping.countDown();
|
||||
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
|
||||
waitForIdle(tp, 1);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
waitForThreads(tp, 4);
|
||||
|
||||
// finish job 2,3,4
|
||||
job2._stopping.countDown();
|
||||
|
@ -261,15 +266,9 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
|
||||
|
||||
waitForIdle(tp, 4);
|
||||
assertThat(tp.getThreads(), is(4));
|
||||
|
||||
long duration = System.nanoTime();
|
||||
waitForThreads(tp, 3);
|
||||
assertThat(tp.getIdleThreads(), is(3));
|
||||
duration = System.nanoTime() - duration;
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
|
||||
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
|
||||
// Eventually all will have 3 idle threads
|
||||
waitForIdle(tp, 3);
|
||||
assertThat(tp.getThreads(), is(3));
|
||||
|
||||
tp.stop();
|
||||
}
|
||||
|
@ -505,6 +504,58 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
tp.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSteadyShrink() throws Exception
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Runnable job = () ->
|
||||
{
|
||||
try
|
||||
{
|
||||
latch.await();
|
||||
}
|
||||
catch(InterruptedException e)
|
||||
{
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
QueuedThreadPool tp = new QueuedThreadPool();
|
||||
tp.setMinThreads(2);
|
||||
tp.setMaxThreads(10);
|
||||
int timeout = 500;
|
||||
tp.setIdleTimeout(timeout);
|
||||
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
|
||||
|
||||
tp.start();
|
||||
waitForIdle(tp, 2);
|
||||
waitForThreads(tp, 2);
|
||||
|
||||
for (int i = 0; i < 10; i++)
|
||||
tp.execute(job);
|
||||
|
||||
waitForThreads(tp, 10);
|
||||
int threads = tp.getThreads();
|
||||
// let the jobs run
|
||||
latch.countDown();
|
||||
|
||||
for (int i = 5; i-- > 0; )
|
||||
{
|
||||
Thread.sleep(timeout / 2);
|
||||
tp.execute(job);
|
||||
}
|
||||
|
||||
// Assert that steady rate of jobs doesn't prevent some idling out
|
||||
assertThat(tp.getThreads(), lessThan(threads));
|
||||
threads = tp.getThreads();
|
||||
for (int i = 5; i-- > 0; )
|
||||
{
|
||||
Thread.sleep(timeout / 2);
|
||||
tp.execute(job);
|
||||
}
|
||||
assertThat(tp.getThreads(), lessThan(threads));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureThreads() throws Exception
|
||||
{
|
||||
|
@ -605,7 +656,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
}
|
||||
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
}
|
||||
assertEquals(idle, tp.getIdleThreads());
|
||||
assertThat(tp.getIdleThreads(), is(idle));
|
||||
}
|
||||
|
||||
private void waitForReserved(QueuedThreadPool tp, int reserved)
|
||||
|
@ -624,7 +675,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
}
|
||||
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
}
|
||||
assertEquals(reserved, reservedThreadExecutor.getAvailable());
|
||||
assertThat(reservedThreadExecutor.getAvailable(), is(reserved));
|
||||
}
|
||||
|
||||
private void waitForThreads(QueuedThreadPool tp, int threads)
|
||||
|
@ -642,7 +693,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
|||
}
|
||||
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
|
||||
}
|
||||
assertEquals(threads, tp.getThreads());
|
||||
assertThat(tp.getThreads(), is(threads));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue