Introduced setQueue(), removed maxQueued() and renamed maxIdleTimeMs() to idleTimeout().

The reason for this change is that CometD needs a way to explicitly set the queue implementation
in load testing to measure latencies.
Property maxQueued was used to decide, at start time, what queue implementation to use, but
now that can be set explicitly with setQueue().
The rename was done because in all other places the idle timeout has been renamed so.
This commit is contained in:
Simone Bordet 2012-12-11 14:45:14 +01:00
parent d161673ff7
commit 8e98148350
2 changed files with 148 additions and 148 deletions

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
@ -53,39 +52,37 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
private final AtomicInteger _threadsStarted = new AtomicInteger(); private final AtomicInteger _threadsStarted = new AtomicInteger();
private final AtomicInteger _threadsIdle = new AtomicInteger(); private final AtomicInteger _threadsIdle = new AtomicInteger();
private final AtomicLong _lastShrink = new AtomicLong(); private final AtomicLong _lastShrink = new AtomicLong();
private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
private final Object _joinLock = new Object(); private final Object _joinLock = new Object();
private BlockingQueue<Runnable> _jobs; private BlockingQueue<Runnable> _jobs;
private String _name; private String _name = "qtp" + hashCode();
private int _maxIdleTimeMs=60000; private int _idleTimeout;
private int _maxThreads; private int _maxThreads;
private int _minThreads; private int _minThreads;
private int _maxQueued=-1; private int _priority = Thread.NORM_PRIORITY;
private int _priority=Thread.NORM_PRIORITY; private boolean _daemon = false;
private boolean _daemon=false; private boolean _detailedDump = false;
private boolean _detailedDump=false;
public QueuedThreadPool() public QueuedThreadPool()
{ {
this(200,8,60000); this(200);
} }
public QueuedThreadPool(int maxThreads) public QueuedThreadPool(int maxThreads)
{ {
this(maxThreads,8,60000); this(maxThreads, 8);
} }
public QueuedThreadPool(int maxThreads, int minThreads) public QueuedThreadPool(int maxThreads, int minThreads)
{ {
this(maxThreads,minThreads,60000); this(maxThreads, minThreads, 60000);
} }
public QueuedThreadPool(int maxThreads, int minThreads, int maxIdleTimeMs) public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout)
{ {
_name="qtp"+super.hashCode();
setMinThreads(minThreads); setMinThreads(minThreads);
setMaxThreads(maxThreads); setMaxThreads(maxThreads);
setMaxIdleTimeMs(maxIdleTimeMs); setIdleTimeout(idleTimeout);
setStopTimeout(5000); setStopTimeout(5000);
} }
@ -95,12 +92,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
super.doStart(); super.doStart();
_threadsStarted.set(0); _threadsStarted.set(0);
if (_jobs==null) if (_jobs == null)
{ setQueue(new BlockingArrayQueue<Runnable>(_minThreads, _minThreads));
int maxQueued = getMaxQueued();
_jobs=maxQueued>0 ?new ArrayBlockingQueue<Runnable>(maxQueued)
:new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
}
startThreads(_minThreads); startThreads(_minThreads);
} }
@ -110,50 +103,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
super.doStop(); super.doStop();
long timeout=getStopTimeout(); long timeout = getStopTimeout();
BlockingQueue<Runnable> jobs = getQueue(); BlockingQueue<Runnable> jobs = getQueue();
// If no stop timeout, clear job queue // If no stop timeout, clear job queue
if (timeout<=0) if (timeout <= 0)
jobs.clear(); jobs.clear();
// Fill job Q with noop jobs to wakeup idle // Fill job Q with noop jobs to wakeup idle
Runnable noop = new Runnable(){@Override public void run(){}}; Runnable noop = new Runnable()
for (int i=_threadsStarted.get();i-->0;) {
@Override
public void run()
{
}
};
for (int i = _threadsStarted.get(); i-- > 0; )
jobs.offer(noop); jobs.offer(noop);
// try to jobs complete naturally for half our stop time // try to jobs complete naturally for half our stop time
long stopby=System.currentTimeMillis()+timeout/2; long stopby = System.currentTimeMillis() + timeout / 2;
for (Thread thread : _threads) for (Thread thread : _threads)
{ {
long canwait =stopby-System.currentTimeMillis(); long canwait = stopby - System.currentTimeMillis();
if (canwait>0) if (canwait > 0)
thread.join(canwait); thread.join(canwait);
} }
// If we still have threads running, get a bit more aggressive // If we still have threads running, get a bit more aggressive
// interrupt remaining threads // interrupt remaining threads
if (_threadsStarted.get()>0) if (_threadsStarted.get() > 0)
for (Thread thread : _threads) for (Thread thread : _threads)
thread.interrupt(); thread.interrupt();
// wait again for the other half of our stop time // wait again for the other half of our stop time
stopby=System.currentTimeMillis()+timeout/2; stopby = System.currentTimeMillis() + timeout / 2;
for (Thread thread : _threads) for (Thread thread : _threads)
{ {
long canwait =stopby-System.currentTimeMillis(); long canwait = stopby - System.currentTimeMillis();
if (canwait>0) if (canwait > 0)
thread.join(canwait); thread.join(canwait);
} }
Thread.yield(); Thread.yield();
int size=_threads.size(); int size = _threads.size();
if (size>0) if (size > 0)
{ {
LOG.warn("{} threads could not be stopped", size); LOG.warn("{} threads could not be stopped", size);
if ((size<=Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled()) if ((size <= Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled())
{ {
for (Thread unstopped : _threads) for (Thread unstopped : _threads)
{ {
@ -178,50 +177,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
*/ */
public void setDaemon(boolean daemon) public void setDaemon(boolean daemon)
{ {
_daemon=daemon; _daemon = daemon;
} }
/** Set the maximum thread idle time. /**
* Set the maximum thread idle time.
* Threads that are idle for longer than this period may be * Threads that are idle for longer than this period may be
* stopped. * stopped.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #getMaxIdleTimeMs *
* @param maxIdleTimeMs Max idle time in ms. * @param idleTimeout Max idle time in ms.
* @see #getIdleTimeout
*/ */
public void setMaxIdleTimeMs(int maxIdleTimeMs) public void setIdleTimeout(int idleTimeout)
{ {
_maxIdleTimeMs=maxIdleTimeMs; _idleTimeout = idleTimeout;
} }
/** Set the maximum number of threads. /**
* Set the maximum number of threads.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #getMaxThreads *
* @param maxThreads maximum number of threads. * @param maxThreads maximum number of threads.
* @see #getMaxThreads
*/ */
@Override @Override
public void setMaxThreads(int maxThreads) public void setMaxThreads(int maxThreads)
{ {
_maxThreads=maxThreads; _maxThreads = maxThreads;
if (_minThreads>_maxThreads) if (_minThreads > _maxThreads)
_minThreads=_maxThreads; _minThreads = _maxThreads;
} }
/** Set the minimum number of threads. /**
* Set the minimum number of threads.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #getMinThreads *
* @param minThreads minimum number of threads * @param minThreads minimum number of threads
* @see #getMinThreads
*/ */
@Override @Override
public void setMinThreads(int minThreads) public void setMinThreads(int minThreads)
{ {
_minThreads=minThreads; _minThreads = minThreads;
if (_minThreads>_maxThreads) if (_minThreads > _maxThreads)
_maxThreads=_minThreads; _maxThreads = _minThreads;
int threads=_threadsStarted.get(); int threads = _threadsStarted.get();
if (isStarted() && threads<_minThreads) if (isStarted() && threads < _minThreads)
startThreads(_minThreads-threads); startThreads(_minThreads - threads);
} }
/** /**
@ -231,51 +236,38 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
if (isRunning()) if (isRunning())
throw new IllegalStateException("started"); throw new IllegalStateException("started");
_name= name; _name = name;
} }
/** Set the priority of the pool threads. /**
* @param priority the new thread priority. * Set the priority of the pool threads.
*
* @param priority the new thread priority.
*/ */
public void setThreadsPriority(int priority) public void setThreadsPriority(int priority)
{ {
_priority=priority; _priority = priority;
} }
/** /**
* @return maximum queue size * Get the maximum thread idle time.
*/
@ManagedAttribute("maximum queue size")
public int getMaxQueued()
{
return _maxQueued;
}
/**
* @param max job queue size
*/
public void setMaxQueued(int max)
{
if (isRunning())
throw new IllegalStateException("started");
_maxQueued=max;
}
/** Get the maximum thread idle time.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #setMaxIdleTimeMs *
* @return Max idle time in ms. * @return Max idle time in ms.
* @see #setIdleTimeout
*/ */
@ManagedAttribute("maximum time a thread may be idle in ms") @ManagedAttribute("maximum time a thread may be idle in ms")
public int getMaxIdleTimeMs() public int getIdleTimeout()
{ {
return _maxIdleTimeMs; return _idleTimeout;
} }
/** Set the maximum number of threads. /**
* Set the maximum number of threads.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #setMaxThreads *
* @return maximum number of threads. * @return maximum number of threads.
* @see #setMaxThreads
*/ */
@Override @Override
@ManagedAttribute("maximum number of threads in the pool") @ManagedAttribute("maximum number of threads in the pool")
@ -284,10 +276,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return _maxThreads; return _maxThreads;
} }
/** Get the minimum number of threads. /**
* Get the minimum number of threads.
* Delegated to the named or anonymous Pool. * Delegated to the named or anonymous Pool.
* @see #setMinThreads *
* @return minimum number of threads. * @return minimum number of threads.
* @see #setMinThreads
*/ */
@Override @Override
@ManagedAttribute("minimum number of threads in the pool") @ManagedAttribute("minimum number of threads in the pool")
@ -305,8 +299,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return _name; return _name;
} }
/** Get the priority of the pool threads. /**
* @return the priority of the pool threads. * Get the priority of the pool threads.
*
* @return the priority of the pool threads.
*/ */
@ManagedAttribute("priority of threads in the pool") @ManagedAttribute("priority of threads in the pool")
public int getThreadsPriority() public int getThreadsPriority()
@ -336,12 +332,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override @Override
public boolean dispatch(Runnable job) public boolean dispatch(Runnable job)
{ {
LOG.debug("{} dispatched {}",this,job); LOG.debug("{} dispatched {}", this, job);
if (isRunning()) return isRunning() && _jobs.offer(job);
{
return _jobs.offer(job);
}
return false;
} }
@Override @Override
@ -349,7 +341,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
if (!dispatch(job)) if (!dispatch(job))
{ {
LOG.warn("{} rejected {}",this,job); LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString()); throw new RejectedExecutionException(job.toString());
} }
} }
@ -397,31 +389,31 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs") @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs")
public boolean isLowOnThreads() public boolean isLowOnThreads()
{ {
return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get();
} }
private boolean startThreads(int threadsToStart) private boolean startThreads(int threadsToStart)
{ {
while(threadsToStart>0) while (threadsToStart > 0)
{ {
int threads=_threadsStarted.get(); int threads = _threadsStarted.get();
if (threads>=_maxThreads) if (threads >= _maxThreads)
return false; return false;
if (!_threadsStarted.compareAndSet(threads,threads+1)) if (!_threadsStarted.compareAndSet(threads, threads + 1))
continue; continue;
boolean started=false; boolean started = false;
try try
{ {
Thread thread=newThread(_runnable); Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon()); thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority()); thread.setPriority(getThreadsPriority());
thread.setName(_name+"-"+thread.getId()); thread.setName(_name + "-" + thread.getId());
_threads.add(thread); _threads.add(thread);
thread.start(); thread.start();
started=true; started = true;
} }
finally finally
{ {
@ -451,10 +443,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
public void dump(Appendable out, String indent) throws IOException public void dump(Appendable out, String indent) throws IOException
{ {
List<Object> dump = new ArrayList<>(getMaxThreads()); List<Object> dump = new ArrayList<>(getMaxThreads());
for (final Thread thread: _threads) for (final Thread thread : _threads)
{ {
final StackTraceElement[] trace=thread.getStackTrace(); final StackTraceElement[] trace = thread.getStackTrace();
boolean inIdleJobPoll=false; boolean inIdleJobPoll = false;
for (StackTraceElement t : trace) for (StackTraceElement t : trace)
{ {
if ("idleJobPoll".equals(t.getMethodName())) if ("idleJobPoll".equals(t.getMethodName()))
@ -463,7 +455,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
break; break;
} }
} }
final boolean idle=inIdleJobPoll; final boolean idle = inIdleJobPoll;
if (isDetailedDump()) if (isDetailedDump())
{ {
@ -472,9 +464,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override @Override
public void dump(Appendable out, String indent) throws IOException public void dump(Appendable out, String indent) throws IOException
{ {
out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n');
if (!idle) if (!idle)
ContainerLifeCycle.dump(out,indent,Arrays.asList(trace)); ContainerLifeCycle.dump(out, indent, Arrays.asList(trace));
} }
@Override @Override
@ -486,23 +478,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
} }
else else
{ {
dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : ""));
} }
} }
ContainerLifeCycle.dumpObject(out,this); ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out,indent,dump); ContainerLifeCycle.dump(out, indent, dump);
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}",_name,getState(),getMinThreads(),getThreads(),getMaxThreads(),getIdleThreads(),(_jobs==null?-1:_jobs.size())); return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size()));
} }
private Runnable idleJobPoll() throws InterruptedException private Runnable idleJobPoll() throws InterruptedException
{ {
return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
} }
private Runnable _runnable = new Runnable() private Runnable _runnable = new Runnable()
@ -510,23 +502,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override @Override
public void run() public void run()
{ {
boolean shrink=false; boolean shrink = false;
try try
{ {
Runnable job=_jobs.poll(); Runnable job = _jobs.poll();
if(job!=null && _threadsIdle.get()==0) if (job != null && _threadsIdle.get() == 0)
{ {
startThreads(1); startThreads(1);
} }
while (isRunning()) while (isRunning())
{ {
// Job loop // Job loop
while (job!=null && isRunning()) while (job != null && isRunning())
{ {
runJob(job); runJob(job);
job=_jobs.poll(); job = _jobs.poll();
} }
// Idle loop // Idle loop
@ -534,46 +526,46 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
_threadsIdle.incrementAndGet(); _threadsIdle.incrementAndGet();
while (isRunning() && job==null) while (isRunning() && job == null)
{ {
if (_maxIdleTimeMs<=0) if (_idleTimeout <= 0)
job=_jobs.take(); job = _jobs.take();
else else
{ {
// maybe we should shrink? // maybe we should shrink?
final int size=_threadsStarted.get(); final int size = _threadsStarted.get();
if (size>_minThreads) if (size > _minThreads)
{ {
long last=_lastShrink.get(); long last = _lastShrink.get();
long now=System.currentTimeMillis(); long now = System.currentTimeMillis();
if (last==0 || (now-last)>_maxIdleTimeMs) if (last == 0 || (now - last) > _idleTimeout)
{ {
shrink=_lastShrink.compareAndSet(last,now) && shrink = _lastShrink.compareAndSet(last, now) &&
_threadsStarted.compareAndSet(size,size-1); _threadsStarted.compareAndSet(size, size - 1);
if (shrink) if (shrink)
{ {
return; return;
} }
} }
} }
job=idleJobPoll(); job = idleJobPoll();
} }
} }
} }
finally finally
{ {
if(_threadsIdle.decrementAndGet()==0) if (_threadsIdle.decrementAndGet() == 0)
{ {
startThreads(1); startThreads(1);
} }
} }
} }
} }
catch(InterruptedException e) catch (InterruptedException e)
{ {
LOG.ignore(e); LOG.ignore(e);
} }
catch(Throwable e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
} }
@ -605,6 +597,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return _jobs; return _jobs;
} }
/**
* @param queue the job queue
*/
public void setQueue(BlockingQueue<Runnable> queue)
{
_jobs = queue;
}
/** /**
* @param id The thread ID to interrupt. * @param id The thread ID to interrupt.
* @return true if the thread was found and interrupted. * @return true if the thread was found and interrupted.
@ -612,9 +612,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@ManagedOperation("interrupt a pool thread") @ManagedOperation("interrupt a pool thread")
public boolean interruptThread(@Name("id") long id) public boolean interruptThread(@Name("id") long id)
{ {
for (Thread thread: _threads) for (Thread thread : _threads)
{ {
if (thread.getId()==id) if (thread.getId() == id)
{ {
thread.interrupt(); thread.interrupt();
return true; return true;
@ -630,9 +630,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@ManagedOperation("dump a pool thread stack") @ManagedOperation("dump a pool thread stack")
public String dumpThread(@Name("id") long id) public String dumpThread(@Name("id") long id)
{ {
for (Thread thread: _threads) for (Thread thread : _threads)
{ {
if (thread.getId()==id) if (thread.getId() == id)
{ {
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -30,6 +28,8 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import static org.junit.Assert.assertTrue;
@RunWith(AdvancedRunner.class) @RunWith(AdvancedRunner.class)
public class QueuedThreadPoolTest public class QueuedThreadPoolTest
{ {
@ -75,7 +75,7 @@ public class QueuedThreadPoolTest
QueuedThreadPool tp= new QueuedThreadPool(); QueuedThreadPool tp= new QueuedThreadPool();
tp.setMinThreads(5); tp.setMinThreads(5);
tp.setMaxThreads(10); tp.setMaxThreads(10);
tp.setMaxIdleTimeMs(1000); tp.setIdleTimeout(1000);
tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
tp.start(); tp.start();
@ -174,7 +174,7 @@ public class QueuedThreadPoolTest
QueuedThreadPool tp= new QueuedThreadPool(); QueuedThreadPool tp= new QueuedThreadPool();
tp.setMinThreads(2); tp.setMinThreads(2);
tp.setMaxThreads(10); tp.setMaxThreads(10);
tp.setMaxIdleTimeMs(400); tp.setIdleTimeout(400);
tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
tp.start(); tp.start();