mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6151 - respect prioritizeMessages for pending and redelivered messages
(cherry picked from commit 5af5b59d3b
)
This commit is contained in:
parent
aaa2fdd541
commit
ce604fba78
|
@ -2000,6 +2000,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
|
|
||||||
pagedInPendingDispatchLock.writeLock().lock();
|
pagedInPendingDispatchLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
|
if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) {
|
||||||
|
// merge all to select priority order
|
||||||
|
for (MessageReference qmr : list) {
|
||||||
|
if (!dispatchPendingList.contains(qmr)) {
|
||||||
|
dispatchPendingList.addMessageLast(qmr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list = null;
|
||||||
|
}
|
||||||
|
|
||||||
doActualDispatch(dispatchPendingList);
|
doActualDispatch(dispatchPendingList);
|
||||||
// and now see if we can dispatch the new stuff.. and append to the pending
|
// and now see if we can dispatch the new stuff.. and append to the pending
|
||||||
// list anything that does not actually get dispatched.
|
// list anything that does not actually get dispatched.
|
||||||
|
|
|
@ -40,6 +40,8 @@ public class QueueDispatchPendingList implements PendingList {
|
||||||
|
|
||||||
private PendingList pagedInPendingDispatch = new OrderedPendingList();
|
private PendingList pagedInPendingDispatch = new OrderedPendingList();
|
||||||
private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
|
private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
|
||||||
|
// when true use one PrioritizedPendingList for everything
|
||||||
|
private boolean prioritized = false;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -160,6 +162,7 @@ public class QueueDispatchPendingList implements PendingList {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPrioritizedMessages(boolean prioritizedMessages) {
|
public void setPrioritizedMessages(boolean prioritizedMessages) {
|
||||||
|
prioritized = prioritizedMessages;
|
||||||
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
|
if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
|
||||||
pagedInPendingDispatch = new PrioritizedPendingList();
|
pagedInPendingDispatch = new PrioritizedPendingList();
|
||||||
redeliveredWaitingDispatch = new PrioritizedPendingList();
|
redeliveredWaitingDispatch = new PrioritizedPendingList();
|
||||||
|
@ -170,10 +173,14 @@ public class QueueDispatchPendingList implements PendingList {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addMessageForRedelivery(QueueMessageReference qmr) {
|
public void addMessageForRedelivery(QueueMessageReference qmr) {
|
||||||
|
if (prioritized) {
|
||||||
|
pagedInPendingDispatch.addMessageLast(qmr);
|
||||||
|
} else {
|
||||||
redeliveredWaitingDispatch.addMessageLast(qmr);
|
redeliveredWaitingDispatch.addMessageLast(qmr);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public boolean hasRedeliveries(){
|
public boolean hasRedeliveries(){
|
||||||
return !redeliveredWaitingDispatch.isEmpty();
|
return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,6 +110,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
||||||
broker.waitUntilStarted();
|
broker.waitUntilStarted();
|
||||||
|
|
||||||
factory = new ActiveMQConnectionFactory("vm://priorityTest");
|
factory = new ActiveMQConnectionFactory("vm://priorityTest");
|
||||||
|
factory.setMessagePrioritySupported(true);
|
||||||
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
|
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
|
||||||
prefetch.setAll(prefetchVal);
|
prefetch.setAll(prefetchVal);
|
||||||
factory.setPrefetchPolicy(prefetch);
|
factory.setPrefetchPolicy(prefetch);
|
||||||
|
@ -668,6 +669,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
|
||||||
queueConsumer.close();
|
queueConsumer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testInterleaveHiNewConsumerGetsHi() throws Exception {
|
||||||
|
ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
|
||||||
|
doTestInterleaveHiNewConsumerGetsHi(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testInterleaveHiNewConsumerGetsHiPull() throws Exception {
|
||||||
|
ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0");
|
||||||
|
doTestInterleaveHiNewConsumerGetsHi(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception {
|
||||||
|
|
||||||
|
// one hi sandwich
|
||||||
|
ProducerThread producerThread = new ProducerThread(queue, 3, LOW_PRI);
|
||||||
|
producerThread.run();
|
||||||
|
producerThread = new ProducerThread(queue, 1, HIGH_PRI);
|
||||||
|
producerThread.run();
|
||||||
|
producerThread = new ProducerThread(queue, 3, LOW_PRI);
|
||||||
|
producerThread.run();
|
||||||
|
|
||||||
|
// consume hi
|
||||||
|
MessageConsumer queueConsumer = sess.createConsumer(queue);
|
||||||
|
Message message = queueConsumer.receive(10000);
|
||||||
|
assertNotNull("expect #", message);
|
||||||
|
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
|
||||||
|
queueConsumer.close();
|
||||||
|
|
||||||
|
// last hi
|
||||||
|
producerThread = new ProducerThread(queue, 3, LOW_PRI);
|
||||||
|
producerThread.run();
|
||||||
|
producerThread = new ProducerThread(queue, 1, HIGH_PRI);
|
||||||
|
producerThread.run();
|
||||||
|
|
||||||
|
// consume hi
|
||||||
|
queueConsumer = sess.createConsumer(queue);
|
||||||
|
message = queueConsumer.receive(10000);
|
||||||
|
assertNotNull("expect #", message);
|
||||||
|
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
|
||||||
|
queueConsumer.close();
|
||||||
|
|
||||||
|
// consume the rest
|
||||||
|
queueConsumer = sess.createConsumer(queue);
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
message = queueConsumer.receive(10000);
|
||||||
|
assertNotNull("expect #" + i, message);
|
||||||
|
assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
|
||||||
|
}
|
||||||
|
queueConsumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
public void initCombosForTestEveryXHi() {
|
public void initCombosForTestEveryXHi() {
|
||||||
// the cache limits the priority ordering to available memory
|
// the cache limits the priority ordering to available memory
|
||||||
addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});
|
addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});
|
||||||
|
|
Loading…
Reference in New Issue