diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 49991896555..2b825a35a84 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -45,7 +46,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; @ManagedObject("A thread pool") -public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor +public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); private static Runnable NOOP = () -> @@ -67,6 +68,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP private final Object _joinLock = new Object(); private final BlockingQueue _jobs; private final ThreadGroup _threadGroup; + private final ThreadFactory _threadFactory; private String _name = "qtp" + hashCode(); private int _idleTimeout; private int _maxThreads; @@ -114,7 +116,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup); } - public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup) + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, + @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, + @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup) + { + this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null); + } + + public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, + @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, + @Name("queue") BlockingQueue queue, @Name("threadGroup") ThreadGroup threadGroup, + @Name("threadFactory") ThreadFactory threadFactory) { if (maxThreads < minThreads) throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")"); @@ -131,6 +143,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP _jobs = queue; _threadGroup = threadGroup; setThreadPoolBudget(new ThreadPoolBudget(this)); + _threadFactory = threadFactory == null ? this : threadFactory; } @Override @@ -639,7 +652,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP boolean started = false; try { - Thread thread = newThread(_runnable); + Thread thread = _threadFactory.newThread(_runnable); if (LOG.isDebugEnabled()) LOG.debug("Starting {}", thread); _threads.add(thread); @@ -669,7 +682,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP } } - protected Thread newThread(Runnable runnable) + @Override + public Thread newThread(Runnable runnable) { Thread thread = new Thread(_threadGroup, runnable); thread.setDaemon(isDaemon()); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ThreadFactoryTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ThreadFactoryTest.java new file mode 100644 index 00000000000..77c571e4e64 --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ThreadFactoryTest.java @@ -0,0 +1,104 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.MultiException; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ThreadFactoryTest +{ + @Test + public void testThreadFactory() throws Exception + { + ThreadGroup threadGroup = new ThreadGroup("my-group"); + MyThreadFactory threadFactory = new MyThreadFactory(threadGroup); + + QueuedThreadPool qtp = new QueuedThreadPool(200, 10, 2000, 0, null, threadGroup, threadFactory); + try + { + qtp.start(); + + int testThreads = 2000; + CountDownLatch threadsLatch = new CountDownLatch(testThreads); + MultiException mex = new MultiException(); + + for (int i = 0; i < testThreads; i++) + { + qtp.execute(() -> + { + try + { + TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20, 500)); + Thread thread = Thread.currentThread(); + + if (!thread.getName().startsWith("My-")) + { + mex.add(new AssertionError("Thread " + thread.getName() + " does not start with 'My-'")); + } + + if (!thread.getThreadGroup().getName().equalsIgnoreCase("my-group")) + { + mex.add(new AssertionError("Thread Group " + thread.getThreadGroup().getName() + " is not 'my-group'")); + } + + threadsLatch.countDown(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + }); + } + + assertTrue(threadsLatch.await(5, TimeUnit.SECONDS), "Did not see all tasks finish"); + mex.ifExceptionThrow(); + } + finally + { + qtp.stop(); + } + } + + public static class MyThreadFactory implements ThreadFactory + { + private final ThreadGroup threadGroup; + + public MyThreadFactory(ThreadGroup threadGroup) + { + this.threadGroup = threadGroup; + } + + @Override + public Thread newThread(Runnable runnable) + { + Thread thread = new Thread(threadGroup, runnable); + thread.setDaemon(false); + thread.setPriority(Thread.MIN_PRIORITY); + thread.setName("My-" + thread.getId()); + return thread; + } + } +}