diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index ace1729763e..6dd85adcd25 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -38,14 +38,14 @@ import org.eclipse.jetty.util.log.Logger; public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor { private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); - + private final Executor _executor; private final Locker _locker = new Locker(); private final ReservedThread[] _queue; private int _head; private int _size; private int _pending; - + public ReservedThreadExecutor(Executor executor) { this(executor,1); @@ -54,7 +54,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo /** * @param executor The executor to use to obtain threads * @param capacity The number of threads to preallocate. If less than 0 then capacity - * is calculated based on a heuristic from the number of available processors and + * is calculated based on a heuristic from the number of available processors and * thread pool size. */ public ReservedThreadExecutor(Executor executor,int capacity) @@ -74,7 +74,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo capacity = cpus; } } - + _queue = new ReservedThread[capacity]; } @@ -106,21 +106,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo return _pending; } } - - @Override - public void doStart() throws Exception - { - try (Locker.Lock lock = _locker.lock()) - { - _head = _size = _pending = 0; - while (_pending<_queue.length) - { - _executor.execute(new ReservedThread()); - _pending++; - } - } - } - + @Override public void doStop() throws Exception { @@ -135,15 +121,15 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo thread._wakeup.signal(); } } - } - + } + @Override public void execute(Runnable task) throws RejectedExecutionException { if (!tryExecute(task)) throw new RejectedExecutionException(); } - + /** * @param task The task to run * @return True iff a reserved thread was available and has been assigned the task to run. @@ -152,7 +138,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { if (task==null) return false; - + try (Locker.Lock lock = _locker.lock()) { if (_size==0) @@ -164,21 +150,21 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo } return false; } - + ReservedThread thread = _queue[_head]; _queue[_head] = null; _head = (_head+1)%_queue.length; _size--; - + if (_size==0 && _pending<_queue.length) { _executor.execute(new ReservedThread()); _pending++; } - + thread._task = task; thread._wakeup.signal(); - + return true; } catch(RejectedExecutionException e) @@ -188,23 +174,31 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo } } + @Override + public String toString() + { + try (Locker.Lock lock = _locker.lock()) + { + return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending); + } + } + private class ReservedThread implements Runnable { private Condition _wakeup = null; private Runnable _task = null; - + private void reservedWait() throws InterruptedException { _wakeup.await(); } - + @Override public void run() { while (true) { Runnable task = null; - try (Locker.Lock lock = _locker.lock()) { // if this is our first loop, decrement pending count @@ -213,20 +207,24 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo _pending--; _wakeup = _locker.newCondition(); } - + // Exit if no longer running or there now too many preallocated threads if (!isRunning() || _size>=_queue.length) break; - + // Insert ourselves in the queue _queue[(_head+_size++)%_queue.length] = this; - // Wait for a task, ignoring spurious interrupts - do + // Wait for a task, ignoring spurious wakeups + while (isRunning() && task==null) { try { + if (LOG.isDebugEnabled()) + LOG.debug("{} waiting", this); reservedWait(); + if (LOG.isDebugEnabled()) + LOG.debug("{} woken up", this); task = _task; _task = null; } @@ -235,7 +233,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo LOG.ignore(e); } } - while (isRunning() && task==null); } // Run any task @@ -245,22 +242,12 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo { task.run(); } - catch (Exception e) + catch (Throwable e) { LOG.warn(e); - break; } } } } } - - @Override - public String toString() - { - try (Locker.Lock lock = _locker.lock()) - { - return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending); - } - } }