mirror of https://github.com/apache/activemq.git
Added message ordering assertions and also a test case that uses consumer.receive() instead of a messsage listener
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@472237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
11501cb38e
commit
0c93dfde72
|
@ -55,7 +55,7 @@ public class DelegatingTransactionalMessageListener implements MessageListener {
|
||||||
underlyingListener.onMessage(message);
|
underlyingListener.onMessage(message);
|
||||||
session.commit();
|
session.commit();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Throwable e) {
|
||||||
rollback();
|
rollback();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,79 +17,151 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.test.rollback;
|
package org.apache.activemq.test.rollback;
|
||||||
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
|
||||||
import org.springframework.jms.core.MessageCreator;
|
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageListener;
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.springframework.jms.core.MessageCreator;
|
||||||
|
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class RollbacksWhileConsumingLargeQueueTest extends EmbeddedBrokerTestSupport implements MessageListener {
|
public class RollbacksWhileConsumingLargeQueueTest extends
|
||||||
|
EmbeddedBrokerTestSupport implements MessageListener {
|
||||||
|
|
||||||
protected int numberOfMessagesOnQueue = 6500;
|
protected int numberOfMessagesOnQueue = 6500;
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
private DelegatingTransactionalMessageListener messageListener;
|
private AtomicInteger deliveryCounter = new AtomicInteger(0);
|
||||||
private AtomicInteger counter = new AtomicInteger(0);
|
private AtomicInteger ackCounter = new AtomicInteger(0);
|
||||||
private CountDownLatch latch;
|
private CountDownLatch latch;
|
||||||
|
private Throwable failure;
|
||||||
|
|
||||||
public void testConsumeOnFullQueue() throws Exception {
|
public void xtestWithReciever() throws Throwable {
|
||||||
boolean answer = latch.await(1000, TimeUnit.SECONDS);
|
latch = new CountDownLatch(numberOfMessagesOnQueue);
|
||||||
|
Session session = connection.createSession(true, 0);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
System.out.println("Received: " + counter.get() + " message(s)");
|
long start = System.currentTimeMillis();
|
||||||
assertTrue("Did not receive the latch!", answer);
|
while ((System.currentTimeMillis() - start) < 1000*1000) {
|
||||||
}
|
if (getFailure() != null) {
|
||||||
|
throw getFailure();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Are we done receiving all the messages.
|
||||||
|
if( ackCounter.get() == numberOfMessagesOnQueue )
|
||||||
|
return;
|
||||||
|
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
if (message == null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
try {
|
||||||
|
onMessage(message);
|
||||||
|
session.commit();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fail("Did not receive all the messages.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testWithMessageListener() throws Throwable {
|
||||||
|
latch = new CountDownLatch(numberOfMessagesOnQueue);
|
||||||
|
new DelegatingTransactionalMessageListener(this, connection,
|
||||||
|
destination);
|
||||||
|
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
while ((System.currentTimeMillis() - start) < 1000*1000) {
|
||||||
|
|
||||||
|
if (getFailure() != null) {
|
||||||
|
throw getFailure();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (latch.await(1, TimeUnit.SECONDS)) {
|
||||||
|
System.out.println("Received: " + deliveryCounter.get()
|
||||||
|
+ " message(s)");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
fail("Did not receive all the messages.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
|
|
||||||
connection = createConnection();
|
connection = createConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
// lets fill the queue up
|
// lets fill the queue up
|
||||||
for (int i = 0; i < numberOfMessagesOnQueue; i++) {
|
for (int i = 0; i < numberOfMessagesOnQueue; i++) {
|
||||||
template.send(createMessageCreator(i));
|
template.send(createMessageCreator(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
latch = new CountDownLatch(numberOfMessagesOnQueue);
|
}
|
||||||
messageListener = new DelegatingTransactionalMessageListener(this, connection, destination);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
protected void tearDown() throws Exception {
|
protected MessageCreator createMessageCreator(final int i) {
|
||||||
if (connection != null) {
|
return new MessageCreator() {
|
||||||
connection.close();
|
public Message createMessage(Session session) throws JMSException {
|
||||||
}
|
TextMessage answer = session.createTextMessage("Message: " + i);
|
||||||
super.tearDown();
|
answer.setIntProperty("Counter", i);
|
||||||
}
|
return answer;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
protected MessageCreator createMessageCreator(final int i) {
|
public void onMessage(Message message) {
|
||||||
return new MessageCreator() {
|
String msgId = null;
|
||||||
public Message createMessage(Session session) throws JMSException {
|
String msgText = null;
|
||||||
TextMessage answer = session.createTextMessage("Message: " + i);
|
|
||||||
answer.setIntProperty("Counter", i);
|
|
||||||
return answer;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onMessage(Message message) {
|
try {
|
||||||
int value = counter.incrementAndGet();
|
msgId = message.getJMSMessageID();
|
||||||
if (value % 10 == 0) {
|
msgText = ((TextMessage) message).getText();
|
||||||
throw new RuntimeException("Dummy exception on message: " + value);
|
} catch (JMSException e) {
|
||||||
}
|
setFailure(e);
|
||||||
|
}
|
||||||
|
|
||||||
log.info("Received message: " + value + " content: " + message);
|
try {
|
||||||
|
assertEquals("Message: " + ackCounter.get(), msgText);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
setFailure(e);
|
||||||
|
}
|
||||||
|
|
||||||
latch.countDown();
|
int value = deliveryCounter.incrementAndGet();
|
||||||
}
|
if (value % 2 == 0) {
|
||||||
|
log.info("Rolling Back message: " + value + " id: " + msgId + ", content: " + msgText);
|
||||||
|
throw new RuntimeException("Dummy exception on message: " + value);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Received message: " + value + " id: " + msgId + ", content: " + msgText);
|
||||||
|
ackCounter.incrementAndGet();
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized Throwable getFailure() {
|
||||||
|
return failure;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setFailure(Throwable failure) {
|
||||||
|
this.failure = failure;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue