https://issues.apache.org/jira/browse/AMQ-5541 - support preemptive redelivery for non persistent case, fix and test

This commit is contained in:
gtully 2015-01-27 16:43:35 +00:00
parent bae0e60a7c
commit dc25f2a8f9
2 changed files with 60 additions and 5 deletions

View File

@ -623,11 +623,13 @@ public class RegionBroker extends EmptyBroker {
long totalTime = endTime - message.getBrokerInTime();
((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
}
if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered()) {
final int originalValue = message.getRedeliveryCounter();
message.incrementRedeliveryCounter();
try {
((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
if (message.isPersistent()) {
((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
}
messageDispatch.setTransmitCallback(new TransmitCallback() {
// dispatch is considered a delivery, so update sub state post dispatch otherwise
// on a disconnect/reconnect cached messages will not reflect initial delivery attempt

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Set;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@ -44,6 +45,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Before;
@ -97,7 +99,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
populateDestination(10, destination, connection);
populateDestination(10, destination, connection, true);
TextMessage msg = null;
MessageConsumer consumer = session.createConsumer(destination);
Exception expectedException = null;
@ -165,7 +167,7 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
populateDestination(10, destination, connection);
populateDestination(10, destination, connection, true);
TextMessage msg = null;
MessageConsumer consumer = session.createConsumer(destination);
Exception expectedException = null;
@ -218,6 +220,56 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
connection.close();
}
@org.junit.Test
public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+ "?jms.prefetchPolicy.all=0");
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
populateDestination(10, destination, connection, false);
TextMessage msg = null;
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(5000);
assertNotNull("got the message", msg);
assertFalse("not redelivered", msg.getJMSRedelivered());
}
connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new IOException("Die"));
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
// consume the messages that were previously delivered
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(4000);
LOG.info("redelivered? got: " + msg);
assertNotNull("got the message again", msg);
assertEquals("redelivery flag set on:" + i, true, msg.getJMSRedelivered());
assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1);
msg.acknowledge();
}
// consume the rest that were not redeliveries
for (int i = 0; i < 5; i++) {
msg = (TextMessage) consumer.receive(4000);
LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg);
assertEquals("not a redelivery", false, msg.getJMSRedelivered());
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
msg.acknowledge();
}
connection.close();
}
private void restartBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
@ -231,9 +283,10 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
return broker;
}
private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException {
private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection, boolean persistent) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
for (int i = 1; i <= nbMessages; i++) {
producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
}