diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 9a7feef685..960ac9c234 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -2001,6 +2001,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.writeLock().lock(); try { + if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) { + // merge all to select priority order + for (MessageReference qmr : list) { + if (!dispatchPendingList.contains(qmr)) { + dispatchPendingList.addMessageLast(qmr); + } + } + list = null; + } + doActualDispatch(dispatchPendingList); // and now see if we can dispatch the new stuff.. and append to the pending // list anything that does not actually get dispatched. diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java index cdddd4c86d..385e2b8c8f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -40,6 +40,8 @@ public class QueueDispatchPendingList implements PendingList { private PendingList pagedInPendingDispatch = new OrderedPendingList(); private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); + // when true use one PrioritizedPendingList for everything + private boolean prioritized = false; @Override @@ -160,6 +162,7 @@ public class QueueDispatchPendingList implements PendingList { } public void setPrioritizedMessages(boolean prioritizedMessages) { + prioritized = prioritizedMessages; if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { pagedInPendingDispatch = new PrioritizedPendingList(); redeliveredWaitingDispatch = new PrioritizedPendingList(); @@ -170,10 +173,14 @@ public class QueueDispatchPendingList implements PendingList { } public void addMessageForRedelivery(QueueMessageReference qmr) { - redeliveredWaitingDispatch.addMessageLast(qmr); + if (prioritized) { + pagedInPendingDispatch.addMessageLast(qmr); + } else { + redeliveredWaitingDispatch.addMessageLast(qmr); + } } public boolean hasRedeliveries(){ - return !redeliveredWaitingDispatch.isEmpty(); + return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty(); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 789e45f18a..e7c746cc92 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -110,6 +110,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { broker.waitUntilStarted(); factory = new ActiveMQConnectionFactory("vm://priorityTest"); + factory.setMessagePrioritySupported(true); ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); prefetch.setAll(prefetchVal); factory.setPrefetchPolicy(prefetch); @@ -668,6 +669,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { queueConsumer.close(); } + public void testInterleaveHiNewConsumerGetsHi() throws Exception { + ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST"); + doTestInterleaveHiNewConsumerGetsHi(queue); + } + + public void testInterleaveHiNewConsumerGetsHiPull() throws Exception { + ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0"); + doTestInterleaveHiNewConsumerGetsHi(queue); + } + + public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception { + + // one hi sandwich + ProducerThread producerThread = new ProducerThread(queue, 3, LOW_PRI); + producerThread.run(); + producerThread = new ProducerThread(queue, 1, HIGH_PRI); + producerThread.run(); + producerThread = new ProducerThread(queue, 3, LOW_PRI); + producerThread.run(); + + // consume hi + MessageConsumer queueConsumer = sess.createConsumer(queue); + Message message = queueConsumer.receive(10000); + assertNotNull("expect #", message); + assertEquals("correct priority", HIGH_PRI, message.getJMSPriority()); + queueConsumer.close(); + + // last hi + producerThread = new ProducerThread(queue, 3, LOW_PRI); + producerThread.run(); + producerThread = new ProducerThread(queue, 1, HIGH_PRI); + producerThread.run(); + + // consume hi + queueConsumer = sess.createConsumer(queue); + message = queueConsumer.receive(10000); + assertNotNull("expect #", message); + assertEquals("correct priority", HIGH_PRI, message.getJMSPriority()); + queueConsumer.close(); + + // consume the rest + queueConsumer = sess.createConsumer(queue); + for (int i = 0; i < 9; i++) { + message = queueConsumer.receive(10000); + assertNotNull("expect #" + i, message); + assertEquals("correct priority", LOW_PRI, message.getJMSPriority()); + } + queueConsumer.close(); + } + public void initCombosForTestEveryXHi() { // the cache limits the priority ordering to available memory addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});