diff --git a/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java b/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java index b2c02c8326..1e65ffb06b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java @@ -45,6 +45,7 @@ public class AdvisoryConsumer implements ActiveMQDispatcher { info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); info.setPrefetchSize(1000); info.setNoLocal(true); + info.setDispatchAsync(true); this.connection.addDispatcher(info.getConsumerId(), this); this.connection.syncSendPacket(this.info); diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index 4d1b1cd59a..0488eaceca 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -23,6 +24,9 @@ import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -38,12 +42,17 @@ import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import junit.framework.TestCase; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.vm.VMTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @version */ public class JmsTempDestinationTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class); private Connection connection; private ActiveMQConnectionFactory factory; protected List connections = Collections.synchronizedList(new ArrayList()); @@ -293,4 +302,58 @@ public class JmsTempDestinationTest extends TestCase { assertTrue("failed to throw an exception", true); } } + + public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception { + ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20"); + Connection connection = advisoryConnFactory.createConnection(); + connections.add(connection); + connection.start(); + + final CountDownLatch done = new CountDownLatch(1); + final AtomicBoolean ok = new AtomicBoolean(true); + final AtomicBoolean first = new AtomicBoolean(true); + VMTransport t = ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class); + t.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + // block first dispatch for a while so broker backs up, but other connection should be able to proceed + if (first.compareAndSet(true, false)) { + try { + ok.set(done.await(35, TimeUnit.SECONDS)); + LOG.info("Done waiting: " + ok.get()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + @Override + public void onException(IOException error) { + } + + @Override + public void transportInterupted() { + } + + @Override + public void transportResumed() { + } + }); + + connection = factory.createConnection(); + connections.add(connection); + ((ActiveMQConnection)connection).setWatchTopicAdvisories(false); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i=0; i<2500; i++) { + TemporaryQueue queue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(queue); + consumer.close(); + queue.delete(); + } + LOG.info("Done with work: " + ok.get()); + done.countDown(); + assertTrue("ok", ok.get()); + } }