Fix idle race by allowing idle count to go negative (#3694)

* Fix idle race by allowing idle count to go negative
* Fixed flakey dump test
* don't exit Runner on exceptions
* cleanup after pair programming with sbordet
* longer benchmark runs
* optimized by removing need to check isRunning

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Code reformatting.
* Fixed stop logic.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2019-05-29 17:24:01 +02:00 committed by GitHub
parent 8548a49200
commit 0c61ec3e4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 494 additions and 549 deletions

View File

@ -18,9 +18,12 @@
package org.eclipse.jetty.util.thread.jmh;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@ -42,16 +45,16 @@ import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 10000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 3, time = 10000, timeUnit = TimeUnit.MILLISECONDS)
@Warmup(iterations = 8, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 3, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
public class ThreadPoolBenchmark
{
public enum Type
{
QTP, ETP;
QTP, ETP, LQTP, LETP, AQTP, AETP;
}
@Param({ "QTP", "ETP"})
@Param({ "QTP", "ETP" /*, "LQTP", "LETP", "AQTP", "AETP" */ })
Type type;
@Param({ "200" })
@ -65,11 +68,39 @@ public class ThreadPoolBenchmark
switch(type)
{
case QTP:
pool = new QueuedThreadPool(size,size);
{
QueuedThreadPool qtp = new QueuedThreadPool(size, size, new BlockingArrayQueue<>(32768, 32768));
qtp.setReservedThreads(0);
pool = qtp;
break;
}
case ETP:
pool = new ExecutorThreadPool(size,size);
pool = new ExecutorThreadPool(size, size, new BlockingArrayQueue<>(32768, 32768));
break;
case LQTP:
{
QueuedThreadPool qtp = new QueuedThreadPool(size, size, new LinkedBlockingQueue<>());
qtp.setReservedThreads(0);
pool = qtp;
break;
}
case LETP:
pool = new ExecutorThreadPool(size, size, new LinkedBlockingQueue<>());
break;
case AQTP:
{
QueuedThreadPool qtp = new QueuedThreadPool(size, size, new ArrayBlockingQueue<>(32768));
qtp.setReservedThreads(0);
pool = qtp;
break;
}
case AETP:
pool = new ExecutorThreadPool(size, size, new ArrayBlockingQueue<>(32768));
break;
}
LifeCycle.start(pool);

View File

@ -25,6 +25,21 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class AtomicBiInteger extends AtomicLong
{
public AtomicBiInteger()
{
}
public AtomicBiInteger(long encoded)
{
super(encoded);
}
public AtomicBiInteger(int hi, int lo)
{
super(encode(hi, lo));
}
/**
* @return the hi value
*/

View File

@ -1,175 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicLong;
/**
* An AtomicLong with additional methods to treat it as three 21 bit unsigned words.
*/
public class AtomicTriInteger extends AtomicLong
{
public static int MAX_VALUE = 0x1FFFFF;
public static int MIN_VALUE = 0;
/**
* Sets the hi and lo values.
*
* @param w0 the 0th word
* @param w1 the 1st word
* @param w2 the 2nd word
*/
public void set(int w0, int w1, int w2)
{
set(encode(w0, w1, w2));
}
/**
* Atomically sets the word values to the given updated values only if
* the current encoded value is as expected.
*
* @param expectEncoded the expected encoded value
* @param w0 the 0th word
* @param w1 the 1st word
* @param w2 the 2nd word
* @return {@code true} if successful.
*/
public boolean compareAndSet(long expectEncoded, int w0, int w1, int w2)
{
return compareAndSet(expectEncoded, encode(w0, w1, w2));
}
/**
* Atomically adds the given deltas to the current hi and lo values.
*
* @param delta0 the delta to apply to the 0th word value
* @param delta1 the delta to apply to the 1st word value
* @param delta2 the delta to apply to the 2nd word value
*/
public void add(int delta0, int delta1, int delta2)
{
while (true)
{
long encoded = get();
long update = encode(
getWord0(encoded) + delta0,
getWord1(encoded) + delta1,
getWord2(encoded) + delta2);
if (compareAndSet(encoded, update))
return;
}
}
/**
* Gets word 0 value
*
* @return the 16 bit value as an int
*/
public int getWord0()
{
return getWord0(get());
}
/**
* Gets word 1 value
*
* @return the 16 bit value as an int
*/
public int getWord1()
{
return getWord1(get());
}
/**
* Gets word 2 value
*
* @return the 16 bit value as an int
*/
public int getWord2()
{
return getWord2(get());
}
/**
* Gets word 0 value from the given encoded value.
*
* @param encoded the encoded value
* @return the 16 bit value as an int
*/
public static int getWord0(long encoded)
{
return (int)((encoded >> 42) & MAX_VALUE);
}
/**
* Gets word 0 value from the given encoded value.
*
* @param encoded the encoded value
* @return the 16 bit value as an int
*/
public static int getWord1(long encoded)
{
return (int)((encoded >> 21) & MAX_VALUE);
}
/**
* Gets word 0 value from the given encoded value.
*
* @param encoded the encoded value
* @return the 16 bit value as an int
*/
public static int getWord2(long encoded)
{
return (int)(encoded & MAX_VALUE);
}
/**
* Encodes 4 16 bit words values into a long.
*
* @param w0 the 0th word
* @param w1 the 1st word
* @param w2 the 2nd word
* @return the encoded value
*/
public static long encode(int w0, int w1, int w2)
{
if (w0 < MIN_VALUE
|| w0 > MAX_VALUE
|| w1 < MIN_VALUE
|| w1 > MAX_VALUE
|| w2 < MIN_VALUE
|| w2 > MAX_VALUE)
throw new IllegalArgumentException(String.format("Words must be %d <= word <= %d: %d, %d, %d", MIN_VALUE, MAX_VALUE, w0, w1, w2));
long wl0 = ((long)w0) & MAX_VALUE;
long wl1 = ((long)w1) & MAX_VALUE;
long wl2 = ((long)w2) & MAX_VALUE;
return (wl0 << 42) + (wl1 << 21) + (wl2);
}
@Override
public String toString()
{
long encoded = get();
int w0 = getWord0(encoded);
int w1 = getWord1(encoded);
int w2 = getWord2(encoded);
return String.format("{%d,%d,%d}", w0, w1, w2);
}
}

View File

@ -484,13 +484,56 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
@Override
public int drainTo(Collection<? super E> c)
{
throw new UnsupportedOperationException();
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements)
{
throw new UnsupportedOperationException();
int elements = 0;
_tailLock.lock();
try
{
_headLock.lock();
try
{
final int head = _indexes[HEAD_OFFSET];
final int tail = _indexes[TAIL_OFFSET];
final int capacity = _elements.length;
int i = head;
while (i!=tail && elements<maxElements)
{
elements++;
c.add((E)_elements[i]);
++i;
if (i == capacity)
i = 0;
}
if (i==tail)
{
_indexes[HEAD_OFFSET] = 0;
_indexes[TAIL_OFFSET] = 0;
_size.set(0);
}
else
{
_indexes[HEAD_OFFSET] = i;
_size.addAndGet(-elements);
}
}
finally
{
_headLock.unlock();
}
}
finally
{
_tailLock.unlock();
}
return elements;
}
/*----------------------------------------------------------------------------*/
@ -505,7 +548,6 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_tailLock.lock();
try
{
_headLock.lock();
try
{

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -63,7 +64,12 @@ public class ExecutorThreadPool extends ContainerLifeCycle implements ThreadPool
public ExecutorThreadPool(int maxThreads, int minThreads)
{
this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()), minThreads, -1, null);
this(maxThreads, minThreads, new LinkedBlockingQueue<>());
}
public ExecutorThreadPool(int maxThreads, int minThreads, BlockingQueue<Runnable> queue)
{
this(new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, queue), minThreads, -1, null);
}
public ExecutorThreadPool(ThreadPoolExecutor executor)

View File

@ -29,7 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.AtomicTriInteger;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -48,15 +48,16 @@ import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor
{
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
private static Runnable NOOP = () -> {};
/**
* Encodes thread counts: <dl>
* <dt>Word0</dt><dd>Total thread count (including starting and idle)</dd>
* <dt>Word1</dt><dd>Starting threads</dd>
* <dt>Word2</dt><dd>Idle threads</dd>
* Encodes thread counts:
* <dl>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size</dd>
* </dl>
*/
private final AtomicTriInteger _counts = new AtomicTriInteger();
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
private final AtomicLong _lastShrink = new AtomicLong();
private final Set<Thread> _threads = ConcurrentHashMap.newKeySet();
private final Object _joinLock = new Object();
@ -89,6 +90,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
this(maxThreads, minThreads, 60000);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("queue") BlockingQueue<Runnable> queue)
{
this(maxThreads, minThreads, 60000, -1, queue, null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout)
{
this(maxThreads, minThreads, idleTimeout, null);
@ -106,11 +112,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
if (maxThreads < minThreads) {
if (maxThreads < minThreads)
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads ("
+ minThreads + ")");
}
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
@ -118,7 +122,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
setReservedThreads(reservedThreads);
if (queue == null)
{
int capacity=Math.max(_minThreads, 8);
int capacity = Math.max(_minThreads, 8) * 1024;
queue = new BlockingArrayQueue<>(capacity, capacity);
}
_jobs = queue;
@ -155,8 +159,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
addBean(_tryExecutor);
super.doStart();
_counts.set(0,0,0); // threads, starting, idle
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle
ensureThreads();
}
@ -171,17 +175,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
super.doStop();
// Signal the Runner threads that we are stopping
int threads = _counts.getAndSetHi(Integer.MIN_VALUE);
// If stop timeout try to gracefully stop
long timeout = getStopTimeout();
BlockingQueue<Runnable> jobs = getQueue();
// If no stop timeout, clear job queue
if (timeout <= 0)
jobs.clear();
// Fill job Q with noop jobs to wakeup idle
Runnable noop = () -> {};
for (int i = getThreads(); i-- > 0; )
jobs.offer(noop);
if (timeout > 0)
{
// Fill the job queue with noop jobs to wakeup idle threads.
for (int i = 0; i < threads; ++i)
{
jobs.offer(NOOP);
}
// try to let jobs complete naturally for half our stop time
joinThreads(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2);
@ -215,8 +221,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
else
{
for (Thread unstopped : _threads)
{
LOG.warn("{} Couldn't stop {}", this, unstopped);
}
}
}
// Close any un-executed jobs
while (!_jobs.isEmpty())
@ -233,7 +242,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
LOG.warn(t);
}
}
else if (job != noop)
else if (job != NOOP)
LOG.warn("Stopped without executing or closing {}", job);
}
@ -291,8 +300,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public void setMaxThreads(int maxThreads)
{
if (maxThreads<AtomicTriInteger.MIN_VALUE || maxThreads>AtomicTriInteger.MAX_VALUE)
throw new IllegalArgumentException("maxThreads="+maxThreads);
if (_budget != null)
_budget.check(maxThreads);
_maxThreads = maxThreads;
@ -473,15 +480,48 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public void execute(Runnable job)
{
if (!isRunning() || !_jobs.offer(job))
// Determine if we need to start a thread, use and idle thread or just queue this job
boolean startThread;
while (true)
{
// Get the atomic counts
long counts = _counts.get();
// Get the number of threads started (might not yet be running)
int threads = AtomicBiInteger.getHi(counts);
if (threads == Integer.MIN_VALUE)
throw new RejectedExecutionException(job.toString());
// Get the number of truly idle threads. This count is reduced by the
// job queue size so that any threads that are idle but are about to take
// a job from the queue are not counted.
int idle = AtomicBiInteger.getLo(counts);
// Start a thread if we have insufficient idle threads to meet demand
// and we are not at max threads.
startThread = (idle <= 0 && threads < _maxThreads);
// The job will be run by an idle thread when available
if (!_counts.compareAndSet(counts, threads + (startThread ? 1 : 0), idle - 1))
continue;
break;
}
if (!_jobs.offer(job))
{
// reverse our changes to _counts.
if (addCounts(startThread ? -1 : 0, 1))
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
if (LOG.isDebugEnabled())
LOG.debug("queue {}",job);
// Make sure there is at least one thread executing the job.
ensureThreads();
LOG.debug("queue {} startThread={}", job, startThread);
// Start a thread if one was needed
if (startThread)
startThread();
}
@Override
@ -500,12 +540,16 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
synchronized (_joinLock)
{
while (isRunning())
{
_joinLock.wait();
}
}
while (isStopping())
{
Thread.sleep(1);
}
}
/**
* @return the total number of threads currently in the pool
@ -514,7 +558,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@ManagedAttribute("number of threads in the pool")
public int getThreads()
{
return _counts.getWord0();
int threads = _counts.getHi();
return Math.max(0, threads);
}
/**
@ -524,7 +569,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@ManagedAttribute("number of idle threads in the pool")
public int getIdleThreads()
{
return _counts.getWord2();
int idle = _counts.getLo();
return Math.max(0, idle);
}
/**
@ -556,27 +602,33 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
private void ensureThreads()
{
while (isRunning())
while (true)
{
long counts = _counts.get();
int threads = AtomicTriInteger.getWord0(counts);
int starting = AtomicTriInteger.getWord1(counts);
int idle = AtomicTriInteger.getWord2(counts);
int queue = getQueueSize();
int threads = AtomicBiInteger.getHi(counts);
if (threads == Integer.MIN_VALUE)
break;
if (threads >= _maxThreads)
break;
if (threads >= _minThreads && (starting + idle) >= queue)
break;
if (!_counts.compareAndSet(counts, threads + 1, starting + 1, idle))
// If we have less than min threads
// OR insufficient idle threads to meet demand
int idle = AtomicBiInteger.getLo(counts);
if (threads < _minThreads || (idle < 0 && threads < _maxThreads))
{
// Then try to start a thread.
if (_counts.compareAndSet(counts, threads + 1, idle))
startThread();
// Otherwise continue to check state again.
continue;
}
break;
}
}
protected void startThread()
{
boolean started = false;
try
{
if (LOG.isDebugEnabled())
LOG.debug("Starting thread {}",this);
Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
@ -591,9 +643,23 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
finally
{
if (!started)
_counts.add(-1,-1,0); // threads, starting, idle
addCounts(-1, 0); // threads, idle
}
}
private boolean addCounts(int deltaThreads, int deltaIdle)
{
while (true)
{
long encoded = _counts.get();
int threads = AtomicBiInteger.getHi(encoded);
int idle = AtomicBiInteger.getLo(encoded);
if (threads == Integer.MIN_VALUE) // This is a marker that the pool is stopped.
return false;
long update = AtomicBiInteger.encode(threads + deltaThreads, idle + deltaIdle);
if (_counts.compareAndSet(encoded, update))
return true;
}
}
protected Thread newThread(Runnable runnable)
@ -684,12 +750,11 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
public String toString()
{
long count = _counts.get();
int threads = AtomicTriInteger.getWord0(count);
int starting = AtomicTriInteger.getWord1(count);
int idle = AtomicTriInteger.getWord2(count);
int threads = Math.max(0, AtomicBiInteger.getHi(count));
int idle = Math.max(0, AtomicBiInteger.getLo(count));
int queue = getQueueSize();
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,s=%d,i=%d,r=%d,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
@ -697,7 +762,6 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
getMinThreads(),
threads,
getMaxThreads(),
starting,
idle,
getReservedThreads(),
queue,
@ -768,7 +832,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
{
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
}
return buf.toString();
}
}
@ -777,36 +843,56 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
private class Runner implements Runnable
{
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
{
if (idleTimeout <= 0)
return _jobs.take();
return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
boolean idle = false;
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);
Runnable job = null;
try
{
job = _jobs.poll();
idle = job==null;
_counts.add(0,-1,idle?1:0); // threads, starting, idle
if (LOG.isDebugEnabled())
LOG.debug("Runner started with {} for {}", job, QueuedThreadPool.this);
// All threads start idle (not yet taken a job)
if (!addCounts(0, 1))
return;
while (true)
{
if (job == null)
// If we had a job, signal that we are idle again
if (job != null)
{
if (!idle)
if (!addCounts(0, 1))
break;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
{
idle = true;
_counts.add(0,0,1); // threads, starting, idle
break;
}
try
{
// Look for an immediately available job
job = _jobs.poll();
if (job == null)
{
// Wait for a job
long idleTimeout = getIdleTimeout();
job = idleJobPoll(idleTimeout);
// maybe we should shrink?
if (job == null && getThreads() > _minThreads && idleTimeout > 0)
// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0)
{
long last = _lastShrink.get();
long now = System.nanoTime();
@ -820,33 +906,21 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
}
}
// continue to try again
continue;
}
}
// run job
if (job != null)
{
if (idle)
{
idle = false;
_counts.add(0,0,-1); // threads, starting, idle
}
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);
// Clear interrupted status
// Clear any interrupted status
Thread.interrupted();
}
if (!isRunning())
break;
job = _jobs.poll();
}
}
catch (InterruptedException e)
{
if (LOG.isDebugEnabled())
@ -855,24 +929,24 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
catch (Throwable e)
{
LOG.warn(String.format("Unexpected thread death: %s in %s", job, QueuedThreadPool.this), e);
LOG.warn(e);
}
}
}
finally
{
_counts.add(-1,0,idle?-1:0); // threads, starting, idle
removeThread(Thread.currentThread());
ensureThreads();
Thread thread = Thread.currentThread();
removeThread(thread);
// Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("Runner exited for {}", QueuedThreadPool.this);
}
}
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);
private Runnable idleJobPoll(long idleTimeout) throws InterruptedException
{
if (idleTimeout <= 0)
return _jobs.take();
return _jobs.poll(idleTimeout, TimeUnit.MILLISECONDS);
// There is a chance that we shrunk just as a job was queued for us, so
// check again if we have sufficient threads to meet demand
ensureThreads();
}
}
}
}

View File

@ -21,7 +21,6 @@ package org.eclipse.jetty.util.thread;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -70,16 +69,12 @@ public class ScheduledExecutorScheduler extends AbstractLifeCycle implements Sch
@Override
protected void doStart() throws Exception
{
scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
{
@Override
public Thread newThread(Runnable r)
scheduler = new ScheduledThreadPoolExecutor(1, r ->
{
Thread thread = ScheduledExecutorScheduler.this.thread = new Thread(threadGroup, r, name);
thread.setDaemon(daemon);
thread.setContextClassLoader(classloader);
return thread;
}
});
scheduler.setRemoveOnCancelPolicy(true);
super.doStart();

View File

@ -1,103 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.AtomicTriInteger.MAX_VALUE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AtomicTriIntegerTest
{
@Test
public void testBitOperations()
{
long encoded;
encoded = AtomicTriInteger.encode(0,0,0);
assertThat(AtomicTriInteger.getWord0(encoded),is(0));
assertThat(AtomicTriInteger.getWord1(encoded),is(0));
assertThat(AtomicTriInteger.getWord2(encoded),is(0));
encoded = AtomicTriInteger.encode(1,2,3);
assertThat(AtomicTriInteger.getWord0(encoded),is(1));
assertThat(AtomicTriInteger.getWord1(encoded),is(2));
assertThat(AtomicTriInteger.getWord2(encoded),is(3));
encoded = AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, MAX_VALUE);
assertThat(AtomicTriInteger.getWord0(encoded),is(MAX_VALUE));
assertThat(AtomicTriInteger.getWord1(encoded),is(MAX_VALUE));
assertThat(AtomicTriInteger.getWord2(encoded),is(MAX_VALUE));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(-1, MAX_VALUE, MAX_VALUE));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, -1, MAX_VALUE));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, -1));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE+1, MAX_VALUE, MAX_VALUE));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE+1, MAX_VALUE));
assertThrows(IllegalArgumentException.class,()-> AtomicTriInteger.encode(MAX_VALUE, MAX_VALUE, MAX_VALUE+1));
}
@Test
public void testSetGet()
{
AtomicTriInteger ati = new AtomicTriInteger();
ati.set(1,2,3);
assertThat(ati.getWord0(),is(1));
assertThat(ati.getWord1(),is(2));
assertThat(ati.getWord2(),is(3));
}
@Test
public void testCompareAndSet()
{
AtomicTriInteger ati = new AtomicTriInteger();
ati.set(1,2,3);
long value = ati.get();
ati.set(2,3,4);
assertFalse(ati.compareAndSet(value,5,6,7));
assertThat(ati.getWord0(),is(2));
assertThat(ati.getWord1(),is(3));
assertThat(ati.getWord2(),is(4));
value = ati.get();
assertTrue(ati.compareAndSet(value,6,7,8));
assertThat(ati.getWord0(),is(6));
assertThat(ati.getWord1(),is(7));
assertThat(ati.getWord2(),is(8));
}
@Test
public void testAdd()
{
AtomicTriInteger ati = new AtomicTriInteger();
ati.set(1,2,3);
ati.add(-1,-2,4);
assertThat(ati.getWord0(),is(0));
assertThat(ati.getWord1(),is(0));
assertThat(ati.getWord2(),is(7));
}
}

View File

@ -18,12 +18,9 @@
package org.eclipse.jetty.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -31,10 +28,16 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class BlockingArrayQueueTest
{
@Test
@ -494,4 +497,28 @@ public class BlockingArrayQueueTest
assertTrue(iterator.hasNext());
assertFalse(iterator.hasPrevious());
}
@Test
public void testDrainTo() throws Exception
{
BlockingArrayQueue<String> queue = new BlockingArrayQueue<>();
queue.add("one");
queue.add("two");
queue.add("three");
queue.add("four");
queue.add("five");
queue.add("six");
List<String> to = new ArrayList<>();
queue.drainTo(to,3);
assertThat(to, Matchers.contains("one", "two", "three"));
assertThat(queue.size(),Matchers.is(3));
assertThat(queue, Matchers.contains("four", "five", "six"));
queue.drainTo(to);
assertThat(to, Matchers.contains("one", "two", "three", "four", "five", "six"));
assertThat(queue.size(),Matchers.is(0));
assertThat(queue, Matchers.empty());
}
}

View File

@ -181,6 +181,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
RunningJob job4=new RunningJob("JOB4");
tp.execute(job4);
assertFalse(job4._run.await(1,TimeUnit.SECONDS));
assertThat(tp.getThreads(),is(4));
// finish job 0
job0._stopping.countDown();
@ -214,12 +215,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout()/2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout()*2L));
tp.stop();
}
@Test
public void testThreadPoolFailingJobs() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class))
{
QueuedThreadPool tp= new QueuedThreadPool();
tp.setMinThreads(2);
@ -227,6 +228,8 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
tp.setIdleTimeout(900);
tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class))
{
tp.start();
// min threads started
@ -297,8 +300,10 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertTrue(job4._stopped.await(10,TimeUnit.SECONDS));
waitForIdle(tp,2);
assertThat(tp.getThreads(),is(2));
waitForThreads(tp,2);
}
tp.stop();
}
@Test
@ -340,6 +345,8 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
RunningJob job4 = new RunningJob();
tp.execute(job4);
assertTrue(job4._run.await(5, TimeUnit.SECONDS));
tp.stop();
}
@Test
@ -364,7 +371,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
tp.execute(job0);
tp.execute(job1);
// Add a more jobs (which should not be run)
// Add more jobs (which should not be run)
RunningJob job2=new RunningJob();
CloseableJob job3=new CloseableJob();
RunningJob job4=new RunningJob();
@ -391,6 +398,8 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
// Verify ClosableJobs have not been run but have been closed
assertThat(job4._run.await(200, TimeUnit.MILLISECONDS), is(false));
assertThat(job3._closed.await(200, TimeUnit.MILLISECONDS), is(true));
tp.stop();
}
@ -437,6 +446,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
waitForThreads(tp,2);
waitForIdle(tp,2);
tp.stop();
}
@Test
@ -484,6 +494,27 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertEquals(idle, tp.getIdleThreads());
}
private void waitForReserved(QueuedThreadPool tp, int reserved)
{
long now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
long start=now;
ReservedThreadExecutor reservedThreadExecutor = tp.getBean(ReservedThreadExecutor.class);
while (reservedThreadExecutor.getAvailable()!=reserved && (now-start)<10000)
{
try
{
Thread.sleep(50);
}
catch(InterruptedException ignored)
{
}
now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(reserved, reservedThreadExecutor.getAvailable());
}
private void waitForThreads(QueuedThreadPool tp, int threads)
{
long now=TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@ -520,6 +551,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
Thread.sleep(100);
assertThat(tp.getThreads(),greaterThanOrEqualTo(5));
}
tp.stop();
}
@Test
@ -548,24 +580,26 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
public void testDump() throws Exception
{
QueuedThreadPool pool = new QueuedThreadPool(4, 3);
pool.setIdleTimeout(10000);
String dump = pool.dump();
// TODO use hamcrest 2.0 regex matcher
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=-1,q=0"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.setReservedThreads(2);
dump = pool.dump();
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,s=0,i=0,r=2,q=0"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.start();
waitForIdle(pool,3);
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,s=0,i=3,r=2,q=0"));
assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(3));
assertThat(count(dump," RESERVED "),is(0));
@ -585,10 +619,10 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
}
});
started.await();
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0"));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
assertThat(count(dump," WAITING "),is(1));
@ -598,7 +632,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
pool.setDetailedDump(true);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,s=0,i=2,r=2,q=0"));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("s=0/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
@ -607,12 +641,11 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1));
assertFalse(pool.tryExecute(()->{}));
while(pool.getIdleThreads()==2)
Thread.sleep(10);
waitForReserved(pool,1);
Thread.sleep(250); // TODO need to give time for threads to read idle poll after setting idle
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,s=0,i=1,r=2,q=0"));
assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump,containsString("s=1/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(1));