diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 87cfc2250a..ae510d31c3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -95,6 +95,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private final LinkedHashMap pagedInMessages = new LinkedHashMap(); // Messages that are paged in but have not yet been targeted at a subscription private List pagedInPendingDispatch = new ArrayList(100); + private List redeliveredWaitingDispatch = new ArrayList(); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); @@ -377,7 +378,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { wakeup(); } } - + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception { destinationStatistics.getConsumers().decrement(); // synchronize with dispatch method so that no new messages are sent @@ -406,7 +407,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { getMessageGroupOwners().removeConsumer(consumerId); // redeliver inflight messages - List list = new ArrayList(); + for (MessageReference ref : sub.remove(context, this)) { QueueMessageReference qmr = (QueueMessageReference) ref; if (qmr.getLockOwner() == sub) { @@ -416,11 +417,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { qmr.incrementRedeliveryCounter(); } } - list.add(qmr); + redeliveredWaitingDispatch.add(qmr); } - - if (!list.isEmpty()) { - doDispatch(list); + if (!redeliveredWaitingDispatch.isEmpty()) { + doDispatch(new ArrayList()); } } if (!(this.optimizedDispatch || isSlave())) { @@ -1220,7 +1220,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { // Perhaps we should page always into the pagedInPendingDispatch list if // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() // then we do a dispatch. - if (pageInMoreMessages || pendingBrowserDispatch != null) { + if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) { try { pageInMessages(pendingBrowserDispatch != null); @@ -1494,8 +1494,12 @@ public class Queue extends BaseDestination implements Task, UsageListener { synchronized (dispatchMutex) { synchronized (pagedInPendingDispatch) { + if (!redeliveredWaitingDispatch.isEmpty()) { + // Try first to dispatch redelivered messages to keep an proper order + redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); + } if (!pagedInPendingDispatch.isEmpty()) { - // Try to first dispatch anything that had not been + // Next dispatch anything that had not been // dispatched before. pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java new file mode 100644 index 0000000000..5a33b00e7d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java @@ -0,0 +1,118 @@ +package org.apache.activemq.bugs; + +import java.util.Date; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.bugs.AMQ1866.ConsumerThread; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class OutOfOrderTestCase extends TestCase { + + private static final Log log = LogFactory.getLog(OutOfOrderTestCase.class); + + public static final String BROKER_URL = "tcp://localhost:61616"; + private static final int PREFETCH = 10; + private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH; + + public static final String QUEUE_NAME = "QUEUE"; + private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; + + BrokerService brokerService; + Session session; + Connection connection; + + int seq = 0; + + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.addConnector(BROKER_URL); + brokerService.deleteAllMessages(); + brokerService.start(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + } + + + protected void tearDown() throws Exception { + session.close(); + connection.close(); + brokerService.stop(); + } + + + + public void testOrder() throws Exception { + + log.info("Producing messages 0-29 . . ."); + Destination destination = session.createQueue(DESTINATION); + final MessageProducer messageProducer = session + .createProducer(destination); + try { + for (int i = 0; i < 30; ++i) { + final Message message = session + .createTextMessage(createMessageText(i)); + message.setStringProperty("JMSXGroupID", "FOO"); + + messageProducer.send(message); + log.info("sent " + toString(message)); + } + } finally { + messageProducer.close(); + } + + log.info("Consuming messages 0-9 . . ."); + consumeBatch(); + + log.info("Consuming messages 10-19 . . ."); + consumeBatch(); + + log.info("Consuming messages 20-29 . . ."); + consumeBatch(); + } + + protected void consumeBatch() throws Exception { + Destination destination = session.createQueue(DESTINATION); + final MessageConsumer messageConsumer = session.createConsumer(destination); + try { + for (int i = 0; i < 10; ++i) { + final Message message = messageConsumer.receive(1000L); + log.info("received " + toString(message)); + assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText()); + message.acknowledge(); + } + } finally { + messageConsumer.close(); + } + } + + private String toString(final Message message) throws JMSException { + String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); + if (message.getJMSRedelivered()) + ret += " (redelivered)"; + return ret; + + } + + private static String createMessageText(final int index) { + return "message #" + index; + } +} \ No newline at end of file