diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index aa469ce3243..54ef1a6be45 100755 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.util.thread; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -146,6 +147,9 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP @Override protected void doStop() throws Exception { + if (LOG.isDebugEnabled()) + LOG.debug("Stopping {}", this); + removeBean(_tryExecutor); _tryExecutor = TryExecutor.NO_TRY; @@ -163,11 +167,13 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP for (int i = _threadsStarted.get(); i-- > 0; ) jobs.offer(noop); - // try to jobs complete naturally for half our stop time + // try to let jobs complete naturally for half our stop time long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2; for (Thread thread : _threads) { long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Waiting for {} for {}", thread, canwait); if (canwait > 0) thread.join(canwait); } @@ -177,13 +183,19 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP // interrupt remaining threads if (_threadsStarted.get() > 0) for (Thread thread : _threads) + { + if (LOG.isDebugEnabled()) + LOG.debug("Interrupting {}", thread); thread.interrupt(); + } // wait again for the other half of our stop time stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2; for (Thread thread : _threads) { long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Waiting for {} for {}", thread, canwait); if (canwait > 0) thread.join(canwait); } @@ -213,6 +225,25 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } } + // Close any un-executed jobs + while (!_jobs.isEmpty()) + { + Runnable job = _jobs.poll(); + if (job instanceof Closeable) + { + try + { + ((Closeable)job).close(); + } + catch (Throwable t) + { + LOG.warn(t); + } + } + else if (job != noop) + LOG.warn("Stopped without executing or closing {}", job); + } + if (_budget!=null) _budget.reset(); @@ -535,6 +566,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP thread.setDaemon(isDaemon()); thread.setPriority(getThreadsPriority()); thread.setName(_name + "-" + thread.getId()); + if (LOG.isDebugEnabled()) + LOG.debug("Starting {}", thread); _threads.add(thread); _lastShrink.set(System.nanoTime()); thread.start(); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index c896a1dd2f2..ccf30c343ad 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -18,22 +18,30 @@ package org.eclipse.jetty.util.thread; -import org.eclipse.jetty.util.log.StacklessLogging; -import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; -import org.junit.jupiter.api.Test; - +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; +import org.junit.jupiter.api.Test; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class QueuedThreadPoolTest extends AbstractThreadPoolTest { + private static final Logger LOG = Log.getLogger(QueuedThreadPoolTest.class); private final AtomicInteger _jobs=new AtomicInteger(); private class RunningJob implements Runnable @@ -41,6 +49,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest private final CountDownLatch _run = new CountDownLatch(1); private final CountDownLatch _stopping = new CountDownLatch(1); private final CountDownLatch _stopped = new CountDownLatch(1); + @Override public void run() { @@ -51,7 +60,7 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } catch(Exception e) { - e.printStackTrace(); + LOG.debug(e); } finally { @@ -69,6 +78,17 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest } } + private class CloseableJob extends RunningJob implements Closeable + { + private final CountDownLatch _closed = new CountDownLatch(1); + + @Override + public void close() throws IOException + { + _closed.countDown(); + } + } + @Test public void testThreadPool() throws Exception { @@ -146,6 +166,58 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest waitForIdle(tp,2); } + @Test + public void testLifeCycleStop() throws Exception + { + QueuedThreadPool tp= new QueuedThreadPool(); + tp.setName("TestPool"); + tp.setMinThreads(1); + tp.setMaxThreads(2); + tp.setIdleTimeout(900); + tp.setStopTimeout(500); + tp.setThreadsPriority(Thread.NORM_PRIORITY-1); + tp.start(); + + // min threads started + waitForThreads(tp,1); + waitForIdle(tp,1); + + // Run job0 and job1 + RunningJob job0=new RunningJob(); + RunningJob job1=new RunningJob(); + tp.execute(job0); + tp.execute(job1); + + // Add a more jobs (which should not be run) + RunningJob job2=new RunningJob(); + CloseableJob job3=new CloseableJob(); + RunningJob job4=new RunningJob(); + tp.execute(job2); + tp.execute(job3); + tp.execute(job4); + + // Wait until the first 2 start running + waitForThreads(tp,2); + waitForIdle(tp,0); + + // Queue should be empty after thread pool is stopped + tp.stop(); + assertThat(tp.getQueue().size(), is(0)); + + // First 2 jobs closed by InterruptedException + assertThat(job0._stopped.await(200, TimeUnit.MILLISECONDS), is(true)); + assertThat(job1._stopped.await(200, TimeUnit.MILLISECONDS), is(true)); + + // Verify RunningJobs in the queue have not been run + assertThat(job2._run.await(200, TimeUnit.MILLISECONDS), is(false)); + assertThat(job4._run.await(200, TimeUnit.MILLISECONDS), is(false)); + + // Verify ClosableJobs have not been run but have been closed + assertThat(job4._run.await(200, TimeUnit.MILLISECONDS), is(false)); + assertThat(job3._closed.await(200, TimeUnit.MILLISECONDS), is(true)); + } + + @Test public void testShrink() throws Exception {