inject new Q into ThreadPool

This commit is contained in:
Greg Wilkins 2013-03-15 12:30:49 +11:00
parent 17e1839089
commit c577783fa6
3 changed files with 23 additions and 10 deletions

View File

@ -44,8 +44,14 @@
<!-- =========================================================== -->
<Arg name="threadpool">
<New id="threadpool" class="org.eclipse.jetty.util.thread.QueuedThreadPool">
<Set name="minThreads">10</Set>
<Set name="maxThreads">200</Set>
<Arg name="minThreads">10</Arg>
<Arg name="maxThreads">200</Arg>
<Arg name="idleTimeout">60000</Arg>
<Arg name="queue">
<New class="org.eclipse.jetty.util.ConcurrentArrayBlockingQueue$Unbounded">
<Arg>32</Arg>
</New>
</Arg>
<Set name="detailedDump">false</Set>
</New>
</Arg>

View File

@ -54,7 +54,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
private final AtomicLong _lastShrink = new AtomicLong();
private final ConcurrentLinkedQueue<Thread> _threads = new ConcurrentLinkedQueue<>();
private final Object _joinLock = new Object();
private BlockingQueue<Runnable> _jobs;
private final BlockingQueue<Runnable> _jobs;
private String _name = "qtp" + hashCode();
private int _idleTimeout;
private int _maxThreads;
@ -79,11 +79,21 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout)
{
this(maxThreads, minThreads, 60000,null);
}
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue)
{
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
setStopTimeout(5000);
if (queue==null)
queue=new BlockingArrayQueue<Runnable>(_minThreads, _minThreads);
_jobs=queue;
}
@Override
@ -92,9 +102,6 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
super.doStart();
_threadsStarted.set(0);
if (_jobs == null)
setQueue(new BlockingArrayQueue<Runnable>(_minThreads, _minThreads));
startThreads(_minThreads);
}
@ -602,7 +609,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
*/
public void setQueue(BlockingQueue<Runnable> queue)
{
_jobs = queue;
throw new UnsupportedOperationException("Use constructor injection");
}
/**

View File

@ -104,7 +104,7 @@ public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQu
int sum = 0;
for (int j = 0; j < iterations * factor; ++j)
sum += queue.take();
System.err.println("Taking reader " + reader + " completed: " + sum);
//System.err.println("Taking reader " + reader + " completed: " + sum);
return sum;
}
}));
@ -116,7 +116,7 @@ public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQu
int sum = 0;
for (int j = 0; j < iterations * factor; ++j)
sum += queue.poll(5, TimeUnit.SECONDS);
System.err.println("Polling Reader " + reader + " completed: " + sum);
//System.err.println("Polling Reader " + reader + " completed: " + sum);
return sum;
}
}));
@ -131,7 +131,7 @@ public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQu
{
for (int j = 0; j < iterations; ++j)
queue.offer(1);
System.err.println("Writer " + writer + " completed");
//System.err.println("Writer " + writer + " completed");
return null;
}
});