Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

This commit is contained in:
Greg Wilkins 2019-04-25 11:26:33 +02:00
commit 812f3dbd2e
6 changed files with 412 additions and 85 deletions

View File

@ -186,7 +186,22 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-infinispan</artifactId>
<artifactId>infinispan-remote</artifactId>
<version>10.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>infinispan-remote-query</artifactId>
<version>10.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>infinispan-embedded</artifactId>
<version>10.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>infinispan-embedded-query</artifactId>
<version>10.0.0-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -53,7 +53,7 @@ public class DumpableCollection implements Dumpable
@Override
public void dump(Appendable out, String indent) throws IOException
{
Object[] array = _collection.toArray();
Dumpable.dumpObjects(out,indent,_name + " size="+array.length, array);
Object[] array = (_collection == null ? null : _collection.toArray());
Dumpable.dumpObjects(out,indent,_name + " size="+(array == null ? 0 : array.length), array);
}
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -135,7 +136,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
protected void doStart() throws Exception
{
_tryExecutor = new ReservedThreadExecutor(this,_reservedThreads);
_tryExecutor = _reservedThreads==0 ? NO_TRY : new ReservedThreadExecutor(this,_reservedThreads);
addBean(_tryExecutor);
super.doStart();
@ -473,7 +474,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
if (getQueueSize() > 0 && getIdleThreads() == 0)
startThreads(1);
}
}
@ -603,7 +604,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
String knownMethod = "";
for (StackTraceElement t : trace)
{
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().endsWith("QueuedThreadPool"))
if ("idleJobPoll".equals(t.getMethodName()) && t.getClassName().equals(Runner.class.getName()))
{
knownMethod = "IDLE ";
break;
@ -636,11 +637,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public void dump(Appendable out, String indent) throws IOException
{
String s = thread.getId()+" "+thread.getName()+" "+thread.getState()+" "+thread.getPriority();
if (known.length()==0)
Dumpable.dumpObjects(out, indent, s, (Object[])trace);
if (StringUtil.isBlank(known))
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %d", thread.getId(), thread.getName(), thread.getState(), thread.getPriority()), (Object[])trace);
else
Dumpable.dumpObjects(out, indent, s);
Dumpable.dumpObjects(out, indent, String.format("%s %s %s %s %d", thread.getId(), thread.getName(), known, thread.getState(), thread.getPriority()));
}
@Override
@ -671,7 +671,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
@Override
public String toString()
{
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,q=%d}[%s]",
return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]",
getClass().getSimpleName(),
_name,
hashCode(),
@ -680,11 +680,84 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
getThreads(),
getMaxThreads(),
getIdleThreads(),
getReservedThreads(),
_jobs.size(),
_tryExecutor);
}
private Runnable _runnable = new Runnable()
private final Runnable _runnable = new Runner();
/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
* <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
*
* @param job the job to run
*/
protected void runJob(Runnable job)
{
job.run();
}
/**
* @return the job queue
*/
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}
/**
* @param queue the job queue
* @deprecated pass the queue to the constructor instead
*/
@Deprecated
public void setQueue(BlockingQueue<Runnable> queue)
{
throw new UnsupportedOperationException("Use constructor injection");
}
/**
* @param id the thread ID to interrupt.
* @return true if the thread was found and interrupted.
*/
@ManagedOperation("interrupts a pool thread")
public boolean interruptThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
thread.interrupt();
return true;
}
}
return false;
}
/**
* @param id the thread ID to interrupt.
* @return the stack frames dump
*/
@ManagedOperation("dumps a pool thread stack")
public String dumpThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
{
StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
return buf.toString();
}
}
return null;
}
private static Runnable SHRINK = ()->{};
private class Runner implements Runnable
{
@Override
public void run()
@ -707,24 +780,12 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
_threadsIdle.incrementAndGet();
}
if (_idleTimeout <= 0)
job = _jobs.take();
else
job = idleJobPoll();
if (job == SHRINK)
{
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
break;
}
}
job = _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", this);
break;
}
}
@ -769,68 +830,32 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
removeThread(Thread.currentThread());
if (_threadsStarted.decrementAndGet() < getMinThreads())
int threads = _threadsStarted.decrementAndGet();
// We should start a new thread if threads are now less than min threads or we have queued jobs
if (threads < getMinThreads() || getQueueSize()>0)
startThreads(1);
}
}
};
/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
* <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
*
* @param job the job to run
*/
protected void runJob(Runnable job)
{
job.run();
}
/**
* @return the job queue
*/
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}
/**
* @param id the thread ID to interrupt.
* @return true if the thread was found and interrupted.
*/
@ManagedOperation("interrupts a pool thread")
public boolean interruptThread(@Name("id") long id)
{
for (Thread thread : _threads)
private Runnable idleJobPoll() throws InterruptedException
{
if (thread.getId() == id)
{
thread.interrupt();
return true;
}
}
return false;
}
if (_idleTimeout <= 0)
return _jobs.take();
/**
* @param id the thread ID to interrupt.
* @return the stack frames dump
*/
@ManagedOperation("dumps a pool thread stack")
public String dumpThread(@Name("id") long id)
{
for (Thread thread : _threads)
{
if (thread.getId() == id)
// maybe we should shrink?
int size = _threadsStarted.get();
if (size > _minThreads)
{
StringBuilder buf = new StringBuilder();
buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ");
buf.append(thread.getState()).append(":").append(System.lineSeparator());
for (StackTraceElement element : thread.getStackTrace())
buf.append(" at ").append(element.toString()).append(System.lineSeparator());
return buf.toString();
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
return SHRINK;
}
}
return _jobs.poll(_idleTimeout, TimeUnit.MILLISECONDS);
}
return null;
}
}

View File

@ -75,5 +75,19 @@ public interface TryExecutor extends Executor
}
}
public static final TryExecutor NO_TRY = task -> false;
TryExecutor NO_TRY = new TryExecutor()
{
@Override
public boolean tryExecute(Runnable task)
{
return false;
}
@Override
public String toString()
{
return "NO_TRY";
}
};
}

View File

@ -0,0 +1,54 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.component;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.ArrayList;
import java.util.Collection;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
public class DumpableCollectionTest
{
@Test
public void testNullDumpableCollection () throws Exception
{
DumpableCollection dc = new DumpableCollection("null test", null);
String dump = dc.dump();
assertThat(dump, Matchers.containsString("size=0"));
}
@Test
public void testNonNullDumpableCollection () throws Exception
{
Collection<String> collection = new ArrayList<>();
collection.add("one");
collection.add("two");
collection.add("three");
DumpableCollection dc = new DumpableCollection("non null test", collection);
String dump = dc.dump();
assertThat(dump, Matchers.containsString("one"));
assertThat(dump, Matchers.containsString("two"));
assertThat(dump, Matchers.containsString("three"));
}
}

View File

@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -49,6 +50,16 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
private final CountDownLatch _run = new CountDownLatch(1);
private final CountDownLatch _stopping = new CountDownLatch(1);
private final CountDownLatch _stopped = new CountDownLatch(1);
private final boolean _fail;
RunningJob()
{
this(false);
}
public RunningJob(boolean fail)
{
_fail = fail;
}
@Override
public void run()
@ -57,6 +68,12 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
{
_run.countDown();
_stopping.await();
if (_fail)
throw new IllegalStateException("Testing!");
}
catch(IllegalStateException e)
{
throw e;
}
catch(Exception e)
{
@ -166,6 +183,119 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
waitForIdle(tp,2);
}
@Test
public void testThreadPoolFailingJobs() throws Exception
{
try (StacklessLogging stackless = new StacklessLogging(QueuedThreadPool.class))
{
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(4);
tp.setIdleTimeout(900);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);
tp.start();
// min threads started
waitForThreads(tp, 2);
waitForIdle(tp, 2);
// Doesn't shrink less than 1
Thread.sleep(1100);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
// Run job0
RunningJob job0 = new RunningJob(true);
tp.execute(job0);
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
// Run job1
RunningJob job1 = new RunningJob(true);
tp.execute(job1);
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 1);
// Run job2
RunningJob job2 = new RunningJob(true);
tp.execute(job2);
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 1);
// Run job3
RunningJob job3 = new RunningJob(true);
tp.execute(job3);
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
assertThat(tp.getIdleThreads(), is(0));
Thread.sleep(100);
assertThat(tp.getIdleThreads(), is(0));
// Run job4. will be queued
RunningJob job4 = new RunningJob(true);
tp.execute(job4);
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
// finish job 0
job0._stopping.countDown();
assertTrue(job0._stopped.await(10, TimeUnit.SECONDS));
// job4 should now run
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 0);
// finish job 1,2,3,4
job1._stopping.countDown();
job2._stopping.countDown();
job3._stopping.countDown();
job4._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job2._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 2);
waitForIdle(tp, 2);
}
}
@Test
public void testExecuteNoIdleThreads() throws Exception
{
QueuedThreadPool tp= new QueuedThreadPool();
tp.setDetailedDump(true);
tp.setMinThreads(3);
tp.setMaxThreads(10);
tp.setIdleTimeout(500);
tp.start();
RunningJob job1 = new RunningJob();
tp.execute(job1);
RunningJob job2 = new RunningJob();
tp.execute(job2);
RunningJob job3 = new RunningJob();
tp.execute(job3);
// make sure these jobs have started running
assertTrue(job1._run.await(5, TimeUnit.SECONDS));
assertTrue(job2._run.await(5, TimeUnit.SECONDS));
assertTrue(job3._run.await(5, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForThreads(tp, 3);
RunningJob job4 = new RunningJob();
tp.execute(job4);
assertTrue(job4._run.await(5, TimeUnit.SECONDS));
}
@Test
public void testLifeCycleStop() throws Exception
{
@ -368,10 +498,99 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
});
}
@Test
public void testDump() throws Exception
{
QueuedThreadPool pool = new QueuedThreadPool(4, 3);
String dump = pool.dump();
// TODO use hamcrest 2.0 regex matcher
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=-1,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.setReservedThreads(2);
dump = pool.dump();
assertThat(dump,containsString("STOPPED"));
assertThat(dump,containsString(",3<=0<=4,i=0,r=2,q=0"));
assertThat(dump,containsString("[NO_TRY]"));
pool.start();
waitForIdle(pool,3);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=3,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(3));
assertThat(count(dump," RESERVED "),is(0));
CountDownLatch started = new CountDownLatch(1);
CountDownLatch waiting = new CountDownLatch(1);
pool.execute(()->
{
try
{
started.countDown();
waiting.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
});
started.await();
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(0));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(0));
pool.setDetailedDump(true);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump,containsString("s=0/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(2));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(0));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1));
assertFalse(pool.tryExecute(()->{}));
while(pool.getIdleThreads()==2)
Thread.sleep(10);
dump = pool.dump();
assertThat(count(dump," - STARTED"),is(2));
assertThat(dump,containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump,containsString("s=1/2"));
assertThat(dump,containsString("[ReservedThreadExecutor@"));
assertThat(count(dump," IDLE "),is(1));
assertThat(count(dump," WAITING "),is(1));
assertThat(count(dump," RESERVED "),is(1));
assertThat(count(dump,"QueuedThreadPoolTest.lambda$testDump$"),is(1));
}
private int count(String s, String p)
{
int c = 0;
int i = s.indexOf(p);
while (i>=0)
{
c++;
i = s.indexOf(p, i+1);
}
return c;
}
@Override
protected SizedThreadPool newPool(int max)
{
return new QueuedThreadPool(max);
}
}