This closes #512
This commit is contained in:
commit
0230a4026c
|
@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
|
||||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
|
||||||
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
|
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
|
||||||
|
@ -139,8 +140,8 @@ public class PrintData extends LockAbstract {
|
||||||
|
|
||||||
Set<Long> pgTXs = cursorACKs.getPgTXs();
|
Set<Long> pgTXs = cursorACKs.getPgTXs();
|
||||||
|
|
||||||
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
|
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(10);
|
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
ExecutorFactory execfactory = new ExecutorFactory() {
|
ExecutorFactory execfactory = new ExecutorFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Executor getExecutor() {
|
public Executor getExecutor() {
|
||||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFac
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
|
import org.apache.activemq.artemis.jms.persistence.config.PersistedDestination;
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
|
import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
|
||||||
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
|
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.Base64;
|
import org.apache.activemq.artemis.utils.Base64;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
|
||||||
|
@ -142,7 +143,7 @@ public final class XmlDataExporter extends LockAbstract {
|
||||||
String pagingDir,
|
String pagingDir,
|
||||||
String largeMessagesDir) throws Exception {
|
String largeMessagesDir) throws Exception {
|
||||||
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
|
config = new ConfigurationImpl().setBindingsDirectory(bindingsDir).setJournalDirectory(journalDir).setPagingDirectory(pagingDir).setLargeMessagesDirectory(largeMessagesDir).setJournalType(JournalType.NIO);
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
final ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
ExecutorFactory executorFactory = new ExecutorFactory() {
|
ExecutorFactory executorFactory = new ExecutorFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Executor getExecutor() {
|
public Executor getExecutor() {
|
||||||
|
@ -678,8 +679,8 @@ public final class XmlDataExporter extends LockAbstract {
|
||||||
*/
|
*/
|
||||||
private void printPagedMessagesAsXML() {
|
private void printPagedMessagesAsXML() {
|
||||||
try {
|
try {
|
||||||
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
|
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(10);
|
final ExecutorService executor = Executors.newFixedThreadPool(10, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
ExecutorFactory executorFactory = new ExecutorFactory() {
|
ExecutorFactory executorFactory = new ExecutorFactory() {
|
||||||
@Override
|
@Override
|
||||||
public Executor getExecutor() {
|
public Executor getExecutor() {
|
||||||
|
|
|
@ -89,4 +89,9 @@ public final class ActiveMQThreadFactory implements ThreadFactory {
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ActiveMQThreadFactory defaultThreadFactory() {
|
||||||
|
String callerClassName = Thread.currentThread().getStackTrace()[2].getClassName();
|
||||||
|
return new ActiveMQThreadFactory(callerClassName, false, null);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,7 +133,7 @@ public final class UUIDGenerator {
|
||||||
public static byte[] getHardwareAddress() {
|
public static byte[] getHardwareAddress() {
|
||||||
try {
|
try {
|
||||||
// check if we have enough security permissions to create and shutdown an executor
|
// check if we have enough security permissions to create and shutdown an executor
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
}
|
}
|
||||||
catch (Throwable t) {
|
catch (Throwable t) {
|
||||||
|
@ -259,7 +259,7 @@ public final class UUIDGenerator {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] findFirstMatchingHardwareAddress(List<NetworkInterface> ifaces) {
|
private static byte[] findFirstMatchingHardwareAddress(List<NetworkInterface> ifaces) {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(ifaces.size());
|
ExecutorService executor = Executors.newFixedThreadPool(ifaces.size(), ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
Collection<Callable<byte[]>> tasks = new ArrayList<>(ifaces.size());
|
Collection<Callable<byte[]>> tasks = new ArrayList<>(ifaces.size());
|
||||||
|
|
||||||
for (final NetworkInterface networkInterface : ifaces) {
|
for (final NetworkInterface networkInterface : ifaces) {
|
||||||
|
|
|
@ -48,14 +48,14 @@ public class ReferenceCounterTest extends Assert {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReferenceWithExecutor() throws Exception {
|
public void testReferenceWithExecutor() throws Exception {
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
internalTestReferenceNoExecutor(executor);
|
internalTestReferenceNoExecutor(executor);
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReferenceValidExecutorUsed() throws Exception {
|
public void testReferenceValidExecutorUsed() throws Exception {
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
LatchRunner runner = new LatchRunner();
|
LatchRunner runner = new LatchRunner();
|
||||||
ReferenceCounterUtil counter = new ReferenceCounterUtil(runner, executor);
|
ReferenceCounterUtil counter = new ReferenceCounterUtil(runner, executor);
|
||||||
counter.increment();
|
counter.increment();
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -106,7 +107,7 @@ public class ClientThreadPoolsTest {
|
||||||
0L, TimeUnit.MILLISECONDS,
|
0L, TimeUnit.MILLISECONDS,
|
||||||
new LinkedBlockingQueue<Runnable>());
|
new LinkedBlockingQueue<Runnable>());
|
||||||
|
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
|
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
|
ActiveMQClient.injectPools(poolExecutor, scheduledThreadPoolExecutor);
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.derby.jdbc.EmbeddedDriver;
|
import org.apache.derby.jdbc.EmbeddedDriver;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -55,7 +56,7 @@ public class JDBCSequentialFileFactoryTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
Executor executor = Executors.newSingleThreadExecutor();
|
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
|
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
|
||||||
factory.start();
|
factory.start();
|
||||||
|
|
|
@ -1597,9 +1597,12 @@ public final class JMSBridgeImpl implements JMSBridge {
|
||||||
*/
|
*/
|
||||||
private ExecutorService createExecutor() {
|
private ExecutorService createExecutor() {
|
||||||
ExecutorService service = Executors.newFixedThreadPool(3, new ThreadFactory() {
|
ExecutorService service = Executors.newFixedThreadPool(3, new ThreadFactory() {
|
||||||
|
|
||||||
|
ThreadGroup group = new ThreadGroup("JMSBridgeImpl");
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
final Thread thr = new Thread(r);
|
final Thread thr = new Thread(group, r);
|
||||||
if (moduleTccl != null) {
|
if (moduleTccl != null) {
|
||||||
AccessController.doPrivileged(new PrivilegedAction() {
|
AccessController.doPrivileged(new PrivilegedAction() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
import org.apache.qpid.proton.engine.Link;
|
||||||
import org.apache.qpid.proton.engine.Session;
|
import org.apache.qpid.proton.engine.Session;
|
||||||
|
@ -50,7 +51,7 @@ public class AbstractConnectionContextTest {
|
||||||
private class TestConnectionContext extends AbstractConnectionContext {
|
private class TestConnectionContext extends AbstractConnectionContext {
|
||||||
|
|
||||||
public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
|
public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
|
||||||
super(connectionCallback, Executors.newSingleThreadExecutor(), null);
|
super(connectionCallback, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.proton.plug.test.invm;
|
||||||
|
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.proton.plug.AMQPClientConnectionContext;
|
import org.proton.plug.AMQPClientConnectionContext;
|
||||||
import org.proton.plug.context.client.ProtonClientConnectionContext;
|
import org.proton.plug.context.client.ProtonClientConnectionContext;
|
||||||
import org.proton.plug.test.minimalclient.Connector;
|
import org.proton.plug.test.minimalclient.Connector;
|
||||||
|
@ -34,6 +35,6 @@ public class InVMTestConnector implements Connector {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AMQPClientConnectionContext connect(String host, int port) throws Exception {
|
public AMQPClientConnectionContext connect(String host, int port) throws Exception {
|
||||||
return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(), null);
|
return new ProtonClientConnectionContext(new ProtonINVMSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
@ -35,11 +36,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
AMQPConnectionContext returningConnection;
|
AMQPConnectionContext returningConnection;
|
||||||
|
|
||||||
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(), null);
|
ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI(), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
|
|
||||||
final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
|
final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
final ExecutorService returningExecutor = Executors.newSingleThreadExecutor();
|
final ExecutorService returningExecutor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
public ProtonINVMSPI() {
|
public ProtonINVMSPI() {
|
||||||
mainExecutor.execute(new Runnable() {
|
mainExecutor.execute(new Runnable() {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.proton.plug.AMQPClientConnectionContext;
|
import org.proton.plug.AMQPClientConnectionContext;
|
||||||
import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
|
import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
|
||||||
|
|
||||||
|
@ -60,7 +61,7 @@ public class SimpleAMQPConnector implements Connector {
|
||||||
|
|
||||||
AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
|
AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
|
||||||
|
|
||||||
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(), null);
|
final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
|
|
||||||
future.channel().pipeline().addLast(new ChannelDuplexHandler() {
|
future.channel().pipeline().addLast(new ChannelDuplexHandler() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPConnectionCallback;
|
import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
|
@ -44,7 +45,7 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import io.netty.util.ResourceLeakDetector;
|
import io.netty.util.ResourceLeakDetector;
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.proton.plug.AMQPServerConnectionContext;
|
import org.proton.plug.AMQPServerConnectionContext;
|
||||||
import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
|
import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
|
||||||
import org.proton.plug.test.Constants;
|
import org.proton.plug.test.Constants;
|
||||||
|
@ -125,7 +126,7 @@ public class MinimalServer {
|
||||||
@Override
|
@Override
|
||||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||||
super.channelActive(ctx);
|
super.channelActive(ctx);
|
||||||
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(), null);
|
connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel()), Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
//ctx.read();
|
//ctx.read();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.rest.util.LinkHeaderLinkStrategy;
|
||||||
import org.apache.activemq.artemis.rest.util.LinkStrategy;
|
import org.apache.activemq.artemis.rest.util.LinkStrategy;
|
||||||
import org.apache.activemq.artemis.rest.util.TimeoutTask;
|
import org.apache.activemq.artemis.rest.util.TimeoutTask;
|
||||||
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
|
import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.XMLUtil;
|
import org.apache.activemq.artemis.utils.XMLUtil;
|
||||||
|
|
||||||
public class MessageServiceManager {
|
public class MessageServiceManager {
|
||||||
|
@ -126,7 +127,7 @@ public class MessageServiceManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (threadPool == null)
|
if (threadPool == null)
|
||||||
threadPool = Executors.newCachedThreadPool();
|
threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
timeoutTaskInterval = configuration.getTimeoutTaskInterval();
|
timeoutTaskInterval = configuration.getTimeoutTaskInterval();
|
||||||
timeoutTask = new TimeoutTask(timeoutTaskInterval);
|
timeoutTask = new TimeoutTask(timeoutTaskInterval);
|
||||||
threadPool.execute(timeoutTask);
|
threadPool.execute(timeoutTask);
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -41,6 +40,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
import org.apache.activemq.artemis.utils.ConfigurationHelper;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -107,10 +107,10 @@ public class InVMConnector extends AbstractConnector {
|
||||||
private static synchronized ExecutorService getInVMExecutor() {
|
private static synchronized ExecutorService getInVMExecutor() {
|
||||||
if (threadPoolExecutor == null) {
|
if (threadPoolExecutor == null) {
|
||||||
if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
|
if (ActiveMQClient.getGlobalThreadPoolSize() <= -1) {
|
||||||
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory());
|
threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
threadPoolExecutor = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.getGlobalThreadPoolSize(), 60L, TimeUnit.SECONDS, Executors.defaultThreadFactory());
|
threadPoolExecutor = new ActiveMQThreadPoolExecutor(0, ActiveMQClient.getGlobalThreadPoolSize(), 60L, TimeUnit.SECONDS, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return threadPoolExecutor;
|
return threadPoolExecutor;
|
||||||
|
|
|
@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.JaasCallbackHandler;
|
import org.apache.activemq.artemis.spi.core.security.jaas.JaasCallbackHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
|
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule;
|
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoginModule;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
@ -119,7 +120,7 @@ public class PropertiesLoginModuleRaceConditionTest {
|
||||||
options.put("baseDir", temp.getRoot().getAbsolutePath());
|
options.put("baseDir", temp.getRoot().getAbsolutePath());
|
||||||
|
|
||||||
errors = new ArrayBlockingQueue<>(processorCount());
|
errors = new ArrayBlockingQueue<>(processorCount());
|
||||||
pool = Executors.newFixedThreadPool(processorCount());
|
pool = Executors.newFixedThreadPool(processorCount(), ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
callback = new JaasCallbackHandler(USERNAME, PASSWORD, null);
|
callback = new JaasCallbackHandler(USERNAME, PASSWORD, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
import org.apache.activemq.artemis.utils.ReferenceCounter;
|
||||||
|
@ -170,8 +171,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
@Test
|
@Test
|
||||||
public void testScheduleNow() throws Exception {
|
public void testScheduleNow() throws Exception {
|
||||||
|
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(50);
|
ExecutorService executor = Executors.newFixedThreadPool(50, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
|
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
try {
|
try {
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
// it's better to run the test a few times instead of run millions of messages here
|
// it's better to run the test a few times instead of run millions of messages here
|
||||||
|
|
|
@ -128,6 +128,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
|
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
@ -472,7 +473,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final OrderedExecutorFactory getOrderedExecutor() {
|
protected final OrderedExecutorFactory getOrderedExecutor() {
|
||||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
executorSet.add(executor);
|
executorSet.add(executor);
|
||||||
return new OrderedExecutorFactory(executor);
|
return new OrderedExecutorFactory(executor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRule;
|
import org.jboss.byteman.contrib.bmunit.BMRule;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMRules;
|
import org.jboss.byteman.contrib.bmunit.BMRules;
|
||||||
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
|
||||||
|
@ -49,7 +50,7 @@ public class HierarchicalObjectRepositoryTest {
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
latch = new CountDownLatch(1);
|
latch = new CountDownLatch(1);
|
||||||
latch2 = new CountDownLatch(1);
|
latch2 = new CountDownLatch(1);
|
||||||
executor = Executors.newSingleThreadExecutor();
|
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
repo = new HierarchicalObjectRepository<>();
|
repo = new HierarchicalObjectRepository<>();
|
||||||
addToRepo(repo, A);
|
addToRepo(repo, A);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
|
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -55,7 +56,7 @@ public class CoreClientTest extends ActiveMQTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testCoreClientWithInjectedThreadPools() throws Exception {
|
public void testCoreClientWithInjectedThreadPools() throws Exception {
|
||||||
|
|
||||||
ExecutorService threadPool = Executors.newCachedThreadPool();
|
ExecutorService threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
|
ScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(10);
|
||||||
|
|
||||||
ServerLocator locator = createNonHALocator(false);
|
ServerLocator locator = createNonHALocator(false);
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -550,7 +551,7 @@ public class ClusteredGroupingTest extends ClusterTestBase {
|
||||||
session.close();
|
session.close();
|
||||||
|
|
||||||
// need thread pool to service both consumers and producers plus a thread to cycle nodes
|
// need thread pool to service both consumers and producers plus a thread to cycle nodes
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1);
|
ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
final AtomicInteger producerCounter = new AtomicInteger(0);
|
final AtomicInteger producerCounter = new AtomicInteger(0);
|
||||||
final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1);
|
final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1);
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
|
||||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
|
@ -1627,11 +1628,11 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
||||||
|
|
||||||
final AtomicLong seqGenerator = new AtomicLong(1);
|
final AtomicLong seqGenerator = new AtomicLong(1);
|
||||||
|
|
||||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
|
OrderedExecutorFactory factory = new OrderedExecutorFactory(executor);
|
||||||
|
|
||||||
final ExecutorService deleteExecutor = Executors.newCachedThreadPool();
|
final ExecutorService deleteExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
final JournalStorageManager storage = new JournalStorageManager(config, factory, null);
|
final JournalStorageManager storage = new JournalStorageManager(config, factory, null);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
|
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -93,7 +94,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final ExecutorService executor = Executors.newCachedThreadPool();
|
final ExecutorService executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
consumer.setMessageListener(new MessageListener() {
|
consumer.setMessageListener(new MessageListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message m) {
|
public void onMessage(Message m) {
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfi
|
||||||
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
|
import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
|
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -73,7 +74,7 @@ public class MultipleProducersPagingTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
executor = Executors.newCachedThreadPool();
|
executor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
AddressSettings addressSettings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(50000).setMaxSizeBytes(404850);
|
AddressSettings addressSettings = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setPageSizeBytes(50000).setMaxSizeBytes(404850);
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -75,7 +76,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
|
||||||
final ClientMessage message = session.createMessage(false);
|
final ClientMessage message = session.createMessage(false);
|
||||||
message.getBodyBuffer().writeBytes(new byte[1024]);
|
message.getBodyBuffer().writeBytes(new byte[1024]);
|
||||||
|
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
ExecutorService executor = Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
Callable<Object> sendMessageTask = new Callable<Object>() {
|
Callable<Object> sendMessageTask = new Callable<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object call() throws ActiveMQException {
|
public Object call() throws ActiveMQException {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfigu
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -70,7 +71,7 @@ public class FileLockTimeoutTest extends ActiveMQTestBase {
|
||||||
server2.getConfiguration().setJournalLockAcquisitionTimeout(5000);
|
server2.getConfiguration().setJournalLockAcquisitionTimeout(5000);
|
||||||
|
|
||||||
// if something happens that causes the timeout to misbehave we don't want the test to hang
|
// if something happens that causes the timeout to misbehave we don't want the test to hang
|
||||||
ExecutorService service = Executors.newSingleThreadExecutor();
|
ExecutorService service = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
Runnable r = new Runnable() {
|
Runnable r = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -42,8 +43,8 @@ public class SuppliedThreadPoolTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
serviceRegistry = new ServiceRegistryImpl();
|
serviceRegistry = new ServiceRegistryImpl();
|
||||||
serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1));
|
serviceRegistry.setExecutorService(Executors.newFixedThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1));
|
serviceRegistry.setScheduledExecutorService(Executors.newScheduledThreadPool(1, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
|
server = new ActiveMQServerImpl(null, null, null, null, serviceRegistry);
|
||||||
server.start();
|
server.start();
|
||||||
server.waitForActivation(100, TimeUnit.MILLISECONDS);
|
server.waitForActivation(100, TimeUnit.MILLISECONDS);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.tests.timing.core.server.impl;
|
package org.apache.activemq.artemis.tests.timing.core.server.impl;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
||||||
|
@ -69,7 +70,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduledNoConsumer() throws Exception {
|
public void testScheduledNoConsumer() throws Exception {
|
||||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor());
|
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
// Send one scheduled
|
// Send one scheduled
|
||||||
|
|
||||||
|
@ -134,7 +135,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScheduled() throws Exception {
|
public void testScheduled() throws Exception {
|
||||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor());
|
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
FakeConsumer consumer = null;
|
FakeConsumer consumer = null;
|
||||||
|
|
||||||
|
@ -232,7 +233,7 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
public void disconnect() {
|
public void disconnect() {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor());
|
QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
MessageReference messageReference = generateReference(queue, 1);
|
MessageReference messageReference = generateReference(queue, 1);
|
||||||
queue.addConsumer(consumer);
|
queue.addConsumer(consumer);
|
||||||
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
|
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase {
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), false, this.getClass().getClassLoader()));
|
pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), false, this.getClass().getClassLoader()));
|
||||||
executor = Executors.newSingleThreadExecutor();
|
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
|
import org.apache.activemq.artemis.tests.unit.util.FakePagingManager;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -770,7 +771,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
executor = Executors.newSingleThreadExecutor();
|
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -43,7 +44,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCompleteTaskAfterPaging() throws Exception {
|
public void testCompleteTaskAfterPaging() throws Exception {
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
try {
|
try {
|
||||||
OperationContextImpl impl = new OperationContextImpl(executor);
|
OperationContextImpl impl = new OperationContextImpl(executor);
|
||||||
final CountDownLatch latch1 = new CountDownLatch(1);
|
final CountDownLatch latch1 = new CountDownLatch(1);
|
||||||
|
@ -102,7 +103,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCaptureExceptionOnExecutor() throws Exception {
|
public void testCaptureExceptionOnExecutor() throws Exception {
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
@ -148,7 +149,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCaptureExceptionOnFailure() throws Exception {
|
public void testCaptureExceptionOnFailure() throws Exception {
|
||||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl;
|
||||||
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
|
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.ResourceManagerImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||||
|
@ -69,7 +70,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
executor = Executors.newSingleThreadExecutor();
|
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
factory = new OrderedExecutorFactory(executor);
|
factory = new OrderedExecutorFactory(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,7 +90,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
PostOffice postOffice = new FakePostOffice();
|
PostOffice postOffice = new FakePostOffice();
|
||||||
|
|
||||||
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize());
|
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
journal = new JournalStorageManager(configuration, factory, null);
|
journal = new JournalStorageManager(configuration, factory, null);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -71,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize()), null);
|
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null);
|
||||||
|
|
||||||
Assert.assertTrue(acceptor instanceof NettyAcceptor);
|
Assert.assertTrue(acceptor instanceof NettyAcceptor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -93,7 +94,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
|
||||||
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
public void connectionReadyForWrites(Object connectionID, boolean ready) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize());
|
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
|
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
|
||||||
|
|
||||||
addActiveMQComponent(acceptor);
|
addActiveMQComponent(acceptor);
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -64,7 +65,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
connector.start();
|
connector.start();
|
||||||
Assert.assertTrue(connector.isStarted());
|
Assert.assertTrue(connector.isStarted());
|
||||||
|
@ -101,7 +102,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
new NettyConnector(params, null, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
Assert.fail("Should throw Exception");
|
Assert.fail("Should throw Exception");
|
||||||
}
|
}
|
||||||
|
@ -110,7 +111,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
new NettyConnector(params, handler, null, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
Assert.fail("Should throw Exception");
|
Assert.fail("Should throw Exception");
|
||||||
}
|
}
|
||||||
|
@ -152,7 +153,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
|
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
|
||||||
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
|
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
|
||||||
|
@ -198,7 +199,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "bad path");
|
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PATH_PROP_NAME, "bad path");
|
||||||
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
System.setProperty(NettyConnector.JAVAX_KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
||||||
|
@ -246,7 +247,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
connector.start();
|
connector.start();
|
||||||
Assert.assertTrue(connector.isStarted());
|
Assert.assertTrue(connector.isStarted());
|
||||||
|
@ -285,7 +286,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
|
NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()));
|
||||||
|
|
||||||
connector.start();
|
connector.start();
|
||||||
Assert.assertTrue(connector.isStarted());
|
Assert.assertTrue(connector.isStarted());
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -65,8 +66,8 @@ public class QueueImplTest extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
|
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
executor = Executors.newSingleThreadExecutor();
|
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -27,12 +27,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
|
||||||
public class FakeQueueFactory implements QueueFactory {
|
public class FakeQueueFactory implements QueueFactory {
|
||||||
|
|
||||||
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
|
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
private final ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
|
|
||||||
private PostOffice postOffice;
|
private PostOffice postOffice;
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
|
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
|
||||||
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
@ -160,7 +161,7 @@ public final class SpawnedVMSupport {
|
||||||
public static void assertProcessExits(final boolean sameValue,
|
public static void assertProcessExits(final boolean sameValue,
|
||||||
final int value,
|
final int value,
|
||||||
final Process p) throws InterruptedException, ExecutionException, TimeoutException {
|
final Process p) throws InterruptedException, ExecutionException, TimeoutException {
|
||||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
|
||||||
Future<Integer> future = executor.submit(new Callable<Integer>() {
|
Future<Integer> future = executor.submit(new Callable<Integer>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue