diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 106d8b72f3..12f5904417 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -114,7 +114,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected boolean dispatchAsync=true; protected boolean alwaysSessionAsync = true; - private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000); + private TaskRunnerFactory sessionTaskRunner; private final ThreadPoolExecutor asyncConnectionThread; // Connection state variables @@ -186,6 +186,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private ConnectionAudit connectionAudit = new ConnectionAudit(); private DestinationSource destinationSource; private final Object ensureConnectionInfoSentMutex = new Object(); + private boolean useDedicatedTaskRunner; /** * Construct an ActiveMQConnection @@ -644,7 +645,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon // factory // then we may need to call // factory.onConnectionClose(this); - sessionTaskRunner.shutdown(); + if (sessionTaskRunner != null) { + sessionTaskRunner.shutdown(); + } closed.set(true); closing.set(false); } @@ -927,7 +930,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon transportListeners.remove(transportListener); } + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + public TaskRunnerFactory getSessionTaskRunner() { + synchronized (this) { + if (sessionTaskRunner == null) { + sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner()); + } + } return sessionTaskRunner; } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 780dd8c8a5..69265898d4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -107,12 +107,13 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean watchTopicAdvisories = true; private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; private long warnAboutUnstartedConnectionTimeout = 500L; - private int sendTimeout =0; + private int sendTimeout = 0; private boolean sendAcksAsync=true; private TransportListener transportListener; - private ExceptionListener exceptionListener; - private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; - private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + private ExceptionListener exceptionListener; + private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; + private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + private boolean useDedicatedTaskRunner; // ///////////////////////////////////////////// // @@ -313,6 +314,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setSendAcksAsync(isSendAcksAsync()); connection.setAuditDepth(getAuditDepth()); connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); + connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -903,4 +905,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { this.auditMaximumProducerNumber = auditMaximumProducerNumber; } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index 319a95af95..bfe0c635cc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -91,6 +91,10 @@ public class ActiveMQSessionExecutor implements Task { if (taskRunner == null) { synchronized (this) { if (this.taskRunner == null) { + if (!isRunning()) { + // stop has been called + return; + } this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + session.getSessionId()); } @@ -142,11 +146,12 @@ public class ActiveMQSessionExecutor implements Task { void stop() throws JMSException { try { if (messageQueue.isRunning()) { - messageQueue.stop(); - TaskRunner taskRunner = this.taskRunner; - if (taskRunner != null) { - this.taskRunner = null; - taskRunner.shutdown(); + synchronized(this) { + messageQueue.stop(); + if (this.taskRunner != null) { + this.taskRunner.shutdown(); + this.taskRunner = null; + } } } } catch (InterruptedException e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 6248728ee6..c0734a69c3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -933,7 +933,7 @@ public class BrokerService implements Service { public TaskRunnerFactory getPersistenceTaskRunnerFactory() { if (taskRunnerFactory == null) { persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, - true, 1000); + true, 1000, isDedicatedTaskRunner()); } return persistenceTaskRunnerFactory; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 186e1fdb90..b367fa5571 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -899,21 +899,26 @@ public class TransportConnection implements Connection, Task, CommandVisitor { for (TransportConnectionState cs : connectionStates) { cs.getContext().getStopping().set(true); } - new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { - @Override - public void run() { - serviceLock.writeLock().lock(); - try { - doStop(); - } catch (Throwable e) { - LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() - + "': ", e); - } finally { - stopped.countDown(); - serviceLock.writeLock().unlock(); + try { + new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { + @Override + public void run() { + serviceLock.writeLock().lock(); + try { + doStop(); + } catch (Throwable e) { + LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress() + + "': ", e); + } finally { + stopped.countDown(); + serviceLock.writeLock().unlock(); + } } - } - }.start(); + }.start(); + } catch (Throwable t) { + LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t); + stopped.countDown(); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index afaa48bc66..13a7969316 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -31,9 +31,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -65,7 +64,6 @@ import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; -import org.apache.activemq.thread.DeterministicTaskRunner; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -86,7 +84,7 @@ import org.apache.commons.logging.LogFactory; */ public class Queue extends BaseDestination implements Task, UsageListener { protected static final Log LOG = LogFactory.getLog(Queue.class); - protected TaskRunnerFactory taskFactory; + protected final TaskRunnerFactory taskFactory; protected TaskRunner taskRunner; protected final List consumers = new ArrayList(50); protected PendingMessageCursor messages; @@ -108,9 +106,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { private int timeBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0; private CountDownLatch consumersBeforeStartsLatch; + private AtomicLong pendingWakeups = new AtomicLong(); + private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { - wakeup(); + asyncWakeup(); } }; private final Runnable expireMessagesTask = new Runnable() { @@ -164,26 +164,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { // since it turns into a shared blocking queue which can lead to a network deadlock. // If we are cursoring to disk..it's not and issue because it does not block due // to large disk sizes. - if( messages instanceof VMPendingMessageCursor ) { + if (messages instanceof VMPendingMessageCursor) { this.systemUsage = brokerService.getSystemUsage(); memoryUsage.setParent(systemUsage.getMemoryUsage()); } - if (isOptimizedDispatch()) { - this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); - }else { - final Queue queue = this; - this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { - public Thread newThread(Runnable runnable) { - Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue); - thread.setDaemon(true); - thread.setPriority(Thread.NORM_PRIORITY); - return thread; - } - }); - - this.taskRunner = new DeterministicTaskRunner(this.executor,this); - } + this.taskRunner = + taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); super.initialize(); if (store != null) { @@ -591,6 +578,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } }; doBrowse(browsedMessages, this.getMaxExpirePageSize()); + asyncWakeup(); } public void gc(){ @@ -1190,8 +1178,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } - } - return !messagesWaitingForSpace.isEmpty(); + } + return pendingWakeups.decrementAndGet() > 0; } } @@ -1297,7 +1285,6 @@ public class Queue extends BaseDestination implements Task, UsageListener { } catch (IOException e) { LOG.error("Failed to remove expired Message from the store ",e); } - asyncWakeup(); } protected ConnectionContext createConnectionContext() { @@ -1336,14 +1323,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void wakeup() { if (optimizedDispatch || isSlave()) { iterate(); + pendingWakeups.incrementAndGet(); } else { asyncWakeup(); } } - - public void asyncWakeup() { + + private void asyncWakeup() { try { - this.taskRunner.wakeup(); + pendingWakeups.incrementAndGet(); + this.taskRunner.wakeup(); } catch (InterruptedException e) { LOG.warn("Async task tunner failed to wakeup ", e); } @@ -1432,7 +1421,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { pagedInPendingDispatch.add(qmr); } } - doWakeUp = true; + doWakeUp = true; } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java index 735f572c85..98235390ba 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java @@ -55,6 +55,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { private boolean recoverReferenceStore=true; private boolean forceRecoverReferenceStore=false; private long checkpointInterval = 1000 * 20; + private boolean useDedicatedTaskRunner; /** @@ -109,13 +110,21 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { this.dataDirectory = dataDirectory; } + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + /** * @return the taskRunnerFactory */ public TaskRunnerFactory getTaskRunnerFactory() { if (taskRunnerFactory == null) { taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, - true, 1000); + true, 1000, isUseDedicatedTaskRunner()); } return taskRunnerFactory; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java index 309eb4b223..f532353afb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java @@ -53,6 +53,7 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen private boolean failIfJournalIsLocked; private int journalThreadPriority = Thread.MAX_PRIORITY; private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + private boolean useDedicatedTaskRunner; public PersistenceAdapter createPersistenceAdapter() throws IOException { jdbcPersistenceAdapter.setDataSource(getDataSource()); @@ -110,10 +111,18 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen this.useJournal = useJournal; } + public boolean isUseDedicatedTaskRunner() { + return useDedicatedTaskRunner; + } + + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { + this.useDedicatedTaskRunner = useDedicatedTaskRunner; + } + public TaskRunnerFactory getTaskRunnerFactory() { if (taskRunnerFactory == null) { taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, - true, 1000); + true, 1000, isUseDedicatedTaskRunner()); } return taskRunnerFactory; } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 92db903867..5b5fd28b8c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -43,7 +43,7 @@ public class TaskRunnerFactory { this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); } - public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { + private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { this(name,priority,daemon,maxIterationsPerRun,false); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java index 54af39b280..8caab5e6a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java @@ -333,7 +333,9 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { if (mcast != null) { mcast.close(); } - runner.interrupt(); + if (runner != null) { + runner.interrupt(); + } getExecutor().shutdownNow(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 37ea391b49..20242001f8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -43,7 +43,8 @@ public class VMTransport implements Transport, Task { private static final Object DISCONNECT = new Object(); private static final AtomicLong NEXT_ID = new AtomicLong(0); - private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); + // still possible to configure dedicated task runner through system property but not programmatically + private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java index 9b03dba84f..c5907e9d9a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java @@ -55,7 +55,7 @@ public class ThreadTracker { @SuppressWarnings("serial") class Trace extends Throwable { - public int count; + public int count = 1; public final int size; Trace() { super(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java new file mode 100755 index 0000000000..c1864207c4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +public class DedicatedTaskRunnerBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setDedicatedTaskRunner(true); + return broker; + } + + public static Test suite() { + return suite(DedicatedTaskRunnerBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index d33972efac..ba868a12fd 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -47,7 +47,7 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -105,7 +105,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase { final DestinationStatistics destinationStatistics = new DestinationStatistics(); consumerInfo.setExclusive(true); final Queue queue = new Queue(brokerService, destination, - queueMessageStore, destinationStatistics, null); + queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory()); // a workaround for this issue // queue.setUseCache(false); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index c12354b1fe..0de1b4f827 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -42,13 +42,15 @@ import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import junit.framework.TestCase; +import junit.framework.Test; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.activemq.util.ThreadTracker; import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,7 +59,7 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.5 $ * A Test case for AMQ-1479 */ -public class DurableConsumerTest extends TestCase { +public class DurableConsumerTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class); private static int COUNT = 1024*10; private static String CONSUMER_NAME = "DURABLE_TEST"; @@ -71,8 +73,8 @@ public class DurableConsumerTest extends TestCase { private static final String TOPIC_NAME = "failoverTopic"; private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; - - + public boolean useDedicatedTaskRunner = false; + private class SimpleTopicSubscriber implements MessageListener, ExceptionListener { private TopicConnection topicConnection = null; @@ -176,8 +178,7 @@ public class DurableConsumerTest extends TestCase { final int id = i; Thread thread = new Thread( new Runnable() { public void run() { - - SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME); + new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME); } } ); thread.start(); @@ -192,7 +193,13 @@ public class DurableConsumerTest extends TestCase { Thread.sleep(10000); assertEquals(0, exceptions.size()); } - + + // makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028 + // with use dedicatedTaskRunner=true and produce OOM + public void initCombosForTestConcurrentDurableConsumer() { + addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE}); + } + public void testConcurrentDurableConsumer() throws Exception { broker.start(); @@ -247,7 +254,7 @@ public class DurableConsumerTest extends TestCase { } }; - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = Executors.newFixedThreadPool(numConsumers); for (int i=0; i= initialReconnectDelay); + assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay); } } }