diff --git a/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java index 4b4a23dcf9..74ef5307de 100755 --- a/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/activemq/ActiveMQMessageConsumer.java @@ -245,10 +245,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * @return true if this is a durable topic subscriber */ public boolean isDurableSubscriber() { - // TODO Add ActiveMQTopicSubscriber - return false; - // return this instanceof ActiveMQTopicSubscriber && consumerName != - // null && consumerName.length() > 0; + return info.getSubcriptionName()!=null && info.getDestination().isTopic(); } /** @@ -671,8 +668,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC Scheduler.executeAfterDelay(new Runnable() { public void run() { - if (started.get()) - unconsumedMessages.start(); + try { + if (started.get()) + start(); + } catch (JMSException e) { + session.connection.onAsyncException(e); + } } }, redeliveryDelay); @@ -695,7 +696,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageListener listener = this.messageListener; try { if (!unconsumedMessages.isClosed()) { - if (listener != null) { + if (listener != null && started.get()) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); listener.onMessage(message); @@ -716,9 +717,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return unconsumedMessages.size(); } - public void start() { + public void start() throws JMSException { started.set(true); unconsumedMessages.start(); + MessageListener listener = this.messageListener; + if( listener!=null ) { + MessageDispatch md; + while( (md = unconsumedMessages.dequeueNoWait())!=null ) { + ActiveMQMessage message = createActiveMQMessage(md); + beforeMessageIsConsumed(md); + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } + } } public void stop() { diff --git a/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java index d67d240d2d..713dd45bbc 100755 --- a/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/activemq/JMSConsumerTest.java @@ -59,6 +59,55 @@ public class JMSConsumerTest extends JmsTestSupport { public byte destinationType; public boolean durableConsumer; + public void initCombosForTestMessageListenerWithConsumerCanBeStopped() { + addCombinationValues("deliveryMode", new Object[] { + new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT) }); + addCombinationValues("destinationType", new Object[] { + new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE), + new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), + new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE) }); + } + public void testMessageListenerWithConsumerCanBeStopped() throws Throwable { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done1 = new CountDownLatch(1); + final CountDownLatch done2 = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message m) { + counter.incrementAndGet(); + if( counter.get()==1 ) + done1.countDown(); + if( counter.get()==2 ) + done2.countDown(); + } + }); + + // Send a first message to make sure that the consumer dispatcher is running + sendMessages(session, destination, 1); + assertTrue(done1.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Stop the consumer. + consumer.stop(); + + // Send a message, but should not get delivered. + sendMessages(session, destination, 1); + assertFalse(done2.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Start the consumer, and the message should now get delivered. + consumer.start(); + assertTrue(done2.await(1, TimeUnit.SECONDS)); + assertEquals(2, counter.get()); + } public void initCombosForTestMutiReceiveWithPrefetch1() { addCombinationValues("deliveryMode", new Object[] {