From 0c93dfde72494a5906b2937d320bde8ef46794e6 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 7 Nov 2006 20:30:37 +0000 Subject: [PATCH] Added message ordering assertions and also a test case that uses consumer.receive() instead of a messsage listener git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@472237 13f79535-47bb-0310-9956-ffa450edef68 --- ...elegatingTransactionalMessageListener.java | 2 +- ...RollbacksWhileConsumingLargeQueueTest.java | 174 +++++++++++++----- 2 files changed, 124 insertions(+), 52 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java b/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java index 724bad5e2f..ecf68ee8a0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java @@ -55,7 +55,7 @@ public class DelegatingTransactionalMessageListener implements MessageListener { underlyingListener.onMessage(message); session.commit(); } - catch (Exception e) { + catch (Throwable e) { rollback(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java index 8dceae251c..8c8deac79d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java @@ -17,79 +17,151 @@ */ package org.apache.activemq.test.rollback; -import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; -import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.springframework.jms.core.MessageCreator; - import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.springframework.jms.core.MessageCreator; + +import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + /** * @version $Revision$ */ -public class RollbacksWhileConsumingLargeQueueTest extends EmbeddedBrokerTestSupport implements MessageListener { +public class RollbacksWhileConsumingLargeQueueTest extends + EmbeddedBrokerTestSupport implements MessageListener { - protected int numberOfMessagesOnQueue = 6500; - private Connection connection; - private DelegatingTransactionalMessageListener messageListener; - private AtomicInteger counter = new AtomicInteger(0); - private CountDownLatch latch; + protected int numberOfMessagesOnQueue = 6500; + private Connection connection; + private AtomicInteger deliveryCounter = new AtomicInteger(0); + private AtomicInteger ackCounter = new AtomicInteger(0); + private CountDownLatch latch; + private Throwable failure; - public void testConsumeOnFullQueue() throws Exception { - boolean answer = latch.await(1000, TimeUnit.SECONDS); + public void xtestWithReciever() throws Throwable { + latch = new CountDownLatch(numberOfMessagesOnQueue); + Session session = connection.createSession(true, 0); + MessageConsumer consumer = session.createConsumer(destination); - System.out.println("Received: " + counter.get() + " message(s)"); - assertTrue("Did not receive the latch!", answer); - } + long start = System.currentTimeMillis(); + while ((System.currentTimeMillis() - start) < 1000*1000) { + if (getFailure() != null) { + throw getFailure(); + } + + // Are we done receiving all the messages. + if( ackCounter.get() == numberOfMessagesOnQueue ) + return; + + Message message = consumer.receive(1000); + if (message == null) + continue; + + try { + onMessage(message); + session.commit(); + } catch (Throwable e) { + session.rollback(); + } + } + + fail("Did not receive all the messages."); + } + + public void testWithMessageListener() throws Throwable { + latch = new CountDownLatch(numberOfMessagesOnQueue); + new DelegatingTransactionalMessageListener(this, connection, + destination); + + long start = System.currentTimeMillis(); + while ((System.currentTimeMillis() - start) < 1000*1000) { + + if (getFailure() != null) { + throw getFailure(); + } + + if (latch.await(1, TimeUnit.SECONDS)) { + System.out.println("Received: " + deliveryCounter.get() + + " message(s)"); + return; + } + + } + + fail("Did not receive all the messages."); + } - protected void setUp() throws Exception { - super.setUp(); + protected void setUp() throws Exception { + super.setUp(); - connection = createConnection(); - connection.start(); + connection = createConnection(); + connection.start(); - // lets fill the queue up - for (int i = 0; i < numberOfMessagesOnQueue; i++) { - template.send(createMessageCreator(i)); - } + // lets fill the queue up + for (int i = 0; i < numberOfMessagesOnQueue; i++) { + template.send(createMessageCreator(i)); + } - latch = new CountDownLatch(numberOfMessagesOnQueue); - messageListener = new DelegatingTransactionalMessageListener(this, connection, destination); - } + } + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } + protected MessageCreator createMessageCreator(final int i) { + return new MessageCreator() { + public Message createMessage(Session session) throws JMSException { + TextMessage answer = session.createTextMessage("Message: " + i); + answer.setIntProperty("Counter", i); + return answer; + } + }; + } - protected MessageCreator createMessageCreator(final int i) { - return new MessageCreator() { - public Message createMessage(Session session) throws JMSException { - TextMessage answer = session.createTextMessage("Message: " + i); - answer.setIntProperty("Counter", i); - return answer; - } - }; - } + public void onMessage(Message message) { + String msgId = null; + String msgText = null; - public void onMessage(Message message) { - int value = counter.incrementAndGet(); - if (value % 10 == 0) { - throw new RuntimeException("Dummy exception on message: " + value); - } + try { + msgId = message.getJMSMessageID(); + msgText = ((TextMessage) message).getText(); + } catch (JMSException e) { + setFailure(e); + } - log.info("Received message: " + value + " content: " + message); + try { + assertEquals("Message: " + ackCounter.get(), msgText); + } catch (Throwable e) { + setFailure(e); + } - latch.countDown(); - } + int value = deliveryCounter.incrementAndGet(); + if (value % 2 == 0) { + log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText); + throw new RuntimeException("Dummy exception on message: " + value); + } + + log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText); + ackCounter.incrementAndGet(); + latch.countDown(); + } + + public synchronized Throwable getFailure() { + return failure; + } + + public synchronized void setFailure(Throwable failure) { + this.failure = failure; + } }