Applyed Rodrigo S de Castro's latest unit test patch and fixed the redelivery problem. Redelivery was not being delayed when rollback was called from the message listener.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-28 15:11:39 +00:00
parent 03de4c2d8b
commit f25e2ca23e
3 changed files with 112 additions and 44 deletions

View File

@ -788,7 +788,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageListener listener = this.messageListener; MessageListener listener = this.messageListener;
try { try {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
if (listener != null && started.get()) { if (listener != null && unconsumedMessages.isRunning() ) {
ActiveMQMessage message = createActiveMQMessage(md); ActiveMQMessage message = createActiveMQMessage(md);
beforeMessageIsConsumed(md); beforeMessageIsConsumed(md);
try { try {

View File

@ -147,7 +147,7 @@ public class ActiveMQSessionExecutor implements Task {
return false; return false;
} else { } else {
dispatch(message); dispatch(message);
return true; return messageQueue.isRunning();
} }
} }

View File

@ -35,7 +35,8 @@ import org.apache.commons.logging.LogFactory;
public class MessageListenerRedeliveryTest extends TestCase { public class MessageListenerRedeliveryTest extends TestCase {
private static final Log log = LogFactory.getLog(MessageListenerRedeliveryTest.class); private Log log = LogFactory.getLog(getClass());
private Connection connection; private Connection connection;
protected void setUp() throws Exception { protected void setUp() throws Exception {
@ -55,8 +56,8 @@ public class MessageListenerRedeliveryTest extends TestCase {
protected RedeliveryPolicy getRedeliveryPolicy() { protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000); redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setBackOffMultiplier((short) 5); redeliveryPolicy.setMaximumRedeliveries(2);
redeliveryPolicy.setMaximumRedeliveries(10); redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setUseExponentialBackOff(true); redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy; return redeliveryPolicy;
} }
@ -67,59 +68,34 @@ public class MessageListenerRedeliveryTest extends TestCase {
return factory.createConnection(); return factory.createConnection();
} }
private class ConsumerMessageListenerTest implements MessageListener { private class TestMessageListener implements MessageListener {
private ActiveMQMessageConsumer consumer;
public int counter = 0;
public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
this.consumer = consumer;
}
public void onMessage(Message message) {
try {
log.info("Message: " + message);
counter++;
if (counter <= 2) {
log.info("ROLLBACK");
consumer.rollback();
} else {
log.info("COMMIT");
message.acknowledge();
consumer.commit();
}
} catch (JMSException e) {
System.err.println("Error when rolling back transaction");
}
}
}
private class SessionMessageListenerTest implements MessageListener {
private Session session; private Session session;
public int counter = 0; public int counter = 0;
public SessionMessageListenerTest(Session session) { public TestMessageListener(Session session) {
this.session = session; this.session = session;
} }
public void onMessage(Message message) { public void onMessage(Message message) {
try { try {
log.info("Message: " + message); log.info("Message Received: " + message);
counter++; counter++;
if (counter < 2) { if (counter <= 3) {
log.info("ROLLBACK"); log.info("Message Rollback.");
session.rollback(); session.rollback();
} else { } else {
log.info("COMMIT"); log.info("Message Commit.");
message.acknowledge(); message.acknowledge();
session.commit(); session.commit();
} }
} catch (JMSException e) { } catch (JMSException e) {
System.err.println("Error when rolling back transaction"); log.error("Error when rolling back transaction");
} }
} }
} }
public void testQueueRollbackMessageListener() throws JMSException { public void testQueueRollbackConsumerListener() throws JMSException {
connection.start(); connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@ -134,26 +110,118 @@ public class MessageListenerRedeliveryTest extends TestCase {
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer; ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy()); mc.setRedeliveryPolicy(getRedeliveryPolicy());
SessionMessageListenerTest listener = new SessionMessageListenerTest(session); TestMessageListener listener = new TestMessageListener(session);
consumer.setMessageListener(listener); consumer.setMessageListener(listener);
try { try {
Thread.sleep(7000); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
assertEquals(2, listener.counter); // first try
assertEquals(1, listener.counter);
producer.send(createTextMessage(session)); try {
session.commit(); Thread.sleep(1000);
} catch (InterruptedException e) {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter);
// create new message
producer.send(createTextMessage(session));
session.commit();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore // ignore
} }
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
session.close();
}
public void testQueueRollbackSessionListener() throws JMSException {
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session);
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy());
TestMessageListener listener = new TestMessageListener(session);
consumer.setMessageListener(listener);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
// first try
assertEquals(1, listener.counter);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter); assertEquals(3, listener.counter);
// create new message
producer.send(createTextMessage(session));
session.commit();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
session.close(); session.close();
} }