resolve https://issues.apache.org/activemq/browse/AMQ-2483 and https://issues.apache.org/activemq/browse/AMQ-2028, keep track of outstanding wakeup requests in a queue with regular task runner avoids build up in determintic task runner. Exposed useDeterministicTaskRunner to validate some test that fail with the -DuseDedicatedTaskRunner=true system property. With broker.useDedicatedTask=false, Queues will use pooled executor for dispatch.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@835874 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-13 15:22:07 +00:00
parent 6a4e25c737
commit c808bebd00
16 changed files with 175 additions and 78 deletions

View File

@ -114,7 +114,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected boolean dispatchAsync=true; protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true; protected boolean alwaysSessionAsync = true;
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000); private TaskRunnerFactory sessionTaskRunner;
private final ThreadPoolExecutor asyncConnectionThread; private final ThreadPoolExecutor asyncConnectionThread;
// Connection state variables // Connection state variables
@ -186,6 +186,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private ConnectionAudit connectionAudit = new ConnectionAudit(); private ConnectionAudit connectionAudit = new ConnectionAudit();
private DestinationSource destinationSource; private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object(); private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
/** /**
* Construct an <code>ActiveMQConnection</code> * Construct an <code>ActiveMQConnection</code>
@ -644,7 +645,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// factory // factory
// then we may need to call // then we may need to call
// factory.onConnectionClose(this); // factory.onConnectionClose(this);
if (sessionTaskRunner != null) {
sessionTaskRunner.shutdown(); sessionTaskRunner.shutdown();
}
closed.set(true); closed.set(true);
closing.set(false); closing.set(false);
} }
@ -927,7 +930,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
transportListeners.remove(transportListener); transportListeners.remove(transportListener);
} }
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public TaskRunnerFactory getSessionTaskRunner() { public TaskRunnerFactory getSessionTaskRunner() {
synchronized (this) {
if (sessionTaskRunner == null) {
sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
}
}
return sessionTaskRunner; return sessionTaskRunner;
} }

View File

@ -113,6 +113,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private ExceptionListener exceptionListener; private ExceptionListener exceptionListener;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
// ///////////////////////////////////////////// // /////////////////////////////////////////////
// //
@ -313,6 +314,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setSendAcksAsync(isSendAcksAsync()); connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth()); connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
if (transportListener != null) { if (transportListener != null) {
connection.addTransportListener(transportListener); connection.addTransportListener(transportListener);
} }
@ -903,4 +905,12 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
this.auditMaximumProducerNumber = auditMaximumProducerNumber; this.auditMaximumProducerNumber = auditMaximumProducerNumber;
} }
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
} }

View File

@ -91,6 +91,10 @@ public class ActiveMQSessionExecutor implements Task {
if (taskRunner == null) { if (taskRunner == null) {
synchronized (this) { synchronized (this) {
if (this.taskRunner == null) { if (this.taskRunner == null) {
if (!isRunning()) {
// stop has been called
return;
}
this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
"ActiveMQ Session: " + session.getSessionId()); "ActiveMQ Session: " + session.getSessionId());
} }
@ -142,11 +146,12 @@ public class ActiveMQSessionExecutor implements Task {
void stop() throws JMSException { void stop() throws JMSException {
try { try {
if (messageQueue.isRunning()) { if (messageQueue.isRunning()) {
synchronized(this) {
messageQueue.stop(); messageQueue.stop();
TaskRunner taskRunner = this.taskRunner; if (this.taskRunner != null) {
if (taskRunner != null) { this.taskRunner.shutdown();
this.taskRunner = null; this.taskRunner = null;
taskRunner.shutdown(); }
} }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -933,7 +933,7 @@ public class BrokerService implements Service {
public TaskRunnerFactory getPersistenceTaskRunnerFactory() { public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority, persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
true, 1000); true, 1000, isDedicatedTaskRunner());
} }
return persistenceTaskRunnerFactory; return persistenceTaskRunnerFactory;
} }

View File

@ -899,6 +899,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
for (TransportConnectionState cs : connectionStates) { for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true); cs.getContext().getStopping().set(true);
} }
try {
new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) { new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
@Override @Override
public void run() { public void run() {
@ -914,6 +915,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
} }
} }
}.start(); }.start();
} catch (Throwable t) {
LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
stopped.countDown();
}
} }
} }

View File

@ -31,9 +31,8 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -65,7 +64,6 @@ import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task; import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
@ -86,7 +84,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
public class Queue extends BaseDestination implements Task, UsageListener { public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Log LOG = LogFactory.getLog(Queue.class); protected static final Log LOG = LogFactory.getLog(Queue.class);
protected TaskRunnerFactory taskFactory; protected final TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner; protected TaskRunner taskRunner;
protected final List<Subscription> consumers = new ArrayList<Subscription>(50); protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages; protected PendingMessageCursor messages;
@ -108,9 +106,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private int timeBeforeDispatchStarts = 0; private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0; private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch; private CountDownLatch consumersBeforeStartsLatch;
private AtomicLong pendingWakeups = new AtomicLong();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() { public void run() {
wakeup(); asyncWakeup();
} }
}; };
private final Runnable expireMessagesTask = new Runnable() { private final Runnable expireMessagesTask = new Runnable() {
@ -169,21 +169,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
memoryUsage.setParent(systemUsage.getMemoryUsage()); memoryUsage.setParent(systemUsage.getMemoryUsage());
} }
if (isOptimizedDispatch()) { this.taskRunner =
this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue: " + destination.getPhysicalName()); taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
}else {
final Queue queue = this;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue);
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
});
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
}
super.initialize(); super.initialize();
if (store != null) { if (store != null) {
@ -591,6 +578,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} }
}; };
doBrowse(browsedMessages, this.getMaxExpirePageSize()); doBrowse(browsedMessages, this.getMaxExpirePageSize());
asyncWakeup();
} }
public void gc(){ public void gc(){
@ -1191,7 +1179,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.error("Failed to page in more queue messages ", e); LOG.error("Failed to page in more queue messages ", e);
} }
} }
return !messagesWaitingForSpace.isEmpty(); return pendingWakeups.decrementAndGet() > 0;
} }
} }
@ -1297,7 +1285,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e); LOG.error("Failed to remove expired Message from the store ",e);
} }
asyncWakeup();
} }
protected ConnectionContext createConnectionContext() { protected ConnectionContext createConnectionContext() {
@ -1336,13 +1323,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void wakeup() { public void wakeup() {
if (optimizedDispatch || isSlave()) { if (optimizedDispatch || isSlave()) {
iterate(); iterate();
pendingWakeups.incrementAndGet();
} else { } else {
asyncWakeup(); asyncWakeup();
} }
} }
public void asyncWakeup() { private void asyncWakeup() {
try { try {
pendingWakeups.incrementAndGet();
this.taskRunner.wakeup(); this.taskRunner.wakeup();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Async task tunner failed to wakeup ", e); LOG.warn("Async task tunner failed to wakeup ", e);

View File

@ -55,6 +55,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
private boolean recoverReferenceStore=true; private boolean recoverReferenceStore=true;
private boolean forceRecoverReferenceStore=false; private boolean forceRecoverReferenceStore=false;
private long checkpointInterval = 1000 * 20; private long checkpointInterval = 1000 * 20;
private boolean useDedicatedTaskRunner;
/** /**
@ -109,13 +110,21 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
this.dataDirectory = dataDirectory; this.dataDirectory = dataDirectory;
} }
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
/** /**
* @return the taskRunnerFactory * @return the taskRunnerFactory
*/ */
public TaskRunnerFactory getTaskRunnerFactory() { public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority, taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", journalThreadPriority,
true, 1000); true, 1000, isUseDedicatedTaskRunner());
} }
return taskRunnerFactory; return taskRunnerFactory;
} }

View File

@ -53,6 +53,7 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen
private boolean failIfJournalIsLocked; private boolean failIfJournalIsLocked;
private int journalThreadPriority = Thread.MAX_PRIORITY; private int journalThreadPriority = Thread.MAX_PRIORITY;
private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
private boolean useDedicatedTaskRunner;
public PersistenceAdapter createPersistenceAdapter() throws IOException { public PersistenceAdapter createPersistenceAdapter() throws IOException {
jdbcPersistenceAdapter.setDataSource(getDataSource()); jdbcPersistenceAdapter.setDataSource(getDataSource());
@ -110,10 +111,18 @@ public class JournalPersistenceAdapterFactory extends DataSourceSupport implemen
this.useJournal = useJournal; this.useJournal = useJournal;
} }
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
public TaskRunnerFactory getTaskRunnerFactory() { public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) { if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
true, 1000); true, 1000, isUseDedicatedTaskRunner());
} }
return taskRunnerFactory; return taskRunnerFactory;
} }

View File

@ -43,7 +43,7 @@ public class TaskRunnerFactory {
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
} }
public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
this(name,priority,daemon,maxIterationsPerRun,false); this(name,priority,daemon,maxIterationsPerRun,false);
} }

View File

@ -333,7 +333,9 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
if (mcast != null) { if (mcast != null) {
mcast.close(); mcast.close();
} }
if (runner != null) {
runner.interrupt(); runner.interrupt();
}
getExecutor().shutdownNow(); getExecutor().shutdownNow();
} }
} }

View File

@ -43,7 +43,8 @@ public class VMTransport implements Transport, Task {
private static final Object DISCONNECT = new Object(); private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0); private static final AtomicLong NEXT_ID = new AtomicLong(0);
private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); // still possible to configure dedicated task runner through system property but not programmatically
private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
protected VMTransport peer; protected VMTransport peer;
protected TransportListener transportListener; protected TransportListener transportListener;
protected boolean disposed; protected boolean disposed;

View File

@ -55,7 +55,7 @@ public class ThreadTracker {
@SuppressWarnings("serial") @SuppressWarnings("serial")
class Trace extends Throwable { class Trace extends Throwable {
public int count; public int count = 1;
public final int size; public final int size;
Trace() { Trace() {
super(); super();

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import junit.framework.Test;
public class DedicatedTaskRunnerBrokerTest extends BrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.setDedicatedTaskRunner(true);
return broker;
}
public static Test suite() {
return suite(DedicatedTaskRunnerBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -47,7 +47,7 @@ import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -105,7 +105,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
final DestinationStatistics destinationStatistics = new DestinationStatistics(); final DestinationStatistics destinationStatistics = new DestinationStatistics();
consumerInfo.setExclusive(true); consumerInfo.setExclusive(true);
final Queue queue = new Queue(brokerService, destination, final Queue queue = new Queue(brokerService, destination,
queueMessageStore, destinationStatistics, null); queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
// a workaround for this issue // a workaround for this issue
// queue.setUseCache(false); // queue.setUseCache(false);

View File

@ -42,13 +42,15 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.TopicSubscriber; import javax.jms.TopicSubscriber;
import junit.framework.TestCase; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.util.ThreadTracker;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -57,7 +59,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.5 $ * @version $Revision: 1.5 $
* A Test case for AMQ-1479 * A Test case for AMQ-1479
*/ */
public class DurableConsumerTest extends TestCase { public class DurableConsumerTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class); private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class);
private static int COUNT = 1024*10; private static int COUNT = 1024*10;
private static String CONSUMER_NAME = "DURABLE_TEST"; private static String CONSUMER_NAME = "DURABLE_TEST";
@ -71,7 +73,7 @@ public class DurableConsumerTest extends TestCase {
private static final String TOPIC_NAME = "failoverTopic"; private static final String TOPIC_NAME = "failoverTopic";
private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)"; private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
public boolean useDedicatedTaskRunner = false;
private class SimpleTopicSubscriber implements MessageListener, ExceptionListener { private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
@ -176,8 +178,7 @@ public class DurableConsumerTest extends TestCase {
final int id = i; final int id = i;
Thread thread = new Thread( new Runnable() { Thread thread = new Thread( new Runnable() {
public void run() { public void run() {
new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
} }
} ); } );
thread.start(); thread.start();
@ -193,6 +194,12 @@ public class DurableConsumerTest extends TestCase {
assertEquals(0, exceptions.size()); assertEquals(0, exceptions.size());
} }
// makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
// with use dedicatedTaskRunner=true and produce OOM
public void initCombosForTestConcurrentDurableConsumer() {
addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testConcurrentDurableConsumer() throws Exception { public void testConcurrentDurableConsumer() throws Exception {
broker.start(); broker.start();
@ -247,7 +254,7 @@ public class DurableConsumerTest extends TestCase {
} }
}; };
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i=0; i<numConsumers ; i++) { for (int i=0; i<numConsumers ; i++) {
executor.execute(consumer); executor.execute(consumer);
@ -363,7 +370,6 @@ public class DurableConsumerTest extends TestCase {
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
super.tearDown(); super.tearDown();
if (broker != null) { if (broker != null) {
broker.stop(); broker.stop();
broker = null; broker = null;
@ -392,11 +398,20 @@ public class DurableConsumerTest extends TestCase {
answer.setUseShutdownHook(false); answer.setUseShutdownHook(false);
answer.setUseJmx(false); answer.setUseJmx(false);
answer.setAdvisorySupport(false); answer.setAdvisorySupport(false);
answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(bindAddress); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
return factory;
} }
public static Test suite() {
return suite(DurableConsumerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
} }

View File

@ -33,7 +33,6 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(DiscoveryTransportNoBrokerTest.class); private static final Log LOG = LogFactory.getLog(DiscoveryTransportNoBrokerTest.class);
public void testNoExtraThreads() throws Exception { public void testNoExtraThreads() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
TransportConnector tcp = broker.addConnector("tcp://localhost:0?transport.closeAsync=false"); TransportConnector tcp = broker.addConnector("tcp://localhost:0?transport.closeAsync=false");
@ -110,7 +109,7 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
} catch ( JMSException expected ) { } catch ( JMSException expected ) {
assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException); assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException);
long duration = System.currentTimeMillis() - startT; long duration = System.currentTimeMillis() - startT;
assertTrue("took at least initialReconnectDelay time: " + duration, duration >= initialReconnectDelay); assertTrue("took at least initialReconnectDelay time: " + duration + " e:" + expected, duration >= initialReconnectDelay);
} }
} }
} }