Merge pull request #4149 from eclipse/jetty-9.4.x-4121-qtp-threadfactory
Issue #4121 - ThreadFactory support in QTP
This commit is contained in:
commit
c19d33dc59
|
@ -26,6 +26,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -45,7 +46,7 @@ import org.eclipse.jetty.util.log.Logger;
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
|
||||||
|
|
||||||
@ManagedObject("A thread pool")
|
@ManagedObject("A thread pool")
|
||||||
public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor
|
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
|
||||||
{
|
{
|
||||||
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
|
||||||
private static Runnable NOOP = () ->
|
private static Runnable NOOP = () ->
|
||||||
|
@ -67,6 +68,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
private final Object _joinLock = new Object();
|
private final Object _joinLock = new Object();
|
||||||
private final BlockingQueue<Runnable> _jobs;
|
private final BlockingQueue<Runnable> _jobs;
|
||||||
private final ThreadGroup _threadGroup;
|
private final ThreadGroup _threadGroup;
|
||||||
|
private final ThreadFactory _threadFactory;
|
||||||
private String _name = "qtp" + hashCode();
|
private String _name = "qtp" + hashCode();
|
||||||
private int _idleTimeout;
|
private int _idleTimeout;
|
||||||
private int _maxThreads;
|
private int _maxThreads;
|
||||||
|
@ -114,7 +116,17 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
|
this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
|
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
|
||||||
|
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
|
||||||
|
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
|
||||||
|
{
|
||||||
|
this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
|
||||||
|
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
|
||||||
|
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup,
|
||||||
|
@Name("threadFactory") ThreadFactory threadFactory)
|
||||||
{
|
{
|
||||||
if (maxThreads < minThreads)
|
if (maxThreads < minThreads)
|
||||||
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
|
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
|
||||||
|
@ -131,6 +143,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
_jobs = queue;
|
_jobs = queue;
|
||||||
_threadGroup = threadGroup;
|
_threadGroup = threadGroup;
|
||||||
setThreadPoolBudget(new ThreadPoolBudget(this));
|
setThreadPoolBudget(new ThreadPoolBudget(this));
|
||||||
|
_threadFactory = threadFactory == null ? this : threadFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -639,7 +652,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
boolean started = false;
|
boolean started = false;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Thread thread = newThread(_runnable);
|
Thread thread = _threadFactory.newThread(_runnable);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Starting {}", thread);
|
LOG.debug("Starting {}", thread);
|
||||||
_threads.add(thread);
|
_threads.add(thread);
|
||||||
|
@ -669,7 +682,8 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Thread newThread(Runnable runnable)
|
@Override
|
||||||
|
public Thread newThread(Runnable runnable)
|
||||||
{
|
{
|
||||||
Thread thread = new Thread(_threadGroup, runnable);
|
Thread thread = new Thread(_threadGroup, runnable);
|
||||||
thread.setDaemon(isDaemon());
|
thread.setDaemon(isDaemon());
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.util.thread;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.util.MultiException;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
public class ThreadFactoryTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testThreadFactory() throws Exception
|
||||||
|
{
|
||||||
|
ThreadGroup threadGroup = new ThreadGroup("my-group");
|
||||||
|
MyThreadFactory threadFactory = new MyThreadFactory(threadGroup);
|
||||||
|
|
||||||
|
QueuedThreadPool qtp = new QueuedThreadPool(200, 10, 2000, 0, null, threadGroup, threadFactory);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
qtp.start();
|
||||||
|
|
||||||
|
int testThreads = 2000;
|
||||||
|
CountDownLatch threadsLatch = new CountDownLatch(testThreads);
|
||||||
|
MultiException mex = new MultiException();
|
||||||
|
|
||||||
|
for (int i = 0; i < testThreads; i++)
|
||||||
|
{
|
||||||
|
qtp.execute(() ->
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20, 500));
|
||||||
|
Thread thread = Thread.currentThread();
|
||||||
|
|
||||||
|
if (!thread.getName().startsWith("My-"))
|
||||||
|
{
|
||||||
|
mex.add(new AssertionError("Thread " + thread.getName() + " does not start with 'My-'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!thread.getThreadGroup().getName().equalsIgnoreCase("my-group"))
|
||||||
|
{
|
||||||
|
mex.add(new AssertionError("Thread Group " + thread.getThreadGroup().getName() + " is not 'my-group'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
threadsLatch.countDown();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e)
|
||||||
|
{
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(threadsLatch.await(5, TimeUnit.SECONDS), "Did not see all tasks finish");
|
||||||
|
mex.ifExceptionThrow();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
qtp.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyThreadFactory implements ThreadFactory
|
||||||
|
{
|
||||||
|
private final ThreadGroup threadGroup;
|
||||||
|
|
||||||
|
public MyThreadFactory(ThreadGroup threadGroup)
|
||||||
|
{
|
||||||
|
this.threadGroup = threadGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Thread newThread(Runnable runnable)
|
||||||
|
{
|
||||||
|
Thread thread = new Thread(threadGroup, runnable);
|
||||||
|
thread.setDaemon(false);
|
||||||
|
thread.setPriority(Thread.MIN_PRIORITY);
|
||||||
|
thread.setName("My-" + thread.getId());
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue