Fixes #6652 - Improve ReservedThreadExecutor dump. (#6653)

Fixes #6652 - Improve ReservedThreadExecutor dump.

Filtering out non-reserved threads in dump() and doStop().

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Simone Bordet 2021-08-24 23:12:08 +02:00 committed by GitHub
parent 3de9d3428e
commit b2a023675c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 25 deletions

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread; package org.eclipse.jetty.util.thread;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -26,6 +27,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils; import org.eclipse.jetty.util.ProcessorUtils;
@ -42,14 +44,14 @@ import static org.eclipse.jetty.util.AtomicBiInteger.getHi;
import static org.eclipse.jetty.util.AtomicBiInteger.getLo; import static org.eclipse.jetty.util.AtomicBiInteger.getLo;
/** /**
* An Executor using pre-allocated/reserved Threads from a wrapped Executor. * <p>A TryExecutor using pre-allocated/reserved threads from an external Executor.</p>
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed * <p>Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is * succeed with a reserved thread immediately being assigned the task, or fail if
* available. * no reserved thread is available.</p>
* <p>Threads are reserved lazily, with a new reserved threads being allocated from the * <p>Threads are reserved lazily, with new reserved threads being allocated from the external
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to * idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
* the executor. * the external Executor.</p>
*/ */
@ManagedObject("A pool for reserved threads") @ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
@ -66,7 +68,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
@Override @Override
public String toString() public String toString()
{ {
return "STOP!"; return "STOP";
} }
}; };
@ -76,7 +78,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false); private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size; private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime()); private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
private ThreadPoolBudget.Lease _lease; private ThreadPoolBudget.Lease _lease;
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT; private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
@ -179,20 +180,25 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
super.doStop(); super.doStop();
// Offer STOP task to all waiting reserved threads. // Mark this instance as stopped.
for (int i = _count.getAndSetLo(-1); i-- > 0;) int size = _count.getAndSetLo(-1);
// Offer the STOP task to all waiting reserved threads.
for (int i = 0; i < size; ++i)
{ {
// yield to wait for any reserved threads that have incremented the size but not yet polled // Yield to wait for any reserved threads that
// have incremented the size but not yet polled.
Thread.yield(); Thread.yield();
_queue.offer(STOP); _queue.offer(STOP);
} }
// Interrupt any reserved thread missed the offer so it doesn't wait too long.
for (ReservedThread reserved : _threads) // Interrupt any reserved thread missed the offer,
{ // so they do not wait for the whole idle timeout.
Thread thread = reserved._thread; _threads.stream()
if (thread != null) .filter(ReservedThread::isReserved)
thread.interrupt(); .map(t -> t._thread)
} .filter(Objects::nonNull)
.forEach(Thread::interrupt);
_threads.clear(); _threads.clear();
_count.getAndSetHi(0); _count.getAndSetHi(0);
} }
@ -267,13 +273,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
public void dump(Appendable out, String indent) throws IOException public void dump(Appendable out, String indent) throws IOException
{ {
Dumpable.dumpObjects(out, indent, this, Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("reserved", _threads)); new DumpableCollection("threads",
_threads.stream()
.filter(ReservedThread::isReserved)
.collect(Collectors.toList())));
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{s=%d/%d,p=%d}", return String.format("%s@%x{reserved=%d/%d,pending=%d}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
_count.getLo(), _count.getLo(),
@ -296,6 +305,11 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
private volatile State _state = State.PENDING; private volatile State _state = State.PENDING;
private volatile Thread _thread; private volatile Thread _thread;
private boolean isReserved()
{
return _state == State.RESERVED;
}
private Runnable reservedWait() private Runnable reservedWait()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -324,7 +338,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
} }
_state = size >= 0 ? State.IDLE : State.STOPPED; _state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP; return STOP;
} }
catch (InterruptedException e) catch (InterruptedException e)
{ {

View File

@ -850,7 +850,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
dump = pool.dump(); dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2)); assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0")); assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump, containsString("s=0/2")); assertThat(dump, containsString("reserved=0/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(2)); assertThat(count(dump, " IDLE"), is(2));
assertThat(count(dump, " WAITING"), is(1)); assertThat(count(dump, " WAITING"), is(1));
@ -865,7 +865,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
dump = pool.dump(); dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2)); assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0")); assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump, containsString("s=1/2")); assertThat(dump, containsString("reserved=1/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(1)); assertThat(count(dump, " IDLE"), is(1));
assertThat(count(dump, " WAITING"), is(1)); assertThat(count(dump, " WAITING"), is(1));

View File

@ -365,6 +365,5 @@ public class ReservedThreadExecutorTest
assertThat(usedReserved.get(), greaterThan(0)); assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS)); assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
// System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
} }
} }