Non blocking ReservedThreadExecutor (#6535)

A call to offer must never block, nor even yield, since to do so give an opportunity for the allocated CPU core to change, defeating the whole purpose of the class.
There is also some reasonable level of diagnostic warnings if a reserved thread misses too many offers consecutively, based on tracking the state of the reserved thread.

Remove the stack data structure entirely.  ReservedThreads all poll the same SynchronousQueue and tryExecute does a non blocking offer.

Added test for busy shrinking

Remember last time we hit zero reserved threads

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2021-07-29 09:46:48 +10:00 committed by GitHub
parent ea5c8ed994
commit 735e97d5c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 219 additions and 167 deletions

View File

@ -18,34 +18,44 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque; import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils; import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
import static org.eclipse.jetty.util.AtomicBiInteger.getLo;
/** /**
* An Executor using preallocated/reserved Threads from a wrapped Executor. * An Executor using pre-allocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed * <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is * with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available. * available.
* <p>Threads are reserved lazily, with a new reserved thread being allocated from a * <p>Threads are reserved lazily, with a new reserved threads being allocated from the
* wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)} * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread * idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
* whenever it has been idle for that period. * the executor.
*/ */
@ManagedObject("A pool for reserved threads") @ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
{ {
private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class);
private static final long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toNanos(1);
private static final Runnable STOP = new Runnable() private static final Runnable STOP = new Runnable()
{ {
@Override @Override
@ -62,13 +72,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private final Executor _executor; private final Executor _executor;
private final int _capacity; private final int _capacity;
private final ConcurrentLinkedDeque<ReservedThread> _stack; private final Set<ReservedThread> _threads = ConcurrentHashMap.newKeySet();
private final AtomicInteger _size = new AtomicInteger(); private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
private final AtomicInteger _pending = new AtomicInteger(); private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
private ThreadPoolBudget.Lease _lease; private ThreadPoolBudget.Lease _lease;
private long _idleTime = 1L; private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
/** /**
* @param executor The executor to use to obtain threads * @param executor The executor to use to obtain threads
@ -80,7 +90,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{ {
_executor = executor; _executor = executor;
_capacity = reservedThreads(executor, capacity); _capacity = reservedThreads(executor, capacity);
_stack = new ConcurrentLinkedDeque<>();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{}", this); LOG.debug("{}", this);
} }
@ -126,42 +135,39 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
@ManagedAttribute(value = "available reserved threads", readonly = true) @ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable() public int getAvailable()
{ {
return _stack.size(); return _count.getLo();
} }
@ManagedAttribute(value = "pending reserved threads", readonly = true) @ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending() public int getPending()
{ {
return _pending.get(); return _count.getHi();
} }
@ManagedAttribute(value = "idletimeout in MS", readonly = true) @ManagedAttribute(value = "idle timeout in ms", readonly = true)
public long getIdleTimeoutMs() public long getIdleTimeoutMs()
{ {
if (_idleTimeUnit == null) return NANOSECONDS.toMillis(_idleTimeNanos);
return 0;
return _idleTimeUnit.toMillis(_idleTime);
} }
/** /**
* Set the idle timeout for shrinking the reserved thread pool * Set the idle timeout for shrinking the reserved thread pool
* *
* @param idleTime Time to wait before shrinking, or 0 for no timeout. * @param idleTime Time to wait before shrinking, or 0 for default timeout.
* @param idleTimeUnit Time units for idle timeout * @param idleTimeUnit Time units for idle timeout
*/ */
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit) public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit)
{ {
if (isRunning()) if (isRunning())
throw new IllegalStateException(); throw new IllegalStateException();
_idleTime = idleTime; _idleTimeNanos = (idleTime <= 0 || idleTimeUnit == null) ? DEFAULT_IDLE_TIMEOUT : idleTimeUnit.toNanos(idleTime);
_idleTimeUnit = idleTimeUnit;
} }
@Override @Override
public void doStart() throws Exception public void doStart() throws Exception
{ {
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity); _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity);
_size.set(0); _count.set(0, 0);
super.doStart(); super.doStart();
} }
@ -173,26 +179,22 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
super.doStop(); super.doStop();
while (true) // Offer STOP task to all waiting reserved threads.
for (int i = _count.getAndSetLo(-1); i-- > 0;)
{ {
int size = _size.get(); // yield to wait for any reserved threads that have incremented the size but not yet polled
// If no reserved threads left try setting size to -1 to Thread.yield();
// atomically prevent other threads adding themselves to stack. _queue.offer(STOP);
if (size == 0 && _size.compareAndSet(size, -1))
break;
ReservedThread thread = _stack.pollFirst();
if (thread == null)
{
// Reserved thread must have incremented size but not yet added itself to queue.
// We will spin until it is added.
Thread.yield();
continue;
}
_size.decrementAndGet();
thread.stop();
} }
// Interrupt any reserved thread missed the offer so it doesn't wait too long.
for (ReservedThread reserved : _threads)
{
Thread thread = reserved._thread;
if (thread != null)
thread.interrupt();
}
_threads.clear();
_count.getAndSetHi(0);
} }
@Override @Override
@ -212,52 +214,60 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} tryExecute {}", this, task); LOG.debug("{} tryExecute {}", this, task);
if (task == null) if (task == null)
return false; return false;
ReservedThread thread = _stack.pollFirst(); // Offer will only succeed if there is a reserved thread waiting
if (thread == null) boolean offered = _queue.offer(task);
{
if (task != STOP)
startReservedThread();
return false;
}
int size = _size.decrementAndGet(); // If the offer succeeded we need to reduce the size, unless it is set to -1 in the meantime
if (!thread.offer(task)) int size = _count.getLo();
return false; while (offered && size > 0 && !_count.compareAndSetLo(size, --size))
size = _count.getLo();
// If size is 0 and we are not stopping, start a new reserved thread
if (size == 0 && task != STOP) if (size == 0 && task != STOP)
startReservedThread(); startReservedThread();
return true; return offered;
} }
private void startReservedThread() private void startReservedThread()
{ {
try while (true)
{ {
while (true) long count = _count.get();
int pending = getHi(count);
int size = getLo(count);
if (size < 0 || pending + size >= _capacity)
return;
if (size == 0)
_lastEmptyTime.set(System.nanoTime());
if (!_count.compareAndSet(count, pending + 1, size))
continue;
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);
try
{ {
// Not atomic, but there is a re-check in ReservedThread.run(). ReservedThread thread = new ReservedThread();
int pending = _pending.get(); _threads.add(thread);
int size = _size.get(); _executor.execute(thread);
if (pending + size >= _capacity)
return;
if (_pending.compareAndSet(pending, pending + 1))
{
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);
_executor.execute(new ReservedThread());
return;
}
} }
catch (Throwable e)
{
_count.add(-1, 0);
LOG.ignore(e);
}
return;
} }
catch (RejectedExecutionException e) }
{
LOG.ignore(e); @Override
} public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("reserved", _threads));
} }
@Override @Override
@ -266,139 +276,148 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
return String.format("%s@%x{s=%d/%d,p=%d}", return String.format("%s@%x{s=%d/%d,p=%d}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
_size.get(), _count.getLo(),
_capacity, _capacity,
_pending.get()); _count.getHi());
}
private enum State
{
PENDING,
RESERVED,
RUNNING,
IDLE,
STOPPED
} }
private class ReservedThread implements Runnable private class ReservedThread implements Runnable
{ {
private final SynchronousQueue<Runnable> _task = new SynchronousQueue<>(); // The state and thread are kept only for dumping
private boolean _starting = true; private volatile State _state = State.PENDING;
private volatile Thread _thread;
public boolean offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} offer {}", this, task);
try
{
_task.put(task);
return true;
}
catch (Throwable e)
{
LOG.ignore(e);
_size.getAndIncrement();
_stack.offerFirst(this);
return false;
}
}
public void stop()
{
offer(STOP);
}
private Runnable reservedWait() private Runnable reservedWait()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this); LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this);
while (true) // Keep waiting until stopped, tasked or idle
while (_count.getLo() >= 0)
{ {
try try
{ {
Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit); // Always poll at some period as safety to ensure we don't poll forever.
Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task); LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this);
if (task != null) if (task != null)
return task; return task;
if (_stack.remove(this)) // we have idled out
int size = _count.getLo();
// decrement size if we have not also been stopped.
while (size > 0)
{ {
if (LOG.isDebugEnabled()) if (_count.compareAndSetLo(size, --size))
LOG.debug("{} IDLE", this); break;
_size.decrementAndGet(); size = _count.getLo();
return STOP;
} }
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {
LOG.ignore(e); LOG.ignore(e);
// If the wait was interrupted, then STOP if we are not running
if (!isRunning())
return STOP;
} }
} }
_state = State.STOPPED;
return STOP;
} }
@Override @Override
public void run() public void run()
{ {
while (isRunning()) _thread = Thread.currentThread();
try
{ {
// test and increment size BEFORE decrementing pending, while (true)
// so that we don't have a race starting new pending.
int size = _size.get();
// Are we stopped?
if (size < 0)
return;
// Are we surplus to capacity?
if (size >= _capacity)
{ {
long count = _count.get();
// reduce pending if this thread was pending
int pending = getHi(count) - (_state == State.PENDING ? 1 : 0);
int size = getLo(count);
State next;
if (size < 0 || size >= _capacity)
{
// The executor has stopped or this thread is excess to capacity
next = State.STOPPED;
}
else
{
long now = System.nanoTime();
long lastEmpty = _lastEmptyTime.get();
if (size > 0 && _idleTimeNanos < (now - lastEmpty) && _lastEmptyTime.compareAndSet(lastEmpty, now))
{
// it has been too long since we hit zero reserved threads, so are "busy" idle
next = State.IDLE;
}
else
{
// We will become a reserved thread if we can update the count below.
next = State.RESERVED;
size++;
}
}
// Update count for pending and size
if (!_count.compareAndSet(count, pending, size))
continue;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} size {} > capacity", this, size, _capacity); LOG.debug("{} was={} next={} size={}+{} capacity={}", this, _state, next, pending, size, _capacity);
if (_starting) _state = next;
_pending.decrementAndGet(); if (next != State.RESERVED)
return; break;
}
// If we cannot update size then recalculate // We are reserved whilst we are waiting for an offered _task.
if (!_size.compareAndSet(size, size + 1)) Runnable task = reservedWait();
continue;
if (_starting) // Is the task the STOP poison pill?
{ if (task == STOP)
if (LOG.isDebugEnabled()) break;
LOG.debug("{} started", this);
_pending.decrementAndGet();
_starting = false;
}
// Insert ourselves in the stack. Size is already incremented, but // Run the task
// that only effects the decision to keep other threads reserved. try
_stack.offerFirst(this); {
_state = State.RUNNING;
// Once added to the stack, we must always wait for a job on the _task Queue task.run();
// and never return early, else we may leave a thread blocked offering a _task. }
Runnable task = reservedWait(); catch (Throwable e)
{
if (task == STOP) LOG.warn("Unable to run task", e);
// return on STOP poison pill }
break;
// Run the task
try
{
task.run();
}
catch (Throwable e)
{
LOG.warn(e);
} }
} }
finally
if (LOG.isDebugEnabled()) {
LOG.debug("{} Exited", this); if (LOG.isDebugEnabled())
LOG.debug("{} exited {}", this, ReservedThreadExecutor.this);
_threads.remove(this);
_thread = null;
}
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x", ReservedThreadExecutor.this, hashCode()); return String.format("%s@%x{%s,thread=%s}",
getClass().getSimpleName(),
hashCode(),
_state,
_thread);
} }
} }
} }

View File

@ -27,10 +27,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -185,6 +185,35 @@ public class ReservedThreadExecutorTest
assertThat(_reservedExecutor.getAvailable(), is(0)); assertThat(_reservedExecutor.getAvailable(), is(0));
} }
@Test
public void testBusyShrink() throws Exception
{
final long IDLE = 1000;
_reservedExecutor.stop();
_reservedExecutor.setIdleTimeout(IDLE, TimeUnit.MILLISECONDS);
_reservedExecutor.start();
assertThat(_reservedExecutor.getAvailable(), is(0));
assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
_executor.startThread();
_executor.startThread();
waitForAvailable(2);
int available = _reservedExecutor.getAvailable();
assertThat(available, is(2));
for (int i = 10; i-- > 0;)
{
assertThat(_reservedExecutor.tryExecute(NOOP), is(true));
Thread.sleep(200);
}
assertThat(_reservedExecutor.getAvailable(), is(1));
}
@Test @Test
public void testReservedIdleTimeoutWithOneReservedThread() throws Exception public void testReservedIdleTimeoutWithOneReservedThread() throws Exception
{ {
@ -266,7 +295,6 @@ public class ReservedThreadExecutorTest
} }
} }
@Disabled
@Test @Test
public void stressTest() throws Exception public void stressTest() throws Exception
{ {
@ -277,9 +305,9 @@ public class ReservedThreadExecutorTest
reserved.setIdleTimeout(0, null); reserved.setIdleTimeout(0, null);
reserved.start(); reserved.start();
final int LOOPS = 1000000; final int LOOPS = 200000;
final AtomicInteger executions = new AtomicInteger(LOOPS); final AtomicInteger executions = new AtomicInteger(LOOPS);
final CountDownLatch executed = new CountDownLatch(executions.get()); final CountDownLatch executed = new CountDownLatch(LOOPS);
final AtomicInteger usedReserved = new AtomicInteger(0); final AtomicInteger usedReserved = new AtomicInteger(0);
final AtomicInteger usedPool = new AtomicInteger(0); final AtomicInteger usedPool = new AtomicInteger(0);
@ -328,10 +356,15 @@ public class ReservedThreadExecutorTest
assertTrue(executed.await(60, TimeUnit.SECONDS)); assertTrue(executed.await(60, TimeUnit.SECONDS));
// ensure tryExecute is still working
while (!reserved.tryExecute(() -> {}))
Thread.yield();
reserved.stop(); reserved.stop();
pool.stop(); pool.stop();
assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS)); assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS); // System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
} }
} }