Apply fix and test from Martin Lichtin to preserve the rollback cause in
all ack modes.
This commit is contained in:
Timothy Bish 2015-11-20 16:18:27 -05:00
parent cc6213ebf2
commit 4a27b72377
2 changed files with 108 additions and 29 deletions

View File

@ -1401,13 +1401,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
afterMessageIsConsumed(md, expired); afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) { } catch (RuntimeException e) {
LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e); LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
md.setRollbackCause(e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing // schedual redelivery and possible dlq processing
md.setRollbackCause(e);
rollback(); rollback();
} else { } else {
// Transacted or Client ack: Deliver the // Transacted or Client ack: Deliver the next message.
// next message.
afterMessageIsConsumed(md, false); afterMessageIsConsumed(md, false);
} }
} }

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
@ -16,6 +16,11 @@
*/ */
package org.apache.activemq; package org.apache.activemq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -33,42 +38,49 @@ import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class MessageListenerRedeliveryTest extends TestCase { public class MessageListenerRedeliveryTest {
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class); private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
@Rule
public TestName name = new TestName();
private Connection connection; private Connection connection;
@Override @Before
protected void setUp() throws Exception { public void setUp() throws Exception {
connection = createConnection(); connection = createConnection();
} }
/** @After
* @see junit.framework.TestCase#tearDown() public void tearDown() throws Exception {
*/
@Override
protected void tearDown() throws Exception {
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
connection = null; connection = null;
} }
} }
protected String getTestName() {
return name.getMethodName();
}
protected RedeliveryPolicy getRedeliveryPolicy() { protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0); redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setRedeliveryDelay(1000); redeliveryPolicy.setRedeliveryDelay(1000);
redeliveryPolicy.setMaximumRedeliveries(3); redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setBackOffMultiplier((short)2); redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setUseExponentialBackOff(true); redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy; return redeliveryPolicy;
} }
@ -107,11 +119,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
} }
} }
@Test(timeout = 60000)
public void testQueueRollbackConsumerListener() throws JMSException { public void testQueueRollbackConsumerListener() throws JMSException {
connection.start(); connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName()); Queue queue = session.createQueue("queue-" + getTestName());
MessageProducer producer = createProducer(session, queue); MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session); Message message = createTextMessage(session);
producer.send(message); producer.send(message);
@ -119,7 +132,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer; ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy()); mc.setRedeliveryPolicy(getRedeliveryPolicy());
TestMessageListener listener = new TestMessageListener(session); TestMessageListener listener = new TestMessageListener(session);
@ -170,11 +183,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
session.close(); session.close();
} }
@Test(timeout = 60000)
public void testQueueRollbackSessionListener() throws JMSException { public void testQueueRollbackSessionListener() throws JMSException {
connection.start(); connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName()); Queue queue = session.createQueue("queue-" + getTestName());
MessageProducer producer = createProducer(session, queue); MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session); Message message = createTextMessage(session);
producer.send(message); producer.send(message);
@ -182,7 +196,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer; ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy()); mc.setRedeliveryPolicy(getRedeliveryPolicy());
TestMessageListener listener = new TestMessageListener(session); TestMessageListener listener = new TestMessageListener(session);
@ -235,11 +249,12 @@ public class MessageListenerRedeliveryTest extends TestCase {
session.close(); session.close();
} }
public void testQueueSessionListenerExceptionRetry() throws Exception { @Test(timeout = 60000)
public void testQueueSessionListenerExceptionRetry() throws Exception {
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName()); Queue queue = session.createQueue("queue-" + getTestName());
MessageProducer producer = createProducer(session, queue); MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session, "1"); Message message = createTextMessage(session, "1");
producer.send(message); producer.send(message);
@ -249,7 +264,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
final CountDownLatch gotMessage = new CountDownLatch(2); final CountDownLatch gotMessage = new CountDownLatch(2);
final AtomicInteger count = new AtomicInteger(0); final AtomicInteger count = new AtomicInteger(0);
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries(); final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
final ArrayList<String> received = new ArrayList<String>(); final ArrayList<String> received = new ArrayList<String>();
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
@ -263,7 +278,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
fail(e.toString()); fail(e.toString());
} }
if (count.incrementAndGet() < maxDeliveries) { if (count.incrementAndGet() < maxDeliveries) {
throw new RuntimeException(getName() + " force a redelivery"); throw new RuntimeException(getTestName() + " force a redelivery");
} }
// new blood // new blood
count.set(0); count.set(0);
@ -273,20 +288,21 @@ public class MessageListenerRedeliveryTest extends TestCase {
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS)); assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
for (int i=0; i<maxDeliveries; i++) { for (int i = 0; i < maxDeliveries; i++) {
assertEquals("got first redelivered: " + i, "1", received.get(i)); assertEquals("got first redelivered: " + i, "1", received.get(i));
} }
for (int i=maxDeliveries; i<maxDeliveries*2; i++) { for (int i = maxDeliveries; i < maxDeliveries * 2; i++) {
assertEquals("got first redelivered: " + i, "2", received.get(i)); assertEquals("got first redelivered: " + i, "2", received.get(i));
} }
session.close(); session.close();
} }
public void testQueueSessionListenerExceptionDlq() throws Exception { @Test(timeout = 60000)
public void testQueueSessionListenerExceptionDlq() throws Exception {
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName()); Queue queue = session.createQueue("queue-" + getTestName());
MessageProducer producer = createProducer(session, queue); MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session); Message message = createTextMessage(session);
producer.send(message); producer.send(message);
@ -314,7 +330,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
public void onMessage(Message message) { public void onMessage(Message message) {
LOG.info("Message Received: " + message); LOG.info("Message Received: " + message);
gotMessage.countDown(); gotMessage.countDown();
throw new RuntimeException(getName() + " force a redelivery"); throw new RuntimeException(getTestName() + " force a redelivery");
} }
}); });
@ -331,7 +347,70 @@ public class MessageListenerRedeliveryTest extends TestCase {
LOG.info("DLQ'd message cause reported as: {}", cause); LOG.info("DLQ'd message cause reported as: {}", cause);
assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException")); assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
assertTrue("is correct exception", cause.contains(getName())); assertTrue("is correct exception", cause.contains(getTestName()));
assertTrue("cause exception is remembered", cause.contains("Throwable"));
assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
session.close();
}
@Test(timeout = 60000)
public void testTransactedQueueSessionListenerExceptionDlq() throws Exception {
connection.start();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("queue-" + getTestName());
MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session);
producer.send(message);
session.commit();
final Message[] dlqMessage = new Message[1];
ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
final CountDownLatch gotDlqMessage = new CountDownLatch(1);
dlqConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
LOG.info("DLQ Message Received: " + message);
dlqMessage[0] = message;
gotDlqMessage.countDown();
}
});
MessageConsumer consumer = session.createConsumer(queue);
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
LOG.info("Message Received: " + message);
gotMessage.countDown();
try {
session.rollback();
} catch (JMSException e) {
e.printStackTrace();
}
throw new RuntimeException(getTestName() + " force a redelivery");
}
});
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
// check DLQ
assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
// check DLQ message cause is captured
message = dlqMessage[0];
assertNotNull("dlq message captured", message);
String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
LOG.info("DLQ'd message cause reported as: {}", cause);
assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
assertTrue("is correct exception", cause.contains(getTestName()));
assertTrue("cause exception is remembered", cause.contains("Throwable")); assertTrue("cause exception is remembered", cause.contains("Throwable"));
assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy")); assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));
@ -341,6 +420,7 @@ public class MessageListenerRedeliveryTest extends TestCase {
private TextMessage createTextMessage(Session session, String text) throws JMSException { private TextMessage createTextMessage(Session session, String text) throws JMSException {
return session.createTextMessage(text); return session.createTextMessage(text);
} }
private TextMessage createTextMessage(Session session) throws JMSException { private TextMessage createTextMessage(Session session) throws JMSException {
return session.createTextMessage("Hello"); return session.createTextMessage("Hello");
} }