test for AMQ-2100

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@743027 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-10 17:38:28 +00:00
parent cfb4234cee
commit 9a60743876
1 changed files with 75 additions and 0 deletions

View File

@ -16,12 +16,18 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
@ -36,6 +42,8 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.Collections;
/** /**
* Test cases used to test the JMS message consumer. * Test cases used to test the JMS message consumer.
* *
@ -109,6 +117,73 @@ public class JMSConsumerTest extends JmsTestSupport {
assertEquals(2, counter.get()); assertEquals(2, counter.get());
} }
public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch closeDone = new CountDownLatch(1);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
// preload the queue
sendMessages(session, destination, 2000);
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
final Map<Thread, Throwable> exceptions =
Collections.synchronizedMap(new HashMap<Thread, Throwable>());
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Uncaught exception:", e);
exceptions.put(t, e);
}
});
final class AckAndClose implements Runnable {
private Message message;
public AckAndClose(Message m) {
this.message = m;
}
public void run() {
try {
int count = counter.incrementAndGet();
if (count == 590) {
// close in a separate thread is ok by jms
consumer.close();
closeDone.countDown();
}
if (count % 200 == 0) {
// ensure there are some outstanding messages
// ack every 200
message.acknowledge();
}
} catch (Exception e) {
LOG.error("Exception on close or ack:", e);
exceptions.put(Thread.currentThread(), e);
}
}
};
final ExecutorService executor = Executors.newCachedThreadPool();
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
// ack and close eventually in separate thread
executor.execute(new AckAndClose(m));
}
});
assertTrue(closeDone.await(20, TimeUnit.SECONDS));
// await possible exceptions
Thread.sleep(1000);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
}
public void initCombosForTestMutiReceiveWithPrefetch1() { public void initCombosForTestMutiReceiveWithPrefetch1() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),