From 8e98148350f215e2ec58d0b936ab08ce22229d6a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 11 Dec 2012 14:45:14 +0100 Subject: [PATCH] Introduced setQueue(), removed maxQueued() and renamed maxIdleTimeMs() to idleTimeout(). The reason for this change is that CometD needs a way to explicitly set the queue implementation in load testing to measure latencies. Property maxQueued was used to decide, at start time, what queue implementation to use, but now that can be set explicitly with setQueue(). The rename was done because in all other places the idle timeout has been renamed so. --- .../jetty/util/thread/QueuedThreadPool.java | 288 +++++++++--------- .../util/thread/QueuedThreadPoolTest.java | 8 +- 2 files changed, 148 insertions(+), 148 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 0cab552d58d..9ae8aca8afa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; @@ -53,39 +52,37 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo private final AtomicInteger _threadsStarted = new AtomicInteger(); private final AtomicInteger _threadsIdle = new AtomicInteger(); private final AtomicLong _lastShrink = new AtomicLong(); - private final ConcurrentLinkedQueue _threads=new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue _threads = new ConcurrentLinkedQueue<>(); private final Object _joinLock = new Object(); private BlockingQueue _jobs; - private String _name; - private int _maxIdleTimeMs=60000; + private String _name = "qtp" + hashCode(); + private int _idleTimeout; private int _maxThreads; private int _minThreads; - private int _maxQueued=-1; - private int _priority=Thread.NORM_PRIORITY; - private boolean _daemon=false; - private boolean _detailedDump=false; + private int _priority = Thread.NORM_PRIORITY; + private boolean _daemon = false; + private boolean _detailedDump = false; public QueuedThreadPool() { - this(200,8,60000); + this(200); } public QueuedThreadPool(int maxThreads) { - this(maxThreads,8,60000); + this(maxThreads, 8); } public QueuedThreadPool(int maxThreads, int minThreads) { - this(maxThreads,minThreads,60000); + this(maxThreads, minThreads, 60000); } - public QueuedThreadPool(int maxThreads, int minThreads, int maxIdleTimeMs) + public QueuedThreadPool(int maxThreads, int minThreads, int idleTimeout) { - _name="qtp"+super.hashCode(); setMinThreads(minThreads); setMaxThreads(maxThreads); - setMaxIdleTimeMs(maxIdleTimeMs); + setIdleTimeout(idleTimeout); setStopTimeout(5000); } @@ -95,12 +92,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo super.doStart(); _threadsStarted.set(0); - if (_jobs==null) - { - int maxQueued = getMaxQueued(); - _jobs=maxQueued>0 ?new ArrayBlockingQueue(maxQueued) - :new BlockingArrayQueue(_minThreads,_minThreads); - } + if (_jobs == null) + setQueue(new BlockingArrayQueue(_minThreads, _minThreads)); startThreads(_minThreads); } @@ -110,50 +103,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { super.doStop(); - long timeout=getStopTimeout(); + long timeout = getStopTimeout(); BlockingQueue jobs = getQueue(); - + // If no stop timeout, clear job queue - if (timeout<=0) + if (timeout <= 0) jobs.clear(); - - // Fill job Q with noop jobs to wakeup idle - Runnable noop = new Runnable(){@Override public void run(){}}; - for (int i=_threadsStarted.get();i-->0;) + + // Fill job Q with noop jobs to wakeup idle + Runnable noop = new Runnable() + { + @Override + public void run() + { + } + }; + for (int i = _threadsStarted.get(); i-- > 0; ) jobs.offer(noop); - + // try to jobs complete naturally for half our stop time - long stopby=System.currentTimeMillis()+timeout/2; + long stopby = System.currentTimeMillis() + timeout / 2; for (Thread thread : _threads) { - long canwait =stopby-System.currentTimeMillis(); - if (canwait>0) + long canwait = stopby - System.currentTimeMillis(); + if (canwait > 0) thread.join(canwait); } - + // If we still have threads running, get a bit more aggressive // interrupt remaining threads - if (_threadsStarted.get()>0) + if (_threadsStarted.get() > 0) for (Thread thread : _threads) thread.interrupt(); - + // wait again for the other half of our stop time - stopby=System.currentTimeMillis()+timeout/2; + stopby = System.currentTimeMillis() + timeout / 2; for (Thread thread : _threads) { - long canwait =stopby-System.currentTimeMillis(); - if (canwait>0) + long canwait = stopby - System.currentTimeMillis(); + if (canwait > 0) thread.join(canwait); } - + Thread.yield(); - int size=_threads.size(); - if (size>0) + int size = _threads.size(); + if (size > 0) { LOG.warn("{} threads could not be stopped", size); - if ((size<=Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled()) + if ((size <= Runtime.getRuntime().availableProcessors()) || LOG.isDebugEnabled()) { for (Thread unstopped : _threads) { @@ -178,50 +177,56 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo */ public void setDaemon(boolean daemon) { - _daemon=daemon; + _daemon = daemon; } - /** Set the maximum thread idle time. + /** + * Set the maximum thread idle time. * Threads that are idle for longer than this period may be * stopped. * Delegated to the named or anonymous Pool. - * @see #getMaxIdleTimeMs - * @param maxIdleTimeMs Max idle time in ms. + * + * @param idleTimeout Max idle time in ms. + * @see #getIdleTimeout */ - public void setMaxIdleTimeMs(int maxIdleTimeMs) + public void setIdleTimeout(int idleTimeout) { - _maxIdleTimeMs=maxIdleTimeMs; + _idleTimeout = idleTimeout; } - /** Set the maximum number of threads. + /** + * Set the maximum number of threads. * Delegated to the named or anonymous Pool. - * @see #getMaxThreads + * * @param maxThreads maximum number of threads. + * @see #getMaxThreads */ @Override public void setMaxThreads(int maxThreads) { - _maxThreads=maxThreads; - if (_minThreads>_maxThreads) - _minThreads=_maxThreads; + _maxThreads = maxThreads; + if (_minThreads > _maxThreads) + _minThreads = _maxThreads; } - /** Set the minimum number of threads. + /** + * Set the minimum number of threads. * Delegated to the named or anonymous Pool. - * @see #getMinThreads + * * @param minThreads minimum number of threads + * @see #getMinThreads */ @Override public void setMinThreads(int minThreads) { - _minThreads=minThreads; + _minThreads = minThreads; - if (_minThreads>_maxThreads) - _maxThreads=_minThreads; + if (_minThreads > _maxThreads) + _maxThreads = _minThreads; - int threads=_threadsStarted.get(); - if (isStarted() && threads<_minThreads) - startThreads(_minThreads-threads); + int threads = _threadsStarted.get(); + if (isStarted() && threads < _minThreads) + startThreads(_minThreads - threads); } /** @@ -231,51 +236,38 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { if (isRunning()) throw new IllegalStateException("started"); - _name= name; + _name = name; } - /** Set the priority of the pool threads. - * @param priority the new thread priority. + /** + * Set the priority of the pool threads. + * + * @param priority the new thread priority. */ public void setThreadsPriority(int priority) { - _priority=priority; + _priority = priority; } /** - * @return maximum queue size - */ - @ManagedAttribute("maximum queue size") - public int getMaxQueued() - { - return _maxQueued; - } - - /** - * @param max job queue size - */ - public void setMaxQueued(int max) - { - if (isRunning()) - throw new IllegalStateException("started"); - _maxQueued=max; - } - - /** Get the maximum thread idle time. + * Get the maximum thread idle time. * Delegated to the named or anonymous Pool. - * @see #setMaxIdleTimeMs + * * @return Max idle time in ms. + * @see #setIdleTimeout */ @ManagedAttribute("maximum time a thread may be idle in ms") - public int getMaxIdleTimeMs() + public int getIdleTimeout() { - return _maxIdleTimeMs; + return _idleTimeout; } - /** Set the maximum number of threads. + /** + * Set the maximum number of threads. * Delegated to the named or anonymous Pool. - * @see #setMaxThreads + * * @return maximum number of threads. + * @see #setMaxThreads */ @Override @ManagedAttribute("maximum number of threads in the pool") @@ -284,10 +276,12 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _maxThreads; } - /** Get the minimum number of threads. + /** + * Get the minimum number of threads. * Delegated to the named or anonymous Pool. - * @see #setMinThreads + * * @return minimum number of threads. + * @see #setMinThreads */ @Override @ManagedAttribute("minimum number of threads in the pool") @@ -305,8 +299,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _name; } - /** Get the priority of the pool threads. - * @return the priority of the pool threads. + /** + * Get the priority of the pool threads. + * + * @return the priority of the pool threads. */ @ManagedAttribute("priority of threads in the pool") public int getThreadsPriority() @@ -336,12 +332,8 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public boolean dispatch(Runnable job) { - LOG.debug("{} dispatched {}",this,job); - if (isRunning()) - { - return _jobs.offer(job); - } - return false; + LOG.debug("{} dispatched {}", this, job); + return isRunning() && _jobs.offer(job); } @Override @@ -349,7 +341,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { if (!dispatch(job)) { - LOG.warn("{} rejected {}",this,job); + LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } } @@ -397,31 +389,31 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedAttribute("True if the pools is at maxThreads and there are not idle threads than queued jobs") public boolean isLowOnThreads() { - return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get(); + return _threadsStarted.get() == _maxThreads && _jobs.size() >= _threadsIdle.get(); } private boolean startThreads(int threadsToStart) { - while(threadsToStart>0) + while (threadsToStart > 0) { - int threads=_threadsStarted.get(); - if (threads>=_maxThreads) + int threads = _threadsStarted.get(); + if (threads >= _maxThreads) return false; - - if (!_threadsStarted.compareAndSet(threads,threads+1)) + + if (!_threadsStarted.compareAndSet(threads, threads + 1)) continue; - boolean started=false; + boolean started = false; try { - Thread thread=newThread(_runnable); + Thread thread = newThread(_runnable); thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); - thread.setName(_name+"-"+thread.getId()); + thread.setName(_name + "-" + thread.getId()); _threads.add(thread); thread.start(); - started=true; + started = true; } finally { @@ -451,10 +443,10 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo public void dump(Appendable out, String indent) throws IOException { List dump = new ArrayList<>(getMaxThreads()); - for (final Thread thread: _threads) + for (final Thread thread : _threads) { - final StackTraceElement[] trace=thread.getStackTrace(); - boolean inIdleJobPoll=false; + final StackTraceElement[] trace = thread.getStackTrace(); + boolean inIdleJobPoll = false; for (StackTraceElement t : trace) { if ("idleJobPoll".equals(t.getMethodName())) @@ -463,7 +455,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo break; } } - final boolean idle=inIdleJobPoll; + final boolean idle = inIdleJobPoll; if (isDetailedDump()) { @@ -472,9 +464,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void dump(Appendable out, String indent) throws IOException { - out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n'); + out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle ? " IDLE" : "").append('\n'); if (!idle) - ContainerLifeCycle.dump(out,indent,Arrays.asList(trace)); + ContainerLifeCycle.dump(out, indent, Arrays.asList(trace)); } @Override @@ -486,23 +478,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo } else { - dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":"")); + dump.add(thread.getId() + " " + thread.getName() + " " + thread.getState() + " @ " + (trace.length > 0 ? trace[0] : "???") + (idle ? " IDLE" : "")); } } - ContainerLifeCycle.dumpObject(out,this); - ContainerLifeCycle.dump(out,indent,dump); + ContainerLifeCycle.dumpObject(out, this); + ContainerLifeCycle.dump(out, indent, dump); } @Override public String toString() { - return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}",_name,getState(),getMinThreads(),getThreads(),getMaxThreads(),getIdleThreads(),(_jobs==null?-1:_jobs.size())); + return String.format("%s{%s,%d<=%d<=%d,i=%d,q=%d}", _name, getState(), getMinThreads(), getThreads(), getMaxThreads(), getIdleThreads(), (_jobs == null ? -1 : _jobs.size())); } private Runnable idleJobPoll() throws InterruptedException { - return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS); + return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS); } private Runnable _runnable = new Runnable() @@ -510,23 +502,23 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @Override public void run() { - boolean shrink=false; + boolean shrink = false; try { - Runnable job=_jobs.poll(); + Runnable job = _jobs.poll(); - if(job!=null && _threadsIdle.get()==0) + if (job != null && _threadsIdle.get() == 0) { startThreads(1); } - + while (isRunning()) { // Job loop - while (job!=null && isRunning()) + while (job != null && isRunning()) { runJob(job); - job=_jobs.poll(); + job = _jobs.poll(); } // Idle loop @@ -534,46 +526,46 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo { _threadsIdle.incrementAndGet(); - while (isRunning() && job==null) + while (isRunning() && job == null) { - if (_maxIdleTimeMs<=0) - job=_jobs.take(); + if (_idleTimeout <= 0) + job = _jobs.take(); else { // maybe we should shrink? - final int size=_threadsStarted.get(); - if (size>_minThreads) + final int size = _threadsStarted.get(); + if (size > _minThreads) { - long last=_lastShrink.get(); - long now=System.currentTimeMillis(); - if (last==0 || (now-last)>_maxIdleTimeMs) + long last = _lastShrink.get(); + long now = System.currentTimeMillis(); + if (last == 0 || (now - last) > _idleTimeout) { - shrink=_lastShrink.compareAndSet(last,now) && - _threadsStarted.compareAndSet(size,size-1); + shrink = _lastShrink.compareAndSet(last, now) && + _threadsStarted.compareAndSet(size, size - 1); if (shrink) { return; } } } - job=idleJobPoll(); + job = idleJobPoll(); } } } finally { - if(_threadsIdle.decrementAndGet()==0) + if (_threadsIdle.decrementAndGet() == 0) { startThreads(1); } } } } - catch(InterruptedException e) + catch (InterruptedException e) { LOG.ignore(e); } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } @@ -605,6 +597,14 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo return _jobs; } + /** + * @param queue the job queue + */ + public void setQueue(BlockingQueue queue) + { + _jobs = queue; + } + /** * @param id The thread ID to interrupt. * @return true if the thread was found and interrupted. @@ -612,9 +612,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedOperation("interrupt a pool thread") public boolean interruptThread(@Name("id") long id) { - for (Thread thread: _threads) + for (Thread thread : _threads) { - if (thread.getId()==id) + if (thread.getId() == id) { thread.interrupt(); return true; @@ -630,9 +630,9 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo @ManagedOperation("dump a pool thread stack") public String dumpThread(@Name("id") long id) { - for (Thread thread: _threads) + for (Thread thread : _threads) { - if (thread.getId()==id) + if (thread.getId() == id) { StringBuilder buf = new StringBuilder(); buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n"); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index bdc914cc389..429a34e3036 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.util.thread; -import static org.junit.Assert.assertTrue; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +28,8 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; +import static org.junit.Assert.assertTrue; + @RunWith(AdvancedRunner.class) public class QueuedThreadPoolTest { @@ -75,7 +75,7 @@ public class QueuedThreadPoolTest QueuedThreadPool tp= new QueuedThreadPool(); tp.setMinThreads(5); tp.setMaxThreads(10); - tp.setMaxIdleTimeMs(1000); + tp.setIdleTimeout(1000); tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.start(); @@ -174,7 +174,7 @@ public class QueuedThreadPoolTest QueuedThreadPool tp= new QueuedThreadPool(); tp.setMinThreads(2); tp.setMaxThreads(10); - tp.setMaxIdleTimeMs(400); + tp.setIdleTimeout(400); tp.setThreadsPriority(Thread.NORM_PRIORITY-1); tp.start();