diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
index c1510eaff40..81d41d0c7a2 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java
@@ -13,34 +13,45 @@
package org.eclipse.jetty.util.thread;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
+import org.eclipse.jetty.util.component.Dumpable;
+import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
+import static org.eclipse.jetty.util.AtomicBiInteger.getLo;
+
+
/**
- * An Executor using preallocated/reserved Threads from a wrapped Executor.
+ * An Executor using pre-allocated/reserved Threads from a wrapped Executor.
*
Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available.
- *
Threads are reserved lazily, with a new reserved thread being allocated from a
- * wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)}
- * is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread
- * whenever it has been idle for that period.
+ *
Threads are reserved lazily, with a new reserved threads being allocated from the
+ * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
+ * idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
+ * the executor.
*/
@ManagedObject("A pool for reserved threads")
-public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor
+public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ReservedThreadExecutor.class);
+ private static final long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toNanos(1);
private static final Runnable STOP = new Runnable()
{
@Override
@@ -57,13 +68,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private final Executor _executor;
private final int _capacity;
- private final ConcurrentLinkedDeque _stack;
- private final AtomicInteger _size = new AtomicInteger();
- private final AtomicInteger _pending = new AtomicInteger();
+ private final Set _threads = ConcurrentHashMap.newKeySet();
+ private final SynchronousQueue _queue = new SynchronousQueue<>(false);
+ private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
+ private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
private ThreadPoolBudget.Lease _lease;
- private long _idleTime = 1L;
- private TimeUnit _idleTimeUnit = TimeUnit.MINUTES;
+ private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
/**
* @param executor The executor to use to obtain threads
@@ -75,7 +86,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{
_executor = executor;
_capacity = reservedThreads(executor, capacity);
- _stack = new ConcurrentLinkedDeque<>();
if (LOG.isDebugEnabled())
LOG.debug("{}", this);
}
@@ -121,42 +131,39 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
@ManagedAttribute(value = "available reserved threads", readonly = true)
public int getAvailable()
{
- return _stack.size();
+ return _count.getLo();
}
@ManagedAttribute(value = "pending reserved threads", readonly = true)
public int getPending()
{
- return _pending.get();
+ return _count.getHi();
}
- @ManagedAttribute(value = "idletimeout in MS", readonly = true)
+ @ManagedAttribute(value = "idle timeout in ms", readonly = true)
public long getIdleTimeoutMs()
{
- if (_idleTimeUnit == null)
- return 0;
- return _idleTimeUnit.toMillis(_idleTime);
+ return NANOSECONDS.toMillis(_idleTimeNanos);
}
/**
* Set the idle timeout for shrinking the reserved thread pool
*
- * @param idleTime Time to wait before shrinking, or 0 for no timeout.
+ * @param idleTime Time to wait before shrinking, or 0 for default timeout.
* @param idleTimeUnit Time units for idle timeout
*/
public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit)
{
if (isRunning())
throw new IllegalStateException();
- _idleTime = idleTime;
- _idleTimeUnit = idleTimeUnit;
+ _idleTimeNanos = (idleTime <= 0 || idleTimeUnit == null) ? DEFAULT_IDLE_TIMEOUT : idleTimeUnit.toNanos(idleTime);
}
@Override
public void doStart() throws Exception
{
_lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity);
- _size.set(0);
+ _count.set(0, 0);
super.doStart();
}
@@ -168,26 +175,22 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
super.doStop();
- while (true)
+ // Offer STOP task to all waiting reserved threads.
+ for (int i = _count.getAndSetLo(-1); i-- > 0;)
{
- int size = _size.get();
- // If no reserved threads left try setting size to -1 to
- // atomically prevent other threads adding themselves to stack.
- if (size == 0 && _size.compareAndSet(size, -1))
- break;
-
- ReservedThread thread = _stack.pollFirst();
- if (thread == null)
- {
- // Reserved thread must have incremented size but not yet added itself to queue.
- // We will spin until it is added.
- Thread.onSpinWait();
- continue;
- }
-
- _size.decrementAndGet();
- thread.stop();
+ // yield to wait for any reserved threads that have incremented the size but not yet polled
+ Thread.yield();
+ _queue.offer(STOP);
}
+ // Interrupt any reserved thread missed the offer so it doesn't wait too long.
+ for (ReservedThread reserved : _threads)
+ {
+ Thread thread = reserved._thread;
+ if (thread != null)
+ thread.interrupt();
+ }
+ _threads.clear();
+ _count.getAndSetHi(0);
}
@Override
@@ -207,52 +210,61 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
{
if (LOG.isDebugEnabled())
LOG.debug("{} tryExecute {}", this, task);
-
if (task == null)
return false;
- ReservedThread thread = _stack.pollFirst();
- if (thread == null)
- {
- if (task != STOP)
- startReservedThread();
- return false;
- }
+ // Offer will only succeed if there is a reserved thread waiting
+ boolean offered = _queue.offer(task);
- int size = _size.decrementAndGet();
- if (!thread.offer(task))
- return false;
+ // If the offer succeeded we need to reduce the size, unless it is set to -1 in the meantime
+ int size = _count.getLo();
+ while (offered && size > 0 && !_count.compareAndSetLo(size, --size))
+ size = _count.getLo();
+ // If size is 0 and we are not stopping, start a new reserved thread
if (size == 0 && task != STOP)
startReservedThread();
- return true;
+ return offered;
}
private void startReservedThread()
{
- try
+ while (true)
{
- while (true)
+ long count = _count.get();
+ int pending = getHi(count);
+ int size = getLo(count);
+ if (size < 0 || pending + size >= _capacity)
+ return;
+ if (size == 0)
+ _lastEmptyTime.set(System.nanoTime());
+ if (!_count.compareAndSet(count, pending + 1, size))
+ continue;
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} startReservedThread p={}", this, pending + 1);
+ try
{
- // Not atomic, but there is a re-check in ReservedThread.run().
- int pending = _pending.get();
- int size = _size.get();
- if (pending + size >= _capacity)
- return;
- if (_pending.compareAndSet(pending, pending + 1))
- {
- if (LOG.isDebugEnabled())
- LOG.debug("{} startReservedThread p={}", this, pending + 1);
- _executor.execute(new ReservedThread());
- return;
- }
+ ReservedThread thread = new ReservedThread();
+ _threads.add(thread);
+ _executor.execute(thread);
}
+ catch (Throwable e)
+ {
+ _count.add(-1, 0);
+ if (LOG.isDebugEnabled())
+ LOG.debug("ignored", e);
+ }
+ return;
}
- catch (RejectedExecutionException e)
- {
- LOG.trace("IGNORED", e);
- }
+ }
+
+ @Override
+ public void dump(Appendable out, String indent) throws IOException
+ {
+ Dumpable.dumpObjects(out, indent, this,
+ new DumpableCollection("reserved", _threads));
}
@Override
@@ -261,136 +273,149 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
return String.format("%s@%x{s=%d/%d,p=%d}",
getClass().getSimpleName(),
hashCode(),
- _size.get(),
+ _count.getLo(),
_capacity,
- _pending.get());
+ _count.getHi());
+ }
+
+ private enum State
+ {
+ PENDING,
+ RESERVED,
+ RUNNING,
+ IDLE,
+ STOPPED
}
private class ReservedThread implements Runnable
{
- private final SynchronousQueue _task = new SynchronousQueue<>();
- private boolean _starting = true;
-
- public boolean offer(Runnable task)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("{} offer {}", this, task);
-
- try
- {
- _task.put(task);
- return true;
- }
- catch (Throwable e)
- {
- LOG.trace("IGNORED", e);
- _size.getAndIncrement();
- _stack.offerFirst(this);
- return false;
- }
- }
-
- public void stop()
- {
- offer(STOP);
- }
+ // The state and thread are kept only for dumping
+ private volatile State _state = State.PENDING;
+ private volatile Thread _thread;
private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
- LOG.debug("{} waiting", this);
+ LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this);
- while (true)
+ // Keep waiting until stopped, tasked or idle
+ while (_count.getLo() >= 0)
{
try
{
- Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit);
+ // Always poll at some period as safety to ensure we don't poll forever.
+ Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS);
if (LOG.isDebugEnabled())
- LOG.debug("{} task={}", this, task);
+ LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this);
if (task != null)
return task;
- if (_stack.remove(this))
+ // we have idled out
+ int size = _count.getLo();
+ // decrement size if we have not also been stopped.
+ while (size > 0)
{
- if (LOG.isDebugEnabled())
- LOG.debug("{} IDLE", this);
- _size.decrementAndGet();
- return STOP;
+ if (_count.compareAndSetLo(size, --size))
+ break;
+ size = _count.getLo();
}
+ _state = size >= 0 ? State.IDLE : State.STOPPED;
+ return STOP;
+
}
catch (InterruptedException e)
{
- LOG.trace("IGNORED", e);
+ if (LOG.isDebugEnabled())
+ LOG.debug("ignored", e);
}
}
+ _state = State.STOPPED;
+ return STOP;
}
@Override
public void run()
{
- while (isRunning())
+ _thread = Thread.currentThread();
+ try
{
- // test and increment size BEFORE decrementing pending,
- // so that we don't have a race starting new pending.
- int size = _size.get();
-
- // Are we stopped?
- if (size < 0)
- return;
-
- // Are we surplus to capacity?
- if (size >= _capacity)
+ while (true)
{
+ long count = _count.get();
+
+ // reduce pending if this thread was pending
+ int pending = getHi(count) - (_state == State.PENDING ? 1 : 0);
+ int size = getLo(count);
+
+ State next;
+ if (size < 0 || size >= _capacity)
+ {
+ // The executor has stopped or this thread is excess to capacity
+ next = State.STOPPED;
+ }
+ else
+ {
+ long now = System.nanoTime();
+ long lastEmpty = _lastEmptyTime.get();
+ if (size > 0 && _idleTimeNanos < (now - lastEmpty) && _lastEmptyTime.compareAndSet(lastEmpty, now))
+ {
+ // it has been too long since we hit zero reserved threads, so are "busy" idle
+ next = State.IDLE;
+ }
+ else
+ {
+ // We will become a reserved thread if we can update the count below.
+ next = State.RESERVED;
+ size++;
+ }
+ }
+
+ // Update count for pending and size
+ if (!_count.compareAndSet(count, pending, size))
+ continue;
+
if (LOG.isDebugEnabled())
- LOG.debug("{} size {} > capacity {}", this, size, _capacity);
- if (_starting)
- _pending.decrementAndGet();
- return;
- }
+ LOG.debug("{} was={} next={} size={}+{} capacity={}", this, _state, next, pending, size, _capacity);
+ _state = next;
+ if (next != State.RESERVED)
+ break;
- // If we cannot update size then recalculate
- if (!_size.compareAndSet(size, size + 1))
- continue;
+ // We are reserved whilst we are waiting for an offered _task.
+ Runnable task = reservedWait();
- if (_starting)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("{} started", this);
- _pending.decrementAndGet();
- _starting = false;
- }
+ // Is the task the STOP poison pill?
+ if (task == STOP)
+ break;
- // Insert ourselves in the stack. Size is already incremented, but
- // that only effects the decision to keep other threads reserved.
- _stack.offerFirst(this);
-
- // Once added to the stack, we must always wait for a job on the _task Queue
- // and never return early, else we may leave a thread blocked offering a _task.
- Runnable task = reservedWait();
-
- if (task == STOP)
- // return on STOP poison pill
- break;
-
- // Run the task
- try
- {
- task.run();
- }
- catch (Throwable e)
- {
- LOG.warn("Unable to run task", e);
+ // Run the task
+ try
+ {
+ _state = State.RUNNING;
+ task.run();
+ }
+ catch (Throwable e)
+ {
+ LOG.warn("Unable to run task", e);
+ }
}
}
-
- if (LOG.isDebugEnabled())
- LOG.debug("{} Exited", this);
+ finally
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} exited {}", this, ReservedThreadExecutor.this);
+ _threads.remove(this);
+ _thread = null;
+ }
}
@Override
public String toString()
{
- return String.format("%s@%x", ReservedThreadExecutor.this, hashCode());
+ return String.format("%s@%x{%s,thread=%s}",
+ getClass().getSimpleName(),
+ hashCode(),
+ _state,
+ _thread);
}
}
-}
+}
\ No newline at end of file
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
index 5f86fa2b1be..e3a08b876bf 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java
@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -180,6 +180,35 @@ public class ReservedThreadExecutorTest
assertThat(_reservedExecutor.getAvailable(), is(0));
}
+ @Test
+ public void testBusyShrink() throws Exception
+ {
+ final long IDLE = 1000;
+
+ _reservedExecutor.stop();
+ _reservedExecutor.setIdleTimeout(IDLE, TimeUnit.MILLISECONDS);
+ _reservedExecutor.start();
+ assertThat(_reservedExecutor.getAvailable(), is(0));
+
+ assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
+ assertThat(_reservedExecutor.tryExecute(NOOP), is(false));
+
+ _executor.startThread();
+ _executor.startThread();
+
+ waitForAvailable(2);
+
+ int available = _reservedExecutor.getAvailable();
+ assertThat(available, is(2));
+
+ for (int i = 10; i-- > 0;)
+ {
+ assertThat(_reservedExecutor.tryExecute(NOOP), is(true));
+ Thread.sleep(200);
+ }
+ assertThat(_reservedExecutor.getAvailable(), is(1));
+ }
+
@Test
public void testReservedIdleTimeoutWithOneReservedThread() throws Exception
{
@@ -261,7 +290,6 @@ public class ReservedThreadExecutorTest
}
}
- @Disabled
@Test
public void stressTest() throws Exception
{
@@ -271,9 +299,9 @@ public class ReservedThreadExecutorTest
reserved.setIdleTimeout(0, null);
reserved.start();
- final int LOOPS = 1000000;
+ final int LOOPS = 200000;
final AtomicInteger executions = new AtomicInteger(LOOPS);
- final CountDownLatch executed = new CountDownLatch(executions.get());
+ final CountDownLatch executed = new CountDownLatch(LOOPS);
final AtomicInteger usedReserved = new AtomicInteger(0);
final AtomicInteger usedPool = new AtomicInteger(0);
@@ -322,10 +350,15 @@ public class ReservedThreadExecutorTest
assertTrue(executed.await(60, TimeUnit.SECONDS));
+ // ensure tryExecute is still working
+ while (!reserved.tryExecute(() -> {}))
+ Thread.yield();
+
reserved.stop();
pool.stop();
+ assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
- System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
+ // System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
}
}