Issue #4096 resolve race between doStop and adding new threads to stack
Now using the _size AtomicInteger to resolve the race between stopping and adding to the stack. A _size of -1 now means no more threads can be added to the stack, and the loop in doStop() will now only exit if it can set the size to -1. Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
168a95d334
commit
ac93ff6272
|
@ -155,6 +155,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
public void doStart() throws Exception
|
||||
{
|
||||
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity);
|
||||
_size.set(0);
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
|
@ -168,9 +169,21 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
|
||||
while (true)
|
||||
{
|
||||
int size = _size.get();
|
||||
// If no reserved threads left try setting size to -1 to
|
||||
// atomically prevent other threads adding themselves to stack.
|
||||
if (size == 0 && _size.compareAndSet(size, -1))
|
||||
break;
|
||||
|
||||
ReservedThread thread = _stack.pollFirst();
|
||||
if (thread == null)
|
||||
break;
|
||||
{
|
||||
// Reserved thread must have incremented size but not yet added itself to queue.
|
||||
// We will spin until it is added.
|
||||
Thread.yield(); // TODO: use Thread.onSpinWait() in jetty-10
|
||||
continue;
|
||||
}
|
||||
|
||||
_size.decrementAndGet();
|
||||
thread.stop();
|
||||
}
|
||||
|
@ -278,11 +291,12 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} waiting", this);
|
||||
|
||||
Runnable task = null;
|
||||
while (isRunning() && task == null)
|
||||
while (true)
|
||||
{
|
||||
boolean idle = false;
|
||||
if (!isRunning())
|
||||
return STOP;
|
||||
|
||||
boolean idle = false;
|
||||
try (Locker.Lock lock = _locker.lock())
|
||||
{
|
||||
if (_task == null)
|
||||
|
@ -299,8 +313,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
LOG.ignore(e);
|
||||
}
|
||||
}
|
||||
task = _task;
|
||||
_task = null;
|
||||
else
|
||||
{
|
||||
Runnable task = _task;
|
||||
_task = null;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} task={}", this, task);
|
||||
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
||||
if (idle)
|
||||
|
@ -315,11 +337,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
tryExecute(STOP);
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} task={}", this, task);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -332,6 +349,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
|||
while (true)
|
||||
{
|
||||
int size = _size.get();
|
||||
if (size < 0)
|
||||
return;
|
||||
if (size >= _capacity)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
Loading…
Reference in New Issue