Non blocking ReservedThreadExecutor (#6535) (#6559)

A call to offer must never block, nor even yield, since to do so give an opportunity for the allocated CPU core to change, defeating the whole purpose of the class.
There is also some reasonable level of diagnostic warnings if a reserved thread misses too many offers consecutively, based on tracking the state of the reserved thread.

Remove the stack data structure entirely.  ReservedThreads all poll the same SynchronousQueue and tryExecute does a non blocking offer.

Added test for busy shrinking

Remember last time we hit zero reserved threads

Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Greg Wilkins 2021-07-30 09:08:36 +10:00 committed by GitHub
parent d8a890f71e
commit 6201d3b107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 224 additions and 166 deletions

View File

@ -13,34 +13,45 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
import static org.eclipse.jetty.util.AtomicBiInteger.getLo;
/**
* An Executor using preallocated/reserved Threads from a wrapped Executor.
* An Executor using pre-allocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available.
* <p>Threads are reserved lazily, with a new reserved thread being allocated from a
* wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)}
* is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread
* whenever it has been idle for that period.
* <p>Threads are reserved lazily, with a new reserved threads being allocated from the
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
* the executor.
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ReservedThreadExecutor.class);
private static final long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toNanos(1);
private static final Runnable STOP = new Runnable()
{
@Override
@ -57,13 +68,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private final Executor _executor;
private final int _capacity;
private final ConcurrentLinkedDeque<ReservedThread> _stack;
private final AtomicInteger _size = new AtomicInteger();
private final AtomicInteger _pending = new AtomicInteger();
private final Set<ReservedThread> _threads = ConcurrentHashMap.newKeySet();
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
private ThreadPoolBudget.Lease _lease;
private long _idleTime = 1L;
private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
/**
* @param executor The executor to use to obtain threads
@ -75,7 +86,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{
_executor = executor;
_capacity = reservedThreads(executor, capacity);
_stack = new ConcurrentLinkedDeque<>();
if (LOG.isDebugEnabled())
LOG.debug("{}", this);
}
@ -121,42 +131,39 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
@ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable()
{
return _stack.size();
return _count.getLo();
}
@ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending()
{
return _pending.get();
return _count.getHi();
}
@ManagedAttribute(value = "idletimeout in MS", readonly = true)
@ManagedAttribute(value = "idle timeout in ms", readonly = true)
public long getIdleTimeoutMs()
{
if (_idleTimeUnit == null)
return 0;
return _idleTimeUnit.toMillis(_idleTime);
return NANOSECONDS.toMillis(_idleTimeNanos);
}
/**
* Set the idle timeout for shrinking the reserved thread pool
*
* @param idleTime Time to wait before shrinking, or 0 for no timeout.
* @param idleTime Time to wait before shrinking, or 0 for default timeout.
* @param idleTimeUnit Time units for idle timeout
*/
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit)
{
if (isRunning())
throw new IllegalStateException();
_idleTime = idleTime;
_idleTimeUnit = idleTimeUnit;
_idleTimeNanos = (idleTime <= 0 || idleTimeUnit == null) ? DEFAULT_IDLE_TIMEOUT : idleTimeUnit.toNanos(idleTime);
}
@Override
public void doStart() throws Exception
{
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity);
_size.set(0);
_count.set(0, 0);
super.doStart();
}
@ -168,26 +175,22 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
super.doStop();
while (true)
// Offer STOP task to all waiting reserved threads.
for (int i = _count.getAndSetLo(-1); i-- > 0;)
{
int size = _size.get();
// If no reserved threads left try setting size to -1 to
// atomically prevent other threads adding themselves to stack.
if (size == 0 && _size.compareAndSet(size, -1))
break;
ReservedThread thread = _stack.pollFirst();
if (thread == null)
// yield to wait for any reserved threads that have incremented the size but not yet polled
Thread.yield();
_queue.offer(STOP);
}
// Interrupt any reserved thread missed the offer so it doesn't wait too long.
for (ReservedThread reserved : _threads)
{
// Reserved thread must have incremented size but not yet added itself to queue.
// We will spin until it is added.
Thread.onSpinWait();
continue;
}
_size.decrementAndGet();
thread.stop();
Thread thread = reserved._thread;
if (thread != null)
thread.interrupt();
}
_threads.clear();
_count.getAndSetHi(0);
}
@Override
@ -207,52 +210,61 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{
if (LOG.isDebugEnabled())
LOG.debug("{} tryExecute {}", this, task);
if (task == null)
return false;
ReservedThread thread = _stack.pollFirst();
if (thread == null)
{
if (task != STOP)
startReservedThread();
return false;
}
// Offer will only succeed if there is a reserved thread waiting
boolean offered = _queue.offer(task);
int size = _size.decrementAndGet();
if (!thread.offer(task))
return false;
// If the offer succeeded we need to reduce the size, unless it is set to -1 in the meantime
int size = _count.getLo();
while (offered && size > 0 && !_count.compareAndSetLo(size, --size))
size = _count.getLo();
// If size is 0 and we are not stopping, start a new reserved thread
if (size == 0 && task != STOP)
startReservedThread();
return true;
return offered;
}
private void startReservedThread()
{
try
{
while (true)
{
// Not atomic, but there is a re-check in ReservedThread.run().
int pending = _pending.get();
int size = _size.get();
if (pending + size >= _capacity)
long count = _count.get();
int pending = getHi(count);
int size = getLo(count);
if (size < 0 || pending + size >= _capacity)
return;
if (_pending.compareAndSet(pending, pending + 1))
{
if (size == 0)
_lastEmptyTime.set(System.nanoTime());
if (!_count.compareAndSet(count, pending + 1, size))
continue;
if (LOG.isDebugEnabled())
LOG.debug("{} startReservedThread p={}", this, pending + 1);
_executor.execute(new ReservedThread());
try
{
ReservedThread thread = new ReservedThread();
_threads.add(thread);
_executor.execute(thread);
}
catch (Throwable e)
{
_count.add(-1, 0);
if (LOG.isDebugEnabled())
LOG.debug("ignored", e);
}
return;
}
}
}
catch (RejectedExecutionException e)
@Override
public void dump(Appendable out, String indent) throws IOException
{
LOG.trace("IGNORED", e);
}
Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("reserved", _threads));
}
@Override
@ -261,120 +273,124 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
return String.format("%s@%x{s=%d/%d,p=%d}",
getClass().getSimpleName(),
hashCode(),
_size.get(),
_count.getLo(),
_capacity,
_pending.get());
_count.getHi());
}
private enum State
{
PENDING,
RESERVED,
RUNNING,
IDLE,
STOPPED
}
private class ReservedThread implements Runnable
{
private final SynchronousQueue<Runnable> _task = new SynchronousQueue<>();
private boolean _starting = true;
public boolean offer(Runnable task)
{
if (LOG.isDebugEnabled())
LOG.debug("{} offer {}", this, task);
try
{
_task.put(task);
return true;
}
catch (Throwable e)
{
LOG.trace("IGNORED", e);
_size.getAndIncrement();
_stack.offerFirst(this);
return false;
}
}
public void stop()
{
offer(STOP);
}
// The state and thread are kept only for dumping
private volatile State _state = State.PENDING;
private volatile Thread _thread;
private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
LOG.debug("{} waiting", this);
LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this);
while (true)
// Keep waiting until stopped, tasked or idle
while (_count.getLo() >= 0)
{
try
{
Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit);
// Always poll at some period as safety to ensure we don't poll forever.
Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this);
if (task != null)
return task;
if (_stack.remove(this))
// we have idled out
int size = _count.getLo();
// decrement size if we have not also been stopped.
while (size > 0)
{
if (LOG.isDebugEnabled())
LOG.debug("{} IDLE", this);
_size.decrementAndGet();
return STOP;
if (_count.compareAndSetLo(size, --size))
break;
size = _count.getLo();
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;
}
catch (InterruptedException e)
{
LOG.trace("IGNORED", e);
if (LOG.isDebugEnabled())
LOG.debug("ignored", e);
}
}
_state = State.STOPPED;
return STOP;
}
@Override
public void run()
{
while (isRunning())
_thread = Thread.currentThread();
try
{
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
int size = _size.get();
// Are we stopped?
if (size < 0)
return;
// Are we surplus to capacity?
if (size >= _capacity)
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("{} size {} > capacity {}", this, size, _capacity);
if (_starting)
_pending.decrementAndGet();
return;
long count = _count.get();
// reduce pending if this thread was pending
int pending = getHi(count) - (_state == State.PENDING ? 1 : 0);
int size = getLo(count);
State next;
if (size < 0 || size >= _capacity)
{
// The executor has stopped or this thread is excess to capacity
next = State.STOPPED;
}
else
{
long now = System.nanoTime();
long lastEmpty = _lastEmptyTime.get();
if (size > 0 && _idleTimeNanos < (now - lastEmpty) && _lastEmptyTime.compareAndSet(lastEmpty, now))
{
// it has been too long since we hit zero reserved threads, so are "busy" idle
next = State.IDLE;
}
else
{
// We will become a reserved thread if we can update the count below.
next = State.RESERVED;
size++;
}
}
// If we cannot update size then recalculate
if (!_size.compareAndSet(size, size + 1))
// Update count for pending and size
if (!_count.compareAndSet(count, pending, size))
continue;
if (_starting)
{
if (LOG.isDebugEnabled())
LOG.debug("{} started", this);
_pending.decrementAndGet();
_starting = false;
}
LOG.debug("{} was={} next={} size={}+{} capacity={}", this, _state, next, pending, size, _capacity);
_state = next;
if (next != State.RESERVED)
break;
// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.offerFirst(this);
// Once added to the stack, we must always wait for a job on the _task Queue
// and never return early, else we may leave a thread blocked offering a _task.
// We are reserved whilst we are waiting for an offered _task.
Runnable task = reservedWait();
// Is the task the STOP poison pill?
if (task == STOP)
// return on STOP poison pill
break;
// Run the task
try
{
_state = State.RUNNING;
task.run();
}
catch (Throwable e)
@ -382,15 +398,24 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
LOG.warn("Unable to run task", e);
}
}
}
finally
{
if (LOG.isDebugEnabled())
LOG.debug("{} Exited", this);
LOG.debug("{} exited {}", this, ReservedThreadExecutor.this);
_threads.remove(this);
_thread = null;
}
}
@Override
public String toString()
{
return String.format("%s@%x", ReservedThreadExecutor.this, hashCode());
return String.format("%s@%x{%s,thread=%s}",
getClass().getSimpleName(),
hashCode(),
_state,
_thread);
}
}
}

View File

@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -180,6 +180,35 @@ public class ReservedThreadExecutorTest
assertThat(_reservedExecutor.getAvailable(), is(0));
}
@Test
public void testBusyShrink() throws Exception
{
final long IDLE = 1000;
_reservedExecutor.stop();
_reservedExecutor.setIdleTimeout(IDLE, TimeUnit.MILLISECONDS);
_reservedExecutor.start();
assertThat(_reservedExecutor.getAvailable(), is(0));
assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
_executor.startThread();
_executor.startThread();
waitForAvailable(2);
int available = _reservedExecutor.getAvailable();
assertThat(available, is(2));
for (int i = 10; i-- > 0;)
{
assertThat(_reservedExecutor.tryExecute(NOOP), is(true));
Thread.sleep(200);
}
assertThat(_reservedExecutor.getAvailable(), is(1));
}
@Test
public void testReservedIdleTimeoutWithOneReservedThread() throws Exception
{
@ -261,7 +290,6 @@ public class ReservedThreadExecutorTest
}
}
@Disabled
@Test
public void stressTest() throws Exception
{
@ -271,9 +299,9 @@ public class ReservedThreadExecutorTest
reserved.setIdleTimeout(0, null);
reserved.start();
final int LOOPS = 1000000;
final int LOOPS = 200000;
final AtomicInteger executions = new AtomicInteger(LOOPS);
final CountDownLatch executed = new CountDownLatch(executions.get());
final CountDownLatch executed = new CountDownLatch(LOOPS);
final AtomicInteger usedReserved = new AtomicInteger(0);
final AtomicInteger usedPool = new AtomicInteger(0);
@ -322,10 +350,15 @@ public class ReservedThreadExecutorTest
assertTrue(executed.await(60, TimeUnit.SECONDS));
// ensure tryExecute is still working
while (!reserved.tryExecute(() -> {}))
Thread.yield();
reserved.stop();
pool.stop();
assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
// System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
}
}