Fixed QueuedThreadPool dump of known threads

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-04-24 09:33:14 +02:00
parent ab03504a2f
commit 438f0b86a3
3 changed files with 212 additions and 99 deletions

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -135,7 +136,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
protected void doStart() throws Exception
{
_tryExecutor = new ReservedThreadExecutor(this,_reservedThreads);
_tryExecutor = _reservedThreads==0 ? NO_TRY : new ReservedThreadExecutor(this,_reservedThreads);
addBean(_tryExecutor);
super.doStart();
@ -603,7 +604,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
String knownMethod = "";
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool"))
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool$Runner"))
{
knownMethod = "IDLE ";
break;
@ -636,11 +637,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public void dump(Appendable out, String indent) throws IOException
{
String s = thread.getId()+" "+thread.getName()+" "+thread.getState()+" "+thread.getPriority();
if (known.length()==0)
Dumpable.dumpObjects(out, indent, s, (Object[])trace);
if (StringUtil.isBlank(known))
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace);
else
Dumpable.dumpObjects(out, indent, s);
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
}
@Override
@ -671,7 +671,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public String toString()
{
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
@ -680,100 +680,12 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
getThreads(),
getMaxThreads(),
getIdleThreads(),
getReservedThreads(),
_jobs.size(),
_tryExecutor);
}
private Runnable _runnable = new Runnable()
{
@Override
public void run()
{
boolean idle = false;
try
{
Runnable job = _jobs.poll();
if (job != null && _threadsIdle.get() == 0)
startThreads(1);
while (true)
{
if (job == null)
{
if (!idle)
{
idle = true;
_threadsIdle.incrementAndGet();
}
if (_idleTimeout <= 0)
job = _jobs.take();
else
{
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
break;
}
}
job = _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
}
}
// run job
if (job != null)
{
if (idle)
{
idle = false;
if (_threadsIdle.decrementAndGet() == 0)
startThreads(1);
}
if (LOG.isDebugEnabled())
LOG.debug("run {}", job);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {}", job);
// Clear interrupted status
Thread.interrupted();
}
if (!isRunning())
break;
job = _jobs.poll();
}
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
catch (Throwable e)
{
LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e);
}
finally
{
if (idle)
_threadsIdle.decrementAndGet();
removeThread(Thread.currentThread());
if (_threadsStarted.decrementAndGet() < getMinThreads())
startThreads(1);
}
}
};
private final Runnable _runnable = new Runner();
/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
@ -843,4 +755,101 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
}
return null;
}
private static Runnable SHRINK = ()->{};
private class Runner implements Runnable
{
@Override
public void run()
{
boolean idle = false;
try
{
Runnable job = _jobs.poll();
if (job != null && _threadsIdle.get() == 0)
startThreads(1);
while (true)
{
if (job == null)
{
if (!idle)
{
idle = true;
_threadsIdle.incrementAndGet();
}
job = idleJobPoll();
if (job==SHRINK)
break;
}
// run job
if (job != null)
{
if (idle)
{
idle = false;
if (_threadsIdle.decrementAndGet() == 0)
startThreads(1);
}
if (LOG.isDebugEnabled())
LOG.debug("run {}", job);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {}", job);
// Clear interrupted status
Thread.interrupted();
}
if (!isRunning())
break;
job = _jobs.poll();
}
}
catch (InterruptedException e)
{
LOG.ignore(e);
}
catch (Throwable e)
{
LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e);
}
finally
{
if (idle)
_threadsIdle.decrementAndGet();
removeThread(Thread.currentThread());
if (_threadsStarted.decrementAndGet() < getMinThreads())
startThreads(1);
}
}
private Runnable idleJobPoll() throws InterruptedException
{
if (_idleTimeout <= 0)
return _jobs.take();
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
return SHRINK;
}
}
return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -75,5 +75,19 @@ public interface TryExecutor extends Executor
}
}
public static final TryExecutor NO_TRY = task -> false;
TryExecutor NO_TRY = new TryExecutor()
{
@Override
public boolean tryExecute(Runnable task)
{
return false;
}
@Override
public String toString()
{
return "NO_TRY";
}
};
}

View File

@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -368,10 +369,99 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
});
}
@Test
public void testDump() throws Exception
{
QueuedThreadPool pool = new QueuedThreadPool(4, 3);
String dump = pool.dump();
// TODO use hamcrest 2.0 regex matcher
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.setReservedThreads(2);
dump = pool.dump();
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.start();
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(3));
assertThat(count(dump," RESERVED "),is(0));
CountDownLatch started = new CountDownLatch(1);
CountDownLatch waiting = new CountDownLatch(1);
pool.execute(()->
{
try
{
started.countDown();
waiting.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
});
started.await();
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(0));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(0));
pool.setDetailedDump(true);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("s=0/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(0));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1));
assertFalse(pool.tryExecute(()->{}));
while(pool.getIdleThreads()==2)
Thread.sleep(10);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump,containsString("s=1/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(1));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(1));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1));
}
private int count(String s, String p)
{
int c = 0;
int i = s.indexOf(p);
while (i>=0)
{
c++;
i = s.indexOf(p, i+1);
}
return c;
}
@Override
protected SizedThreadPool newPool(int max)
{
return new QueuedThreadPool(max);
}
}