https://issues.apache.org/jira/browse/AMQ-6151 - respect prioritizeMessages for pending and redelivered messages

This commit is contained in:
gtully 2016-02-01 12:19:24 +00:00
parent 186b5d0f30
commit 5af5b59d3b
3 changed files with 70 additions and 2 deletions

View File

@ -2001,6 +2001,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pagedInPendingDispatchLock.writeLock().lock();
try {
if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) {
// merge all to select priority order
for (MessageReference qmr : list) {
if (!dispatchPendingList.contains(qmr)) {
dispatchPendingList.addMessageLast(qmr);
}
}
list = null;
}
doActualDispatch(dispatchPendingList);
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.

View File

@ -40,6 +40,8 @@ public class QueueDispatchPendingList implements PendingList {
private PendingList pagedInPendingDispatch = new OrderedPendingList();
private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
// when true use one PrioritizedPendingList for everything
private boolean prioritized = false;
@Override
@ -160,6 +162,7 @@ public class QueueDispatchPendingList implements PendingList {
}
public void setPrioritizedMessages(boolean prioritizedMessages) {
prioritized = prioritizedMessages;
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
pagedInPendingDispatch = new PrioritizedPendingList();
redeliveredWaitingDispatch = new PrioritizedPendingList();
@ -170,10 +173,14 @@ public class QueueDispatchPendingList implements PendingList {
}
public void addMessageForRedelivery(QueueMessageReference qmr) {
redeliveredWaitingDispatch.addMessageLast(qmr);
if (prioritized) {
pagedInPendingDispatch.addMessageLast(qmr);
} else {
redeliveredWaitingDispatch.addMessageLast(qmr);
}
}
public boolean hasRedeliveries(){
return !redeliveredWaitingDispatch.isEmpty();
return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty();
}
}

View File

@ -110,6 +110,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
broker.waitUntilStarted();
factory = new ActiveMQConnectionFactory("vm://priorityTest");
factory.setMessagePrioritySupported(true);
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setAll(prefetchVal);
factory.setPrefetchPolicy(prefetch);
@ -668,6 +669,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
queueConsumer.close();
}
public void testInterleaveHiNewConsumerGetsHi() throws Exception {
ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
doTestInterleaveHiNewConsumerGetsHi(queue);
}
public void testInterleaveHiNewConsumerGetsHiPull() throws Exception {
ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0");
doTestInterleaveHiNewConsumerGetsHi(queue);
}
public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception {
// one hi sandwich
ProducerThread producerThread = new ProducerThread(queue, 3, LOW_PRI);
producerThread.run();
producerThread = new ProducerThread(queue, 1, HIGH_PRI);
producerThread.run();
producerThread = new ProducerThread(queue, 3, LOW_PRI);
producerThread.run();
// consume hi
MessageConsumer queueConsumer = sess.createConsumer(queue);
Message message = queueConsumer.receive(10000);
assertNotNull("expect #", message);
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
queueConsumer.close();
// last hi
producerThread = new ProducerThread(queue, 3, LOW_PRI);
producerThread.run();
producerThread = new ProducerThread(queue, 1, HIGH_PRI);
producerThread.run();
// consume hi
queueConsumer = sess.createConsumer(queue);
message = queueConsumer.receive(10000);
assertNotNull("expect #", message);
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
queueConsumer.close();
// consume the rest
queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < 9; i++) {
message = queueConsumer.receive(10000);
assertNotNull("expect #" + i, message);
assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
}
queueConsumer.close();
}
public void initCombosForTestEveryXHi() {
// the cache limits the priority ordering to available memory
addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});