From 9a6074387637d18154dffb3017f7bf396aa7b237 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 10 Feb 2009 17:38:28 +0000 Subject: [PATCH] test for AMQ-2100 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@743027 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/JMSConsumerTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 535119ef24..8cf633f698 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -16,12 +16,18 @@ */ package org.apache.activemq; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -36,6 +42,8 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.Collections; + /** * Test cases used to test the JMS message consumer. * @@ -109,6 +117,73 @@ public class JMSConsumerTest extends JmsTestSupport { assertEquals(2, counter.get()); } + + public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch closeDone = new CountDownLatch(1); + + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + // preload the queue + sendMessages(session, destination, 2000); + + + final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); + + final Map exceptions = + Collections.synchronizedMap(new HashMap()); + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception:", e); + exceptions.put(t, e); + } + }); + + final class AckAndClose implements Runnable { + private Message message; + + public AckAndClose(Message m) { + this.message = m; + } + + public void run() { + try { + int count = counter.incrementAndGet(); + if (count == 590) { + // close in a separate thread is ok by jms + consumer.close(); + closeDone.countDown(); + } + if (count % 200 == 0) { + // ensure there are some outstanding messages + // ack every 200 + message.acknowledge(); + } + } catch (Exception e) { + LOG.error("Exception on close or ack:", e); + exceptions.put(Thread.currentThread(), e); + } + } + }; + + final ExecutorService executor = Executors.newCachedThreadPool(); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message m) { + // ack and close eventually in separate thread + executor.execute(new AckAndClose(m)); + } + }); + + assertTrue(closeDone.await(20, TimeUnit.SECONDS)); + // await possible exceptions + Thread.sleep(1000); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + public void initCombosForTestMutiReceiveWithPrefetch1() { addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),