diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 0cab552d58d..9ae8aca8afa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; @@ -53,39 +52,37 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private final AtomicInteger _threadsStarted = new AtomicInteger(); private final AtomicInteger _threadsIdle = new AtomicInteger(); private final AtomicLong _lastShrink = new AtomicLong(); - private final ConcurrentLinkedQueue _threads=new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue _threads = new ConcurrentLinkedQueue<>(); private final Object _joinLock = new Object(); private BlockingQueue _jobs; - private String _name; - private int _maxIdleTimeMs=60000; + private String _name = "qtp" + hashCode(); + private int _idleTimeout; private int _maxThreads; private int _minThreads; - private int _maxQueued=-1; - private int _priority=Thread.NORM_PRIORITY; - private boolean _daemon=false; - private boolean _detailedDump=false; + private int _priority = Thread.NORM_PRIORITY; + private boolean _daemon = false; + private boolean _detailedDump = false; public QueuedThreadPool() { - this(200,8,60000); + this(200); } public QueuedThreadPool(int maxThreads) { - this(maxThreads,8,60000); + this(maxThreads, 8); } public QueuedThreadPool(int maxThreads, int minThreads) { - this(maxThreads,minThreads,60000); + this(maxThreads, minThreads, 60000); } - public QueuedThreadPool(int maxThreads, int minThreads, int maxIdleTimeMs) + public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout) { - _name="qtp"+super.hashCode(); setMinThreads(minThreads); setMaxThreads(maxThreads); - setMaxIdleTimeMs(maxIdleTimeMs); + setIdleTimeout(idleTimeout); setStopTimeout(5000); } @@ -95,12 +92,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo super.doStart(); _threadsStarted.set(0); - if (_jobs==null) - { - int maxQueued = getMaxQueued(); - _jobs=maxQueued>0 ?new ArrayBlockingQueue(maxQueued) - :new BlockingArrayQueue(_minThreads,_minThreads); - } + if (_jobs == null) + setQueue(new BlockingArrayQueue(_minThreads, _minThreads)); startThreads(_minThreads); } @@ -110,50 +103,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { super.doStop(); - long timeout=getStopTimeout(); + long timeout = getStopTimeout(); BlockingQueue jobs = getQueue(); - + // If no stop timeout, clear job queue - if (timeout<=0) + if (timeout <= 0) jobs.clear(); - - // Fill job Q with noop jobs to wakeup idle - Runnable noop = new Runnable(){@Override public void run(){}}; - for (int i=_threadsStarted.get();i-->0;) + + // Fill job Q with noop jobs to wakeup idle + Runnable noop = new Runnable() + { + @Override + public void run() + { + } + }; + for (int i = _threadsStarted.get(); i-- > 0; ) jobs.offer(noop); - + // try to jobs complete naturally for half our stop time - long stopby=System.currentTimeMillis()+timeout/2; + long stopby = System.currentTimeMillis() + timeout / 2; for (Thread thread : _threads) { - long canwait =stopby-System.currentTimeMillis(); - if (canwait>0) + long canwait = stopby - System.currentTimeMillis(); + if (canwait > 0) thread.join(canwait); } - + // If we still have threads running, get a bit more aggressive // interrupt remaining threads - if (_threadsStarted.get()>0) + if (_threadsStarted.get() > 0) for (Thread thread : _threads) thread.interrupt(); - + // wait again for the other half of our stop time - stopby=System.currentTimeMillis()+timeout/2; + stopby = System.currentTimeMillis() + timeout / 2; for (Thread thread : _threads) { - long canwait =stopby-System.currentTimeMillis(); - if (canwait>0) + long canwait = stopby - System.currentTimeMillis(); + if (canwait > 0) thread.join(canwait); } - + Thread.yield(); - int size=_threads.size(); - if (size>0) + int size = _threads.size(); + if (size > 0) { LOG.warn("{} threads could not be stopped", size); - if ((size<=Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled()) + if ((size <= Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled()) { for (Thread unstopped : _threads) { @@ -178,50 +177,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo */ public void setDaemon(boolean daemon) { - _daemon=daemon; + _daemon = daemon; } - /** Set the maximum thread idle time. + /** + * Set the maximum thread idle time. * Threads that are idle for longer than this period may be * stopped. * Delegated to the named or anonymous Pool. - * @see #getMaxIdleTimeMs - * @param maxIdleTimeMs Max idle time in ms. + * + * @param idleTimeout Max idle time in ms. + * @see #getIdleTimeout */ - public void setMaxIdleTimeMs(int maxIdleTimeMs) + public void setIdleTimeout(int idleTimeout) { - _maxIdleTimeMs=maxIdleTimeMs; + _idleTimeout = idleTimeout; } - /** Set the maximum number of threads. + /** + * Set the maximum number of threads. * Delegated to the named or anonymous Pool. - * @see #getMaxThreads + * * @param maxThreads maximum number of threads. + * @see #getMaxThreads */ @Override public void setMaxThreads(int maxThreads) { - _maxThreads=maxThreads; - if (_minThreads>_maxThreads) - _minThreads=_maxThreads; + _maxThreads = maxThreads; + if (_minThreads > _maxThreads) + _minThreads = _maxThreads; } - /** Set the minimum number of threads. + /** + * Set the minimum number of threads. * Delegated to the named or anonymous Pool. - * @see #getMinThreads + * * @param minThreads minimum number of threads + * @see #getMinThreads */ @Override public void setMinThreads(int minThreads) { - _minThreads=minThreads; + _minThreads = minThreads; - if (_minThreads>_maxThreads) - _maxThreads=_minThreads; + if (_minThreads > _maxThreads) + _maxThreads = _minThreads; - int threads=_threadsStarted.get(); - if (isStarted() && threads<_minThreads) - startThreads(_minThreads-threads); + int threads = _threadsStarted.get(); + if (isStarted() && threads < _minThreads) + startThreads(_minThreads - threads); } /** @@ -231,51 +236,38 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { if (isRunning()) throw new IllegalStateException("started"); - _name= name; + _name = name; } - /** Set the priority of the pool threads. - * @param priority the new thread priority. + /** + * Set the priority of the pool threads. + * + * @param priority the new thread priority. */ public void setThreadsPriority(int priority) { - _priority=priority; + _priority = priority; } /** - * @return maximum queue size - */ - @ManagedAttribute("maximum queue size") - public int getMaxQueued() - { - return _maxQueued; - } - - /** - * @param max job queue size - */ - public void setMaxQueued(int max) - { - if (isRunning()) - throw new IllegalStateException("started"); - _maxQueued=max; - } - - /** Get the maximum thread idle time. + * Get the maximum thread idle time. * Delegated to the named or anonymous Pool. - * @see #setMaxIdleTimeMs + * * @return Max idle time in ms. + * @see #setIdleTimeout */ @ManagedAttribute("maximum time a thread may be idle in ms") - public int getMaxIdleTimeMs() + public int getIdleTimeout() { - return _maxIdleTimeMs; + return _idleTimeout; } - /** Set the maximum number of threads. + /** + * Set the maximum number of threads. * Delegated to the named or anonymous Pool. - * @see #setMaxThreads + * * @return maximum number of threads. + * @see #setMaxThreads */ @Override @ManagedAttribute("maximum number of threads in the pool") @@ -284,10 +276,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _maxThreads; } - /** Get the minimum number of threads. + /** + * Get the minimum number of threads. * Delegated to the named or anonymous Pool. - * @see #setMinThreads + * * @return minimum number of threads. + * @see #setMinThreads */ @Override @ManagedAttribute("minimum number of threads in the pool") @@ -305,8 +299,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _name; } - /** Get the priority of the pool threads. - * @return the priority of the pool threads. + /** + * Get the priority of the pool threads. + * + * @return the priority of the pool threads. */ @ManagedAttribute("priority of threads in the pool") public int getThreadsPriority() @@ -336,12 +332,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public boolean dispatch(Runnable job) { - LOG.debug("{} dispatched {}",this,job); - if (isRunning()) - { - return _jobs.offer(job); - } - return false; + LOG.debug("{} dispatched {}", this, job); + return isRunning() && _jobs.offer(job); } @Override @@ -349,7 +341,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { if (!dispatch(job)) { - LOG.warn("{} rejected {}",this,job); + LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } } @@ -397,31 +389,31 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs") public boolean isLowOnThreads() { - return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); + return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get(); } private boolean startThreads(int threadsToStart) { - while(threadsToStart>0) + while (threadsToStart > 0) { - int threads=_threadsStarted.get(); - if (threads>=_maxThreads) + int threads = _threadsStarted.get(); + if (threads >= _maxThreads) return false; - - if (!_threadsStarted.compareAndSet(threads,threads+1)) + + if (!_threadsStarted.compareAndSet(threads, threads + 1)) continue; - boolean started=false; + boolean started = false; try { - Thread thread=newThread(_runnable); + Thread thread = newThread(_runnable); thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); - thread.setName(_name+"-"+thread.getId()); + thread.setName(_name + "-" + thread.getId()); _threads.add(thread); thread.start(); - started=true; + started = true; } finally { @@ -451,10 +443,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo public void dump(Appendable out, String indent) throws IOException { List dump = new ArrayList<>(getMaxThreads()); - for (final Thread thread: _threads) + for (final Thread thread : _threads) { - final StackTraceElement[] trace=thread.getStackTrace(); - boolean inIdleJobPoll=false; + final StackTraceElement[] trace = thread.getStackTrace(); + boolean inIdleJobPoll = false; for (StackTraceElement t : trace) { if ("idleJobPoll".equals(t.getMethodName())) @@ -463,7 +455,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo break; } } - final boolean idle=inIdleJobPoll; + final boolean idle = inIdleJobPoll; if (isDetailedDump()) { @@ -472,9 +464,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); + out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n'); if (!idle) - ContainerLifeCycle.dump(out,indent,Arrays.asList(trace)); + ContainerLifeCycle.dump(out, indent, Arrays.asList(trace)); } @Override @@ -486,23 +478,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } else { - dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); + dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")); } } - ContainerLifeCycle.dumpObject(out,this); - ContainerLifeCycle.dump(out,indent,dump); + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, dump); } @Override public String toString() { - return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}",_name,getState(),getMinThreads(),getThreads(),getMaxThreads(),getIdleThreads(),(_jobs==null?-1:_jobs.size())); + return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); } private Runnable idleJobPoll() throws InterruptedException { - return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); + return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS); } private Runnable _runnable = new Runnable() @@ -510,23 +502,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void run() { - boolean shrink=false; + boolean shrink = false; try { - Runnable job=_jobs.poll(); + Runnable job = _jobs.poll(); - if(job!=null && _threadsIdle.get()==0) + if (job != null && _threadsIdle.get() == 0) { startThreads(1); } - + while (isRunning()) { // Job loop - while (job!=null && isRunning()) + while (job != null && isRunning()) { runJob(job); - job=_jobs.poll(); + job = _jobs.poll(); } // Idle loop @@ -534,46 +526,46 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { _threadsIdle.incrementAndGet(); - while (isRunning() && job==null) + while (isRunning() && job == null) { - if (_maxIdleTimeMs<=0) - job=_jobs.take(); + if (_idleTimeout <= 0) + job = _jobs.take(); else { // maybe we should shrink? - final int size=_threadsStarted.get(); - if (size>_minThreads) + final int size = _threadsStarted.get(); + if (size > _minThreads) { - long last=_lastShrink.get(); - long now=System.currentTimeMillis(); - if (last==0 || (now-last)>_maxIdleTimeMs) + long last = _lastShrink.get(); + long now = System.currentTimeMillis(); + if (last == 0 || (now - last) > _idleTimeout) { - shrink=_lastShrink.compareAndSet(last,now) && - _threadsStarted.compareAndSet(size,size-1); + shrink = _lastShrink.compareAndSet(last, now) && + _threadsStarted.compareAndSet(size, size - 1); if (shrink) { return; } } } - job=idleJobPoll(); + job = idleJobPoll(); } } } finally { - if(_threadsIdle.decrementAndGet()==0) + if (_threadsIdle.decrementAndGet() == 0) { startThreads(1); } } } } - catch(InterruptedException e) + catch (InterruptedException e) { LOG.ignore(e); } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } @@ -605,6 +597,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _jobs; } + /** + * @param queue the job queue + */ + public void setQueue(BlockingQueue queue) + { + _jobs = queue; + } + /** * @param id The thread ID to interrupt. * @return true if the thread was found and interrupted. @@ -612,9 +612,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedOperation("interrupt a pool thread") public boolean interruptThread(@Name("id") long id) { - for (Thread thread: _threads) + for (Thread thread : _threads) { - if (thread.getId()==id) + if (thread.getId() == id) { thread.interrupt(); return true; @@ -630,9 +630,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedOperation("dump a pool thread stack") public String dumpThread(@Name("id") long id) { - for (Thread thread: _threads) + for (Thread thread : _threads) { - if (thread.getId()==id) + if (thread.getId() == id) { StringBuilder buf = new StringBuilder(); buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index bdc914cc389..429a34e3036 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.util.thread; -import static org.junit.Assert.assertTrue; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +28,8 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; +import static org.junit.Assert.assertTrue; + @RunWith(AdvancedRunner.class) public class QueuedThreadPoolTest { @@ -75,7 +75,7 @@ public class QueuedThreadPoolTest QueuedThreadPool tp= new QueuedThreadPool(); tp.setMinThreads(5); tp.setMaxThreads(10); - tp.setMaxIdleTimeMs(1000); + tp.setIdleTimeout(1000); tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.start(); @@ -174,7 +174,7 @@ public class QueuedThreadPoolTest QueuedThreadPool tp= new QueuedThreadPool(); tp.setMinThreads(2); tp.setMaxThreads(10); - tp.setMaxIdleTimeMs(400); + tp.setIdleTimeout(400); tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.start();