https://issues.apache.org/jira/browse/AMQ-6286 - refine fix to distinguish multiple consumers in a transaction, verify insertion at head will preserve order

This commit is contained in:
gtully 2016-05-17 17:03:05 +01:00
parent 2c3046b816
commit c2230fda4b
2 changed files with 102 additions and 16 deletions

View File

@ -208,7 +208,7 @@ public class QueueDispatchPendingList implements PendingList {
} }
public void addForRedelivery(List<MessageReference> list, boolean noConsumers) { public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList) { if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
// a single consumer can expect repeatable redelivery order irrespective // a single consumer can expect repeatable redelivery order irrespective
// of transaction or prefetch boundaries // of transaction or prefetch boundaries
((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list); ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list);
@ -218,4 +218,12 @@ public class QueueDispatchPendingList implements PendingList {
} }
} }
} }
private boolean willBeInOrder(List<MessageReference> list) {
// for a single consumer inserting at head will be in order w.r.t brokerSequence but
// will not be if there were multiple consumers in the mix even if this is the last
// consumer to close (noConsumers==true)
return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
}
} }

View File

@ -25,6 +25,8 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Message; import javax.jms.Message;
@ -39,13 +41,26 @@ import static org.junit.Assert.assertEquals;
public class QueueOrderSingleTransactedConsumerTest { public class QueueOrderSingleTransactedConsumerTest {
private static final Logger LOG = LoggerFactory.getLogger(QueueOrderSingleTransactedConsumerTest.class);
BrokerService broker = null; BrokerService broker = null;
ActiveMQQueue dest = new ActiveMQQueue("Queue"); ActiveMQQueue dest = new ActiveMQQueue("Queue");
@Test @Test
public void testSingleConsumerTxRepeat() throws Exception { public void testSingleConsumerTxRepeat() throws Exception {
publishMessages(100); // effect the broker sequence id that is region wide
ActiveMQQueue dummyDest = new ActiveMQQueue("AnotherQueue");
publishMessagesWithOrderProperty(10, 0, dest);
publishMessagesWithOrderProperty(1, 0, dummyDest);
publishMessagesWithOrderProperty(10, 10, dest);
publishMessagesWithOrderProperty(1, 0, dummyDest);
publishMessagesWithOrderProperty(10, 20, dest);
publishMessagesWithOrderProperty(1, 0, dummyDest);
publishMessagesWithOrderProperty(5, 30, dest);
consumeVerifyOrderRollback(20); consumeVerifyOrderRollback(20);
consumeVerifyOrderRollback(10); consumeVerifyOrderRollback(10);
@ -55,48 +70,105 @@ public class QueueOrderSingleTransactedConsumerTest {
@Test @Test
public void testSingleSessionXConsumerTxRepeat() throws Exception { public void testSingleSessionXConsumerTxRepeat() throws Exception {
publishMessages(100); publishMessagesWithOrderProperty(50);
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); Connection connection = getConnectionFactory().createConnection();
connection.start(); connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumeVerifyOrder(session, 20); MessageConsumer messageConsumer = consumeVerifyOrder(session, 20);
messageConsumer.close();
session.rollback(); session.rollback();
consumeVerifyOrder(session, 10); messageConsumer = consumeVerifyOrder(session, 10);
messageConsumer.close();
session.rollback(); session.rollback();
consumeVerifyOrder(session, 5); messageConsumer = consumeVerifyOrder(session, 5);
messageConsumer.close();
session.commit(); session.commit();
connection.close();
}
@Test
public void tesXConsumerTxRepeat() throws Exception {
publishMessagesWithOrderProperty(10);
Connection connection = getConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
messageConsumer.close();
messageConsumer = consumeVerifyOrder(session, 4, 6);
// rollback before close, so there are two consumers in the mix
session.rollback();
messageConsumer.close();
messageConsumer = consumeVerifyOrder(session, 10);
session.commit();
messageConsumer.close();
connection.close();
}
@Test
public void testSingleTxXConsumerTxRepeat() throws Exception {
publishMessagesWithOrderProperty(10);
Connection connection = getConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
messageConsumer.close();
messageConsumer = consumeVerifyOrder(session, 4, 6);
messageConsumer.close();
session.rollback();
messageConsumer = consumeVerifyOrder(session, 10);
session.commit();
messageConsumer.close();
connection.close();
} }
private void consumeVerifyOrderRollback(final int num) throws Exception { private void consumeVerifyOrderRollback(final int num) throws Exception {
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); Connection connection = getConnectionFactory().createConnection();
connection.start(); connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumeVerifyOrder(session, num); MessageConsumer messageConsumer = consumeVerifyOrder(session, num);
messageConsumer.close();
session.rollback(); session.rollback();
connection.close(); connection.close();
} }
private void consumeVerifyOrder(Session session, final int num) throws Exception { private MessageConsumer consumeVerifyOrder(Session session, final int num) throws Exception {
return consumeVerifyOrder(session, num, 0);
}
private MessageConsumer consumeVerifyOrder(Session session, final int num, final int base) throws Exception {
MessageConsumer messageConsumer = session.createConsumer(dest); MessageConsumer messageConsumer = session.createConsumer(dest);
for (int i=0; i<num; ) { for (int i=0; i<num; ) {
Message message = messageConsumer.receive(4000); Message message = messageConsumer.receive(4000);
if (message != null) { if (message != null) {
assertEquals(i, message.getIntProperty("Order")); assertEquals(i + base, message.getIntProperty("Order"));
i++; i++;
LOG.debug("Received:" + message.getJMSMessageID() + ", Order: " + message.getIntProperty("Order"));
} }
} }
messageConsumer.close(); return messageConsumer;
} }
private void publishMessages(int num) throws Exception { private void publishMessagesWithOrderProperty(int num) throws Exception {
Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); publishMessagesWithOrderProperty(num, 0, dest);
}
private void publishMessagesWithOrderProperty(int num, int seqStart, ActiveMQQueue destination) throws Exception {
Connection connection = getConnectionFactory().createConnection();
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(dest); MessageProducer messageProducer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("A"); TextMessage textMessage = session.createTextMessage("A");
for (int i=0; i<num; i++) { for (int i=0; i<num; i++) {
textMessage.setIntProperty("Order", i); textMessage.setIntProperty("Order", i + seqStart);
messageProducer.send(textMessage); messageProducer.send(textMessage);
} }
} }
@ -130,4 +202,10 @@ public class QueueOrderSingleTransactedConsumerTest {
broker.stop(); broker.stop();
} }
} }
private ActiveMQConnectionFactory getConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
}
} }