Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

This commit is contained in:
Greg Wilkins 2019-10-02 22:11:03 +10:00
commit 87a707c219
2 changed files with 122 additions and 59 deletions

View File

@ -55,8 +55,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
/**
* Encodes thread counts:
* <dl>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size</dd>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
* this represents the effective number of idle threads, and if negative it represents the
* demand for more threads</dd>
* </dl>
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
@ -159,6 +161,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
addBean(_tryExecutor);
_lastShrink.set(System.nanoTime());
super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle
@ -290,6 +294,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
public void setIdleTimeout(int idleTimeout)
{
_idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
}
/**
@ -443,7 +450,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@ManagedAttribute("size of the job queue")
public int getQueueSize()
{
return _jobs.size();
// The idle counter encodes demand, which is the effective queue size
int idle = _counts.getLo();
return Math.max(0, -idle);
}
/**
@ -631,9 +640,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
try
{
Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
@ -665,7 +671,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
protected Thread newThread(Runnable runnable)
{
return new Thread(_threadGroup, runnable);
Thread thread = new Thread(_threadGroup, runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
return thread;
}
protected void removeThread(Thread thread)
@ -847,17 +857,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);
Runnable job = null;
boolean idle = true;
try
{
Runnable job = null;
while (true)
{
// If we had a job, signal that we are idle again
// If we had a job,
if (job != null)
{
// signal that we are idle again
if (!addCounts(0, 1))
break;
job = null;
idle = true;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
@ -871,42 +883,37 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
job = _jobs.poll();
if (job == null)
{
// Wait for a job
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout) && _lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
// Wait for a job, only after we have checked if we should shrink
job = idleJobPoll(idleTimeout);
// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
}
// continue to try again
continue;
}
}
idle = false;
// run job
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
// Clear any interrupted status
Thread.interrupted();
}
catch (InterruptedException e)
{
@ -918,6 +925,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
{
LOG.warn(e);
}
finally
{
// Clear any interrupted status
Thread.interrupted();
}
}
}
finally
@ -926,7 +938,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
removeThread(thread);
// Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0);
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

View File

@ -28,14 +28,13 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -188,7 +187,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(4);
tp.setIdleTimeout(900);
tp.setIdleTimeout(1000);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
@ -199,44 +198,49 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
// Doesn't shrink to less than min threads
Thread.sleep(3 * tp.getIdleTimeout() / 2);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(2));
// Run job0
RunningJob job0 = new RunningJob("JOB0");
tp.execute(job0);
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(1));
// Run job1
RunningJob job1 = new RunningJob("JOB1");
tp.execute(job1);
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 2);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(0));
// Run job2
RunningJob job2 = new RunningJob("JOB2");
tp.execute(job2);
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(3));
assertThat(tp.getIdleThreads(), is(0));
// Run job3
RunningJob job3 = new RunningJob("JOB3");
tp.execute(job3);
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
// Check no short term change
Thread.sleep(100);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
// Run job4. will be queued
RunningJob job4 = new RunningJob("JOB4");
tp.execute(job4);
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
assertFalse(job4._run.await(250, TimeUnit.MILLISECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(1));
// finish job 0
job0._stopping.countDown();
@ -246,12 +250,13 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));
// finish job 1
// finish job 1, and it's thread will become idle
job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(4));
waitForThreads(tp, 4);
// finish job 2,3,4
job2._stopping.countDown();
@ -261,15 +266,9 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 4);
assertThat(tp.getThreads(), is(4));
long duration = System.nanoTime();
waitForThreads(tp, 3);
assertThat(tp.getIdleThreads(), is(3));
duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
// Eventually all will have 3 idle threads
waitForIdle(tp, 3);
assertThat(tp.getThreads(), is(3));
tp.stop();
}
@ -505,6 +504,58 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
tp.stop();
}
@Test
public void testSteadyShrink() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
{
try
{
latch.await();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
};
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(10);
int timeout = 500;
tp.setIdleTimeout(timeout);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);
for (int i = 0; i < 10; i++)
tp.execute(job);
waitForThreads(tp, 10);
int threads = tp.getThreads();
// let the jobs run
latch.countDown();
for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}
// Assert that steady rate of jobs doesn't prevent some idling out
assertThat(tp.getThreads(), lessThan(threads));
threads = tp.getThreads();
for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}
assertThat(tp.getThreads(), lessThan(threads));
}
@Test
public void testEnsureThreads() throws Exception
{
@ -605,7 +656,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(idle, tp.getIdleThreads());
assertThat(tp.getIdleThreads(), is(idle));
}
private void waitForReserved(QueuedThreadPool tp, int reserved)
@ -624,7 +675,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(reserved, reservedThreadExecutor.getAvailable());
assertThat(reservedThreadExecutor.getAvailable(), is(reserved));
}
private void waitForThreads(QueuedThreadPool tp, int threads)
@ -642,7 +693,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(threads, tp.getThreads());
assertThat(tp.getThreads(), is(threads));
}
@Test