Fixes #6652 - Improve ReservedThreadExecutor dump.
Filtering out non-reserved threads in dump() and doStop().
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
(cherry picked from commit b2a023675c
)
This commit is contained in:
parent
18b66d1d47
commit
e8beb58baf
|
@ -14,6 +14,7 @@
|
||||||
package org.eclipse.jetty.util.thread;
|
package org.eclipse.jetty.util.thread;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
@ -21,6 +22,7 @@ import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.AtomicBiInteger;
|
import org.eclipse.jetty.util.AtomicBiInteger;
|
||||||
import org.eclipse.jetty.util.ProcessorUtils;
|
import org.eclipse.jetty.util.ProcessorUtils;
|
||||||
|
@ -38,14 +40,14 @@ import static org.eclipse.jetty.util.AtomicBiInteger.getLo;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An Executor using pre-allocated/reserved Threads from a wrapped Executor.
|
* <p>A TryExecutor using pre-allocated/reserved threads from an external Executor.</p>
|
||||||
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
|
* <p>Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either
|
||||||
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
|
* succeed with a reserved thread immediately being assigned the task, or fail if
|
||||||
* available.
|
* no reserved thread is available.</p>
|
||||||
* <p>Threads are reserved lazily, with a new reserved threads being allocated from the
|
* <p>Threads are reserved lazily, with new reserved threads being allocated from the external
|
||||||
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
|
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
|
||||||
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
|
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
|
||||||
* the executor.
|
* the external Executor.</p>
|
||||||
*/
|
*/
|
||||||
@ManagedObject("A pool for reserved threads")
|
@ManagedObject("A pool for reserved threads")
|
||||||
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
|
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
|
||||||
|
@ -62,7 +64,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return "STOP!";
|
return "STOP";
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -72,7 +74,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
|
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
|
||||||
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
|
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
|
||||||
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
|
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());
|
||||||
|
|
||||||
private ThreadPoolBudget.Lease _lease;
|
private ThreadPoolBudget.Lease _lease;
|
||||||
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
|
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;
|
||||||
|
|
||||||
|
@ -175,20 +176,25 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
|
|
||||||
super.doStop();
|
super.doStop();
|
||||||
|
|
||||||
// Offer STOP task to all waiting reserved threads.
|
// Mark this instance as stopped.
|
||||||
for (int i = _count.getAndSetLo(-1); i-- > 0;)
|
int size = _count.getAndSetLo(-1);
|
||||||
|
|
||||||
|
// Offer the STOP task to all waiting reserved threads.
|
||||||
|
for (int i = 0; i < size; ++i)
|
||||||
{
|
{
|
||||||
// yield to wait for any reserved threads that have incremented the size but not yet polled
|
// Yield to wait for any reserved threads that
|
||||||
|
// have incremented the size but not yet polled.
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
_queue.offer(STOP);
|
_queue.offer(STOP);
|
||||||
}
|
}
|
||||||
// Interrupt any reserved thread missed the offer so it doesn't wait too long.
|
|
||||||
for (ReservedThread reserved : _threads)
|
// Interrupt any reserved thread missed the offer,
|
||||||
{
|
// so they do not wait for the whole idle timeout.
|
||||||
Thread thread = reserved._thread;
|
_threads.stream()
|
||||||
if (thread != null)
|
.filter(ReservedThread::isReserved)
|
||||||
thread.interrupt();
|
.map(t -> t._thread)
|
||||||
}
|
.filter(Objects::nonNull)
|
||||||
|
.forEach(Thread::interrupt);
|
||||||
_threads.clear();
|
_threads.clear();
|
||||||
_count.getAndSetHi(0);
|
_count.getAndSetHi(0);
|
||||||
}
|
}
|
||||||
|
@ -264,13 +270,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
public void dump(Appendable out, String indent) throws IOException
|
public void dump(Appendable out, String indent) throws IOException
|
||||||
{
|
{
|
||||||
Dumpable.dumpObjects(out, indent, this,
|
Dumpable.dumpObjects(out, indent, this,
|
||||||
new DumpableCollection("reserved", _threads));
|
new DumpableCollection("threads",
|
||||||
|
_threads.stream()
|
||||||
|
.filter(ReservedThread::isReserved)
|
||||||
|
.collect(Collectors.toList())));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%x{s=%d/%d,p=%d}",
|
return String.format("%s@%x{reserved=%d/%d,pending=%d}",
|
||||||
getClass().getSimpleName(),
|
getClass().getSimpleName(),
|
||||||
hashCode(),
|
hashCode(),
|
||||||
_count.getLo(),
|
_count.getLo(),
|
||||||
|
@ -293,6 +302,11 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
private volatile State _state = State.PENDING;
|
private volatile State _state = State.PENDING;
|
||||||
private volatile Thread _thread;
|
private volatile Thread _thread;
|
||||||
|
|
||||||
|
private boolean isReserved()
|
||||||
|
{
|
||||||
|
return _state == State.RESERVED;
|
||||||
|
}
|
||||||
|
|
||||||
private Runnable reservedWait()
|
private Runnable reservedWait()
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
|
@ -321,7 +335,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExec
|
||||||
}
|
}
|
||||||
_state = size >= 0 ? State.IDLE : State.STOPPED;
|
_state = size >= 0 ? State.IDLE : State.STOPPED;
|
||||||
return STOP;
|
return STOP;
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (InterruptedException e)
|
catch (InterruptedException e)
|
||||||
{
|
{
|
||||||
|
|
|
@ -831,7 +831,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
||||||
dump = pool.dump();
|
dump = pool.dump();
|
||||||
assertThat(count(dump, " - STARTED"), is(2));
|
assertThat(count(dump, " - STARTED"), is(2));
|
||||||
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
|
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
|
||||||
assertThat(dump, containsString("s=0/2"));
|
assertThat(dump, containsString("reserved=0/2"));
|
||||||
assertThat(dump, containsString("[ReservedThreadExecutor@"));
|
assertThat(dump, containsString("[ReservedThreadExecutor@"));
|
||||||
assertThat(count(dump, " IDLE"), is(2));
|
assertThat(count(dump, " IDLE"), is(2));
|
||||||
assertThat(count(dump, " WAITING"), is(1));
|
assertThat(count(dump, " WAITING"), is(1));
|
||||||
|
@ -846,7 +846,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
|
||||||
dump = pool.dump();
|
dump = pool.dump();
|
||||||
assertThat(count(dump, " - STARTED"), is(2));
|
assertThat(count(dump, " - STARTED"), is(2));
|
||||||
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
|
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
|
||||||
assertThat(dump, containsString("s=1/2"));
|
assertThat(dump, containsString("reserved=1/2"));
|
||||||
assertThat(dump, containsString("[ReservedThreadExecutor@"));
|
assertThat(dump, containsString("[ReservedThreadExecutor@"));
|
||||||
assertThat(count(dump, " IDLE"), is(1));
|
assertThat(count(dump, " IDLE"), is(1));
|
||||||
assertThat(count(dump, " WAITING"), is(1));
|
assertThat(count(dump, " WAITING"), is(1));
|
||||||
|
|
|
@ -359,6 +359,5 @@ public class ReservedThreadExecutorTest
|
||||||
|
|
||||||
assertThat(usedReserved.get(), greaterThan(0));
|
assertThat(usedReserved.get(), greaterThan(0));
|
||||||
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
|
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
|
||||||
// System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue