diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 05742b0299a..d15d2c19a55 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -135,7 +136,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override protected void doStart() throws Exception { - _tryExecutor = new ReservedThreadExecutor(this,_reservedThreads); + _tryExecutor = _reservedThreads==0 ? NO_TRY : new ReservedThreadExecutor(this,_reservedThreads); addBean(_tryExecutor); super.doStart(); @@ -603,7 +604,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP String knownMethod = ""; for (StackTraceElement t : trace) { - if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool")) + if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool$Runner")) { knownMethod = "IDLE "; break; @@ -636,11 +637,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override public void dump(Appendable out, String indent) throws IOException { - String s = thread.getId()+" "+thread.getName()+" "+thread.getState()+" "+thread.getPriority(); - if (known.length()==0) - Dumpable.dumpObjects(out, indent, s, (Object[])trace); + if (StringUtil.isBlank(known)) + Dumpable.dumpObjects(out, indent, String.format("%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace); else - Dumpable.dumpObjects(out, indent, s); + Dumpable.dumpObjects(out, indent, String.format("%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority())); } @Override @@ -671,7 +671,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override public String toString() { - return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}[%s]", + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", getClass().getSimpleName(), _name, hashCode(), @@ -680,100 +680,12 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP getThreads(), getMaxThreads(), getIdleThreads(), + getReservedThreads(), _jobs.size(), _tryExecutor); } - private Runnable _runnable = new Runnable() - { - @Override - public void run() - { - boolean idle = false; - - try - { - Runnable job = _jobs.poll(); - if (job != null && _threadsIdle.get() == 0) - startThreads(1); - - while (true) - { - if (job == null) - { - if (!idle) - { - idle = true; - _threadsIdle.incrementAndGet(); - } - - if (_idleTimeout <= 0) - job = _jobs.take(); - else - { - // maybe we should shrink? - int size = _threadsStarted.get(); - if (size > _minThreads) - { - long last = _lastShrink.get(); - long now = System.nanoTime(); - if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) - { - if (_lastShrink.compareAndSet(last, now)) - break; - } - } - - job = _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS); - } - } - - // run job - if (job != null) - { - if (idle) - { - idle = false; - if (_threadsIdle.decrementAndGet() == 0) - startThreads(1); - } - - if (LOG.isDebugEnabled()) - LOG.debug("run {}", job); - runJob(job); - if (LOG.isDebugEnabled()) - LOG.debug("ran {}", job); - - // Clear interrupted status - Thread.interrupted(); - } - - if (!isRunning()) - break; - - job = _jobs.poll(); - } - } - catch (InterruptedException e) - { - LOG.ignore(e); - } - catch (Throwable e) - { - LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e); - } - finally - { - if (idle) - _threadsIdle.decrementAndGet(); - - removeThread(Thread.currentThread()); - - if (_threadsStarted.decrementAndGet() < getMinThreads()) - startThreads(1); - } - } - }; + private final Runnable _runnable = new Runner(); /** *
Runs the given job in the {@link Thread#currentThread() current thread}.
@@ -843,4 +755,101 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } return null; } + + private static Runnable SHRINK = ()->{}; + private class Runner implements Runnable + { + @Override + public void run() + { + boolean idle = false; + + try + { + Runnable job = _jobs.poll(); + if (job != null && _threadsIdle.get() == 0) + startThreads(1); + + while (true) + { + if (job == null) + { + if (!idle) + { + idle = true; + _threadsIdle.incrementAndGet(); + } + + job = idleJobPoll(); + if (job==SHRINK) + break; + } + + // run job + if (job != null) + { + if (idle) + { + idle = false; + if (_threadsIdle.decrementAndGet() == 0) + startThreads(1); + } + + if (LOG.isDebugEnabled()) + LOG.debug("run {}", job); + runJob(job); + if (LOG.isDebugEnabled()) + LOG.debug("ran {}", job); + + // Clear interrupted status + Thread.interrupted(); + } + + if (!isRunning()) + break; + + job = _jobs.poll(); + } + } + catch (InterruptedException e) + { + LOG.ignore(e); + } + catch (Throwable e) + { + LOG.warn(String.format("Unexpected thread death: %s in %s", this, QueuedThreadPool.this), e); + } + finally + { + if (idle) + _threadsIdle.decrementAndGet(); + + removeThread(Thread.currentThread()); + + if (_threadsStarted.decrementAndGet() < getMinThreads()) + startThreads(1); + } + } + + private Runnable idleJobPoll() throws InterruptedException + { + if (_idleTimeout <= 0) + return _jobs.take(); + + // maybe we should shrink? + int size = _threadsStarted.get(); + if (size > _minThreads) + { + long last = _lastShrink.get(); + long now = System.nanoTime(); + if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout)) + { + if (_lastShrink.compareAndSet(last, now)) + return SHRINK; + } + } + + return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS); + } + } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java index a9696fb6db9..034261a5ca4 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TryExecutor.java @@ -75,5 +75,19 @@ public interface TryExecutor extends Executor } } - public static final TryExecutor NO_TRY = task -> false; + TryExecutor NO_TRY = new TryExecutor() + { + @Override + public boolean tryExecute(Runnable task) + { + return false; + } + + @Override + public String toString() + { + return "NO_TRY"; + } + }; + } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index ccf30c343ad..0199c7f0171 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.StringContains.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -368,10 +369,99 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest }); } + @Test + public void testDump() throws Exception + { + QueuedThreadPool pool = new QueuedThreadPool(4, 3); + + String dump = pool.dump(); + // TODO use hamcrest 2.0 regex matcher + assertThat(dump,containsString("STOPPED")); + assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0")); + assertThat(dump,containsString("[NO_TRY]")); + + pool.setReservedThreads(2); + dump = pool.dump(); + assertThat(dump,containsString("STOPPED")); + assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0")); + assertThat(dump,containsString("[NO_TRY]")); + + pool.start(); + dump = pool.dump(); + assertThat(count(dump," - STARTED"),is(2)); + assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0")); + assertThat(dump,containsString("[ReservedThreadExecutor@")); + assertThat(count(dump," IDLE "),is(3)); + assertThat(count(dump," RESERVED "),is(0)); + + + CountDownLatch started = new CountDownLatch(1); + CountDownLatch waiting = new CountDownLatch(1); + pool.execute(()-> + { + try + { + started.countDown(); + waiting.await(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + }); + started.await(); + + dump = pool.dump(); + assertThat(count(dump," - STARTED"),is(2)); + assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump,containsString("[ReservedThreadExecutor@")); + assertThat(count(dump," IDLE "),is(2)); + assertThat(count(dump," WAITING "),is(1)); + assertThat(count(dump," RESERVED "),is(0)); + assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(0)); + + pool.setDetailedDump(true); + dump = pool.dump(); + assertThat(count(dump," - STARTED"),is(2)); + assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0")); + assertThat(dump,containsString("s=0/2")); + assertThat(dump,containsString("[ReservedThreadExecutor@")); + assertThat(count(dump," IDLE "),is(2)); + assertThat(count(dump," WAITING "),is(1)); + assertThat(count(dump," RESERVED "),is(0)); + assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1)); + + assertFalse(pool.tryExecute(()->{})); + while(pool.getIdleThreads()==2) + Thread.sleep(10); + + dump = pool.dump(); + assertThat(count(dump," - STARTED"),is(2)); + assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0")); + assertThat(dump,containsString("s=1/2")); + assertThat(dump,containsString("[ReservedThreadExecutor@")); + assertThat(count(dump," IDLE "),is(1)); + assertThat(count(dump," WAITING "),is(1)); + assertThat(count(dump," RESERVED "),is(1)); + assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1)); + } + + private int count(String s, String p) + { + int c = 0; + int i = s.indexOf(p); + while (i>=0) + { + c++; + i = s.indexOf(p, i+1); + } + return c; + } + @Override protected SizedThreadPool newPool(int max) { return new QueuedThreadPool(max); } - + }