mirror of
https://github.com/apache/activemq.git
synced 2025-02-28 21:29:12 +00:00
resolve https://issues.apache.org/activemq/browse/AMQ-906 - have retries with autoack and runtime exception from onMessage
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@916936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1f521da77e
commit
7f8c822a20
@ -650,6 +650,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||
void doClose() throws JMSException {
|
||||
dispose();
|
||||
RemoveInfo removeCommand = info.createRemoveCommand();
|
||||
LOG.info("remove: " + this.getConsumerId() + ", lasteDeliveredSequenceId:" + lastDeliveredSequenceId);
|
||||
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
|
||||
this.session.asyncSendPacket(removeCommand);
|
||||
}
|
||||
@ -1205,14 +1206,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||
}
|
||||
afterMessageIsConsumed(md, expired);
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
|
||||
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
|
||||
// Redeliver the message
|
||||
// schedual redelivery and possible dlq processing
|
||||
rollback();
|
||||
} else {
|
||||
// Transacted or Client ack: Deliver the
|
||||
// next message.
|
||||
afterMessageIsConsumed(md, false);
|
||||
}
|
||||
LOG.error(getConsumerId() + " Exception while processing message: " + e, e);
|
||||
}
|
||||
} else {
|
||||
unconsumedMessages.enqueue(md);
|
||||
@ -1328,14 +1330,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||
if (listener != null) {
|
||||
MessageDispatch md = unconsumedMessages.dequeueNoWait();
|
||||
if (md != null) {
|
||||
try {
|
||||
ActiveMQMessage message = createActiveMQMessage(md);
|
||||
beforeMessageIsConsumed(md);
|
||||
listener.onMessage(message);
|
||||
afterMessageIsConsumed(md, false);
|
||||
} catch (JMSException e) {
|
||||
session.connection.onClientInternalException(e);
|
||||
}
|
||||
dispatch(md);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,11 @@
|
||||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
@ -29,6 +34,9 @@ import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
@ -227,6 +235,97 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||
session.close();
|
||||
}
|
||||
|
||||
public void testQueueSessionListenerExceptionRetry() throws Exception {
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("queue-" + getName());
|
||||
MessageProducer producer = createProducer(session, queue);
|
||||
Message message = createTextMessage(session, "1");
|
||||
producer.send(message);
|
||||
message = createTextMessage(session, "2");
|
||||
producer.send(message);
|
||||
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
final CountDownLatch gotMessage = new CountDownLatch(2);
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
||||
final ArrayList<String> received = new ArrayList<String>();
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
LOG.info("Message Received: " + message);
|
||||
try {
|
||||
received.add(((TextMessage) message).getText());
|
||||
} catch (JMSException e) {
|
||||
e.printStackTrace();
|
||||
fail(e.toString());
|
||||
}
|
||||
if (count.incrementAndGet() < maxDeliveries) {
|
||||
throw new RuntimeException(getName() + " force a redelivery");
|
||||
}
|
||||
// new blood
|
||||
count.set(0);
|
||||
gotMessage.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
||||
|
||||
for (int i=0; i<maxDeliveries; i++) {
|
||||
assertEquals("got first redelivered: " + i, "1", received.get(i));
|
||||
}
|
||||
for (int i=maxDeliveries; i<maxDeliveries*2; i++) {
|
||||
assertEquals("got first redelivered: " + i, "2", received.get(i));
|
||||
}
|
||||
session.close();
|
||||
}
|
||||
|
||||
|
||||
public void testQueueSessionListenerExceptionDlq() throws Exception {
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("queue-" + getName());
|
||||
MessageProducer producer = createProducer(session, queue);
|
||||
Message message = createTextMessage(session);
|
||||
producer.send(message);
|
||||
|
||||
ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
||||
MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
|
||||
final CountDownLatch gotDlqMessage = new CountDownLatch(1);
|
||||
dlqConsumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
gotDlqMessage.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
|
||||
final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);
|
||||
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
public void onMessage(Message message) {
|
||||
LOG.info("Message Received: " + message);
|
||||
gotMessage.countDown();
|
||||
throw new RuntimeException(getName() + " force a redelivery");
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));
|
||||
|
||||
// check DLQ
|
||||
assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
|
||||
private TextMessage createTextMessage(Session session, String text) throws JMSException {
|
||||
return session.createTextMessage(text);
|
||||
}
|
||||
private TextMessage createTextMessage(Session session) throws JMSException {
|
||||
return session.createTextMessage("Hello");
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user