jetty-9 QueuedThreadPool creates new threads when idleThreads goes to 0

This commit is contained in:
Greg Wilkins 2012-09-17 17:36:48 +10:00
parent 9e622be846
commit b3381b4996
2 changed files with 58 additions and 52 deletions

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
import org.omg.CORBA._IDLTypeStub;
@ManagedObject("A thread pool with no max bound by default")
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
@ -102,12 +103,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
:new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
}
int threads=_threadsStarted.get();
while (isRunning() && threads<_minThreads)
{
startThread(threads);
threads=_threadsStarted.get();
}
startThreads(_minThreads);
}
@Override
@ -226,11 +222,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
_maxThreads=_minThreads;
int threads=_threadsStarted.get();
while (isStarted() && threads<_minThreads)
{
startThread(threads);
threads=_threadsStarted.get();
}
if (isStarted() && threads<_minThreads)
startThreads(_minThreads-threads);
}
/**
@ -347,19 +340,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
LOG.debug("{} dispatched {}",this,job);
if (isRunning())
{
final int jobQ = _jobs.size();
final int idle = getIdleThreads();
if(_jobs.offer(job))
{
// If we had no idle threads or the jobQ is greater than the idle threads
if (idle==0 || jobQ>idle)
{
int threads=_threadsStarted.get();
if (threads<_maxThreads)
startThread(threads);
}
return true;
}
return _jobs.offer(job);
}
return false;
}
@ -417,30 +398,38 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
}
private boolean startThread(int threads)
private boolean startThreads(int threadsToStart)
{
final int next=threads+1;
if (!_threadsStarted.compareAndSet(threads,next))
return false;
boolean started=false;
try
while(threadsToStart>0)
{
Thread thread=newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name+"-"+thread.getId());
_threads.add(thread);
int threads=_threadsStarted.get();
if (threads>=_maxThreads)
return false;
if (!_threadsStarted.compareAndSet(threads,threads+1))
continue;
thread.start();
started=true;
boolean started=false;
try
{
Thread thread=newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name+"-"+thread.getId());
_threads.add(thread);
thread.start();
started=true;
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
if (started)
threadsToStart--;
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
return started;
return true;
}
protected Thread newThread(Runnable runnable)
@ -506,7 +495,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
@Override
public String toString()
{
return String.format("%s{%s,%d<=%d<=%d/%d,%d}",_name,getState(),getMinThreads(),getIdleThreads(),getThreads(),getMaxThreads(),(_jobs==null?-1:_jobs.size()));
return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}",_name,getState(),getMinThreads(),getThreads(),getMaxThreads(),getIdleThreads(),(_jobs==null?-1:_jobs.size()));
}
private Runnable idleJobPoll() throws InterruptedException
@ -523,6 +512,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
try
{
Runnable job=_jobs.poll();
if(job!=null && _threadsIdle.get()==0)
{
startThreads(1);
}
while (isRunning())
{
// Job loop
@ -554,7 +549,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
shrink=_lastShrink.compareAndSet(last,now) &&
_threadsStarted.compareAndSet(size,size-1);
if (shrink)
{
return;
}
}
}
job=idleJobPoll();
@ -563,7 +560,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
finally
{
_threadsIdle.decrementAndGet();
if(_threadsIdle.decrementAndGet()==0)
{
startThreads(1);
}
}
}
}

View File

@ -77,9 +77,9 @@ public class QueuedThreadPoolTest
tp.setMaxThreads(10);
tp.setMaxIdleTimeMs(1000);
tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
tp.start();
waitForThreads(tp,5);
waitForIdle(tp,5);
@ -106,15 +106,20 @@ public class QueuedThreadPoolTest
jobs[i]=new RunningJob();
tp.dispatch(jobs[i]);
}
waitForIdle(tp,0);
waitForThreads(tp,5);
waitForIdle(tp,1);
waitForThreads(tp,6);
job=new RunningJob();
tp.dispatch(job);
waitForThreads(tp,6);
waitForIdle(tp,1);
waitForThreads(tp,7);
job.stop();
waitForThreads(tp,5);
waitForIdle(tp,2);
waitForThreads(tp,7);
waitForThreads(tp,6);
waitForIdle(tp,1);
jobs[0].stop();
waitForIdle(tp,1);
@ -132,6 +137,7 @@ public class QueuedThreadPoolTest
jobs[i]=new RunningJob();
tp.dispatch(jobs[i]);
}
waitForIdle(tp,0);
waitForThreads(tp,10);
for (int i=0;i<9;i++)