ARTEMIS-518 - Improvement of default thread factory

This commit is contained in:
Erich Duda 2016-05-06 07:54:51 +02:00 committed by Clebert Suconic
parent 8e7f876843
commit a622fa7443
39 changed files with 104 additions and 63 deletions

View File

@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)") @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
@ -139,8 +140,8 @@ public class PrintData extends LockAbstract {
Set<Long> pgTXs = cursorACKs.getPgTXs(); Set<Long> pgTXs = cursorACKs.getPgTXs();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService executor = Executors.newFixedThreadPool(10); final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory execfactory = new ExecutorFactory() { ExecutorFactory execfactory = new ExecutorFactory() {
@Override @Override
public Executor getExecutor() { public Executor getExecutor() {

View File

@ -87,6 +87,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFac
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination; import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
import org.apache.activemq.artemis.jms.persistence.config.PersistedType; import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl; import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -142,7 +143,7 @@ public final class XmlDataExporter extends LockAbstract {
String pagingDir, String pagingDir,
String largeMessagesDir) throws Exception { String largeMessagesDir) throws Exception {
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO); config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
final ExecutorService executor = Executors.newFixedThreadPool(1); final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new ExecutorFactory() { ExecutorFactory executorFactory = new ExecutorFactory() {
@Override @Override
public Executor getExecutor() { public Executor getExecutor() {
@ -678,8 +679,8 @@ public final class XmlDataExporter extends LockAbstract {
*/ */
private void printPagedMessagesAsXML() { private void printPagedMessagesAsXML() {
try { try {
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService executor = Executors.newFixedThreadPool(10); final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
ExecutorFactory executorFactory = new ExecutorFactory() { ExecutorFactory executorFactory = new ExecutorFactory() {
@Override @Override
public Executor getExecutor() { public Executor getExecutor() {

View File

@ -89,4 +89,9 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
return t; return t;
} }
public static ActiveMQThreadFactory defaultThreadFactory() {
String callerClassName = Thread.currentThread().getStackTrace()[2].getClassName();
return new ActiveMQThreadFactory(callerClassName, false, null);
}
} }

View File

@ -133,7 +133,7 @@ public final class UUIDGenerator {
public static byte[] getHardwareAddress() { public static byte[] getHardwareAddress() {
try { try {
// check if we have enough security permissions to create and shutdown an executor // check if we have enough security permissions to create and shutdown an executor
ExecutorService executor = Executors.newFixedThreadPool(1); ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
executor.shutdownNow(); executor.shutdownNow();
} }
catch (Throwable t) { catch (Throwable t) {
@ -259,7 +259,7 @@ public final class UUIDGenerator {
} }
private static byte[] findFirstMatchingHardwareAddress(List<NetworkInterface> ifaces) { private static byte[] findFirstMatchingHardwareAddress(List<NetworkInterface> ifaces) {
ExecutorService executor = Executors.newFixedThreadPool(ifaces.size()); ExecutorService executor = Executors.newFixedThreadPool(ifaces.size(), ActiveMQThreadFactory.defaultThreadFactory());
Collection<Callable<byte[]>> tasks = new ArrayList<>(ifaces.size()); Collection<Callable<byte[]>> tasks = new ArrayList<>(ifaces.size());
for (final NetworkInterface networkInterface : ifaces) { for (final NetworkInterface networkInterface : ifaces) {

View File

@ -48,14 +48,14 @@ public class ReferenceCounterTest extends Assert {
@Test @Test
public void testReferenceWithExecutor() throws Exception { public void testReferenceWithExecutor() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
internalTestReferenceNoExecutor(executor); internalTestReferenceNoExecutor(executor);
executor.shutdown(); executor.shutdown();
} }
@Test @Test
public void testReferenceValidExecutorUsed() throws Exception { public void testReferenceValidExecutorUsed() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
LatchRunner runner = new LatchRunner(); LatchRunner runner = new LatchRunner();
ReferenceCounterUtil counter = new ReferenceCounterUtil(runner, executor); ReferenceCounterUtil counter = new ReferenceCounterUtil(runner, executor);
counter.increment(); counter.increment();

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -106,7 +107,7 @@ public class ClientThreadPoolsTest {
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1); ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor); ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory; import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.derby.jdbc.EmbeddedDriver; import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -55,7 +56,7 @@ public class JDBCSequentialFileFactoryTest {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(); Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor); factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
factory.start(); factory.start();

View File

@ -1597,9 +1597,12 @@ public final class JMSBridgeImpl implements JMSBridge {
*/ */
private ExecutorService createExecutor() { private ExecutorService createExecutor() {
ExecutorService service = Executors.newFixedThreadPool(3, new ThreadFactory() { ExecutorService service = Executors.newFixedThreadPool(3, new ThreadFactory() {
ThreadGroup group = new ThreadGroup("JMSBridgeImpl");
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
final Thread thr = new Thread(r); final Thread thr = new Thread(group, r);
if (moduleTccl != null) { if (moduleTccl != null) {
AccessController.doPrivileged(new PrivilegedAction() { AccessController.doPrivileged(new PrivilegedAction() {
@Override @Override

View File

@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Session;
@ -50,7 +51,7 @@ public class AbstractConnectionContextTest {
private class TestConnectionContext extends AbstractConnectionContext { private class TestConnectionContext extends AbstractConnectionContext {
public TestConnectionContext(AMQPConnectionCallback connectionCallback) { public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
super(connectionCallback, Executors.newSingleThreadExecutor(), null); super(connectionCallback, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
} }
@Override @Override

View File

@ -18,6 +18,7 @@ package org.proton.plug.test.invm;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.context.client.ProtonClientConnectionContext; import org.proton.plug.context.client.ProtonClientConnectionContext;
import org.proton.plug.test.minimalclient.Connector; import org.proton.plug.test.minimalclient.Connector;
@ -34,6 +35,6 @@ public class InVMTestConnector implements Connector {
@Override @Override
public AMQPClientConnectionContext connect(String host, int port) throws Exception { public AMQPClientConnectionContext connect(String host, int port) throws Exception {
return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(), null); return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
} }
} }

View File

@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionCallback;
@ -35,11 +36,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
AMQPConnectionContext returningConnection; AMQPConnectionContext returningConnection;
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(), null); ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(); final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
final ExecutorService returningExecutor = Executors.newSingleThreadExecutor(); final ExecutorService returningExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
public ProtonINVMSPI() { public ProtonINVMSPI() {
mainExecutor.execute(new Runnable() { mainExecutor.execute(new Runnable() {

View File

@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.proton.plug.AMQPClientConnectionContext; import org.proton.plug.AMQPClientConnectionContext;
import org.proton.plug.context.client.ProtonClientConnectionContextFactory; import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
@ -60,7 +61,7 @@ public class SimpleAMQPConnector implements Connector {
AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel()); AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(), null); final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
future.channel().pipeline().addLast(new ChannelDuplexHandler() { future.channel().pipeline().addLast(new ChannelDuplexHandler() {
@Override @Override

View File

@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.proton.plug.AMQPConnectionContext; import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPConnectionCallback; import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionCallback;
@ -44,7 +45,7 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
this.channel = channel; this.channel = channel;
} }
ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@Override @Override
public void close() { public void close() {

View File

@ -40,6 +40,7 @@ import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.proton.plug.AMQPServerConnectionContext; import org.proton.plug.AMQPServerConnectionContext;
import org.proton.plug.context.server.ProtonServerConnectionContextFactory; import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
import org.proton.plug.test.Constants; import org.proton.plug.test.Constants;
@ -125,7 +126,7 @@ public class MinimalServer {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx); super.channelActive(ctx);
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(), null); connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
//ctx.read(); //ctx.read();
} }

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.rest.util.LinkHeaderLinkStrategy;
import org.apache.activemq.artemis.rest.util.LinkStrategy; import org.apache.activemq.artemis.rest.util.LinkStrategy;
import org.apache.activemq.artemis.rest.util.TimeoutTask; import org.apache.activemq.artemis.rest.util.TimeoutTask;
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry; import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.XMLUtil; import org.apache.activemq.artemis.utils.XMLUtil;
public class MessageServiceManager { public class MessageServiceManager {
@ -126,7 +127,7 @@ public class MessageServiceManager {
} }
} }
if (threadPool == null) if (threadPool == null)
threadPool = Executors.newCachedThreadPool(); threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
timeoutTaskInterval = configuration.getTimeoutTaskInterval(); timeoutTaskInterval = configuration.getTimeoutTaskInterval();
timeoutTask = new TimeoutTask(timeoutTaskInterval); timeoutTask = new TimeoutTask(timeoutTaskInterval);
threadPool.execute(timeoutTask); threadPool.execute(timeoutTask);

View File

@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,6 +40,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor; import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -107,10 +107,10 @@ public class InVMConnector extends AbstractConnector {
private static synchronized ExecutorService getInVMExecutor() { private static synchronized ExecutorService getInVMExecutor() {
if (threadPoolExecutor == null) { if (threadPoolExecutor == null) {
if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) { if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory()); threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), ActiveMQThreadFactory.defaultThreadFactory());
} }
else { else {
threadPoolExecutor = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.getGlobalThreadPoolSize(), 60L, TimeUnit.SECONDS, Executors.defaultThreadFactory()); threadPoolExecutor = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.getGlobalThreadPoolSize(), 60L, TimeUnit.SECONDS, ActiveMQThreadFactory.defaultThreadFactory());
} }
} }
return threadPoolExecutor; return threadPoolExecutor;

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.spi.core.security.jaas.JaasCallbackHandler; import org.apache.activemq.artemis.spi.core.security.jaas.JaasCallbackHandler;
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader; import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule; import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -119,7 +120,7 @@ public class PropertiesLoginModuleRaceConditionTest {
options.put("baseDir", temp.getRoot().getAbsolutePath()); options.put("baseDir", temp.getRoot().getAbsolutePath());
errors = new ArrayBlockingQueue<>(processorCount()); errors = new ArrayBlockingQueue<>(processorCount());
pool = Executors.newFixedThreadPool(processorCount()); pool = Executors.newFixedThreadPool(processorCount(), ActiveMQThreadFactory.defaultThreadFactory());
callback = new JaasCallbackHandler(USERNAME, PASSWORD, null); callback = new JaasCallbackHandler(USERNAME, PASSWORD, null);
} }

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -170,8 +171,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
@Test @Test
public void testScheduleNow() throws Exception { public void testScheduleNow() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(50); ExecutorService executor = Executors.newFixedThreadPool(50, ActiveMQThreadFactory.defaultThreadFactory());
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, ActiveMQThreadFactory.defaultThreadFactory());
try { try {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
// it's better to run the test a few times instead of run millions of messages here // it's better to run the test a few times instead of run millions of messages here

View File

@ -128,6 +128,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -472,7 +473,7 @@ public abstract class ActiveMQTestBase extends Assert {
} }
protected final OrderedExecutorFactory getOrderedExecutor() { protected final OrderedExecutorFactory getOrderedExecutor() {
final ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
executorSet.add(executor); executorSet.add(executor);
return new OrderedExecutorFactory(executor); return new OrderedExecutorFactory(executor);
} }

View File

@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
@ -49,7 +50,7 @@ public class HierarchicalObjectRepositoryTest {
public void setUp() { public void setUp() {
latch = new CountDownLatch(1); latch = new CountDownLatch(1);
latch2 = new CountDownLatch(1); latch2 = new CountDownLatch(1);
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
repo = new HierarchicalObjectRepository<>(); repo = new HierarchicalObjectRepository<>();
addToRepo(repo, A); addToRepo(repo, A);
} }

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -55,7 +56,7 @@ public class CoreClientTest extends ActiveMQTestBase {
@Test @Test
public void testCoreClientWithInjectedThreadPools() throws Exception { public void testCoreClientWithInjectedThreadPools() throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool(); ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10); ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
ServerLocator locator = createNonHALocator(false); ServerLocator locator = createNonHALocator(false);

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -550,7 +551,7 @@ public class ClusteredGroupingTest extends ClusterTestBase {
session.close(); session.close();
// need thread pool to service both consumers and producers plus a thread to cycle nodes // need thread pool to service both consumers and producers plus a thread to cycle nodes
ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1); ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1, ActiveMQThreadFactory.defaultThreadFactory());
final AtomicInteger producerCounter = new AtomicInteger(0); final AtomicInteger producerCounter = new AtomicInteger(0);
final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1); final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1);

View File

@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase; import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -1627,11 +1628,11 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final AtomicLong seqGenerator = new AtomicLong(1); final AtomicLong seqGenerator = new AtomicLong(1);
final ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor); OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(); final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
final JournalStorageManager storage = new JournalStorageManager(config, factory, null); final JournalStorageManager storage = new JournalStorageManager(config, factory, null);

View File

@ -32,6 +32,7 @@ import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.junit.Test; import org.junit.Test;
@ -93,7 +94,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
} }
} }
final ExecutorService executor = Executors.newCachedThreadPool(); final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
@Override @Override
public void onMessage(Message m) { public void onMessage(Message m) {

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfi
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl; import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -73,7 +74,7 @@ public class MultipleProducersPagingTest extends ActiveMQTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
executor = Executors.newCachedThreadPool(); executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
AddressSettings addressSettings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(50000).setMaxSizeBytes(404850); AddressSettings addressSettings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(50000).setMaxSizeBytes(404850);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -75,7 +76,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
final ClientMessage message = session.createMessage(false); final ClientMessage message = session.createMessage(false);
message.getBodyBuffer().writeBytes(new byte[1024]); message.getBodyBuffer().writeBytes(new byte[1024]);
ExecutorService executor = Executors.newFixedThreadPool(1); ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
Callable<Object> sendMessageTask = new Callable<Object>() { Callable<Object> sendMessageTask = new Callable<Object>() {
@Override @Override
public Object call() throws ActiveMQException { public Object call() throws ActiveMQException {

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfigu
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -70,7 +71,7 @@ public class FileLockTimeoutTest extends ActiveMQTestBase {
server2.getConfiguration().setJournalLockAcquisitionTimeout(5000); server2.getConfiguration().setJournalLockAcquisitionTimeout(5000);
// if something happens that causes the timeout to misbehave we don't want the test to hang // if something happens that causes the timeout to misbehave we don't want the test to hang
ExecutorService service = Executors.newSingleThreadExecutor(); ExecutorService service = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
Runnable r = new Runnable() { Runnable r = new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -42,8 +43,8 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
serviceRegistry = new ServiceRegistryImpl(); serviceRegistry = new ServiceRegistryImpl();
serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1)); serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1)); serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry); server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
server.start(); server.start();
server.waitForActivation(100, TimeUnit.MILLISECONDS); server.waitForActivation(100, TimeUnit.MILLISECONDS);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.timing.core.server.impl; package org.apache.activemq.artemis.tests.timing.core.server.impl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Before; import org.junit.Before;
import org.junit.After; import org.junit.After;
@ -69,7 +70,7 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test @Test
public void testScheduledNoConsumer() throws Exception { public void testScheduledNoConsumer() throws Exception {
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor()); QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
// Send one scheduled // Send one scheduled
@ -134,7 +135,7 @@ public class QueueImplTest extends ActiveMQTestBase {
@Test @Test
public void testScheduled() throws Exception { public void testScheduled() throws Exception {
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor()); QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
FakeConsumer consumer = null; FakeConsumer consumer = null;
@ -232,7 +233,7 @@ public class QueueImplTest extends ActiveMQTestBase {
public void disconnect() { public void disconnect() {
} }
}; };
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor()); QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
MessageReference messageReference = generateReference(queue, 1); MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer); queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

View File

@ -71,7 +71,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), false, this.getClass().getClassLoader())); pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), false, this.getClass().getClassLoader()));
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
} }
@Override @Override

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.util.FakePagingManager; import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After; import org.junit.After;
@ -770,7 +771,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -43,7 +44,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
@Test @Test
public void testCompleteTaskAfterPaging() throws Exception { public void testCompleteTaskAfterPaging() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
try { try {
OperationContextImpl impl = new OperationContextImpl(executor); OperationContextImpl impl = new OperationContextImpl(executor);
final CountDownLatch latch1 = new CountDownLatch(1); final CountDownLatch latch1 = new CountDownLatch(1);
@ -102,7 +103,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
@Test @Test
public void testCaptureExceptionOnExecutor() throws Exception { public void testCaptureExceptionOnExecutor() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor.shutdown(); executor.shutdown();
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
@ -148,7 +149,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
@Test @Test
public void testCaptureExceptionOnFailure() throws Exception { public void testCaptureExceptionOnFailure() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader; import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl; import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
@ -69,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
factory = new OrderedExecutorFactory(executor); factory = new OrderedExecutorFactory(executor);
} }
@ -89,7 +90,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
PostOffice postOffice = new FakePostOffice(); PostOffice postOffice = new FakePostOffice();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize()); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
journal = new JournalStorageManager(configuration, factory, null); journal = new JournalStorageManager(configuration, factory, null);

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -71,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
}; };
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize()), null); Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null);
Assert.assertTrue(acceptor instanceof NettyAcceptor); Assert.assertTrue(acceptor instanceof NettyAcceptor);
} }

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -93,7 +94,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
public void connectionReadyForWrites(Object connectionID, boolean ready) { public void connectionReadyForWrites(Object connectionID, boolean ready) {
} }
}; };
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize()); pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null); NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
addActiveMQComponent(acceptor); addActiveMQComponent(acceptor);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Test; import org.junit.Test;
import java.util.HashMap; import java.util.HashMap;
@ -64,7 +65,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
}; };
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
connector.start(); connector.start();
Assert.assertTrue(connector.isStarted()); Assert.assertTrue(connector.isStarted());
@ -101,7 +102,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
}; };
try { try {
new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); new NettyConnector(params, null, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
Assert.fail("Should throw Exception"); Assert.fail("Should throw Exception");
} }
@ -110,7 +111,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
try { try {
new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); new NettyConnector(params, handler, null, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
Assert.fail("Should throw Exception"); Assert.fail("Should throw Exception");
} }
@ -152,7 +153,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
}; };
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks"); System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
@ -198,7 +199,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
}; };
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "bad path"); System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "bad path");
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "bad password"); System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "bad password");
@ -246,7 +247,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
}; };
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
connector.start(); connector.start();
Assert.assertTrue(connector.isStarted()); Assert.assertTrue(connector.isStarted());
@ -285,7 +286,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
} }
}; };
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5)); NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
connector.start(); connector.start();
Assert.assertTrue(connector.isStarted()); Assert.assertTrue(connector.isStarted());

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.LinkedListIterator; import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.junit.After; import org.junit.After;
@ -65,8 +66,8 @@ public class QueueImplTest extends ActiveMQTestBase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor = Executors.newSingleThreadExecutor(); executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
} }
@Override @Override

View File

@ -27,12 +27,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
public class FakeQueueFactory implements QueueFactory { public class FakeQueueFactory implements QueueFactory {
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
private PostOffice postOffice; private PostOffice postOffice;

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.Assert; import org.junit.Assert;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
@ -160,7 +161,7 @@ public final class SpawnedVMSupport {
public static void assertProcessExits(final boolean sameValue, public static void assertProcessExits(final boolean sameValue,
final int value, final int value,
final Process p) throws InterruptedException, ExecutionException, TimeoutException { final Process p) throws InterruptedException, ExecutionException, TimeoutException {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
Future<Integer> future = executor.submit(new Callable<Integer>() { Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override @Override