Fixes #1805 - ReservedThreadExecutor should start ReservedThreads lazily.

Removed doStart() that was eagerly starting ReservedThreads.
Other small code cleanups.
This commit is contained in:
Simone Bordet 2017-09-08 10:27:36 +02:00
parent 5d8c605d96
commit c1a4153861
1 changed files with 33 additions and 46 deletions

View File

@ -38,14 +38,14 @@ import org.eclipse.jetty.util.log.Logger;
public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor public class ReservedThreadExecutor extends AbstractLifeCycle implements Executor
{ {
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private final Executor _executor; private final Executor _executor;
private final Locker _locker = new Locker(); private final Locker _locker = new Locker();
private final ReservedThread[] _queue; private final ReservedThread[] _queue;
private int _head; private int _head;
private int _size; private int _size;
private int _pending; private int _pending;
public ReservedThreadExecutor(Executor executor) public ReservedThreadExecutor(Executor executor)
{ {
this(executor,1); this(executor,1);
@ -54,7 +54,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
/** /**
* @param executor The executor to use to obtain threads * @param executor The executor to use to obtain threads
* @param capacity The number of threads to preallocate. If less than 0 then capacity * @param capacity The number of threads to preallocate. If less than 0 then capacity
* is calculated based on a heuristic from the number of available processors and * is calculated based on a heuristic from the number of available processors and
* thread pool size. * thread pool size.
*/ */
public ReservedThreadExecutor(Executor executor,int capacity) public ReservedThreadExecutor(Executor executor,int capacity)
@ -74,7 +74,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
capacity = cpus; capacity = cpus;
} }
} }
_queue = new ReservedThread[capacity]; _queue = new ReservedThread[capacity];
} }
@ -106,21 +106,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
return _pending; return _pending;
} }
} }
@Override
public void doStart() throws Exception
{
try (Locker.Lock lock = _locker.lock())
{
_head = _size = _pending = 0;
while (_pending<_queue.length)
{
_executor.execute(new ReservedThread());
_pending++;
}
}
}
@Override @Override
public void doStop() throws Exception public void doStop() throws Exception
{ {
@ -135,15 +121,15 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
thread._wakeup.signal(); thread._wakeup.signal();
} }
} }
} }
@Override @Override
public void execute(Runnable task) throws RejectedExecutionException public void execute(Runnable task) throws RejectedExecutionException
{ {
if (!tryExecute(task)) if (!tryExecute(task))
throw new RejectedExecutionException(); throw new RejectedExecutionException();
} }
/** /**
* @param task The task to run * @param task The task to run
* @return True iff a reserved thread was available and has been assigned the task to run. * @return True iff a reserved thread was available and has been assigned the task to run.
@ -152,7 +138,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{ {
if (task==null) if (task==null)
return false; return false;
try (Locker.Lock lock = _locker.lock()) try (Locker.Lock lock = _locker.lock())
{ {
if (_size==0) if (_size==0)
@ -164,21 +150,21 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
} }
return false; return false;
} }
ReservedThread thread = _queue[_head]; ReservedThread thread = _queue[_head];
_queue[_head] = null; _queue[_head] = null;
_head = (_head+1)%_queue.length; _head = (_head+1)%_queue.length;
_size--; _size--;
if (_size==0 && _pending<_queue.length) if (_size==0 && _pending<_queue.length)
{ {
_executor.execute(new ReservedThread()); _executor.execute(new ReservedThread());
_pending++; _pending++;
} }
thread._task = task; thread._task = task;
thread._wakeup.signal(); thread._wakeup.signal();
return true; return true;
} }
catch(RejectedExecutionException e) catch(RejectedExecutionException e)
@ -188,23 +174,31 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
} }
} }
@Override
public String toString()
{
try (Locker.Lock lock = _locker.lock())
{
return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending);
}
}
private class ReservedThread implements Runnable private class ReservedThread implements Runnable
{ {
private Condition _wakeup = null; private Condition _wakeup = null;
private Runnable _task = null; private Runnable _task = null;
private void reservedWait() throws InterruptedException private void reservedWait() throws InterruptedException
{ {
_wakeup.await(); _wakeup.await();
} }
@Override @Override
public void run() public void run()
{ {
while (true) while (true)
{ {
Runnable task = null; Runnable task = null;
try (Locker.Lock lock = _locker.lock()) try (Locker.Lock lock = _locker.lock())
{ {
// if this is our first loop, decrement pending count // if this is our first loop, decrement pending count
@ -213,20 +207,24 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
_pending--; _pending--;
_wakeup = _locker.newCondition(); _wakeup = _locker.newCondition();
} }
// Exit if no longer running or there now too many preallocated threads // Exit if no longer running or there now too many preallocated threads
if (!isRunning() || _size>=_queue.length) if (!isRunning() || _size>=_queue.length)
break; break;
// Insert ourselves in the queue // Insert ourselves in the queue
_queue[(_head+_size++)%_queue.length] = this; _queue[(_head+_size++)%_queue.length] = this;
// Wait for a task, ignoring spurious interrupts // Wait for a task, ignoring spurious wakeups
do while (isRunning() && task==null)
{ {
try try
{ {
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
reservedWait(); reservedWait();
if (LOG.isDebugEnabled())
LOG.debug("{} woken up", this);
task = _task; task = _task;
_task = null; _task = null;
} }
@ -235,7 +233,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
LOG.ignore(e); LOG.ignore(e);
} }
} }
while (isRunning() && task==null);
} }
// Run any task // Run any task
@ -245,22 +242,12 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{ {
task.run(); task.run();
} }
catch (Exception e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
break;
} }
} }
} }
} }
} }
@Override
public String toString()
{
try (Locker.Lock lock = _locker.lock())
{
return String.format("%s{s=%d,p=%d}",super.toString(),_size,_pending);
}
}
} }