diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 0545996b02..9458558720 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -53,7 +53,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple protected Broker broker; private final PListStore store; private final String name; - private LinkedList memoryList = new LinkedList(); + private PendingList memoryList; private PList diskList; private Iterator iter; private Destination regionDestination; @@ -68,6 +68,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple */ public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { super(prioritizedMessages); + if (this.prioritizedMessages) { + this.memoryList = new PrioritizedPendingList(); + } else { + this.memoryList = new OrderedPendingList(); + } this.broker = broker; // the store can be null if the BrokerService has persistence // turned off @@ -204,7 +209,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple regionDestination = node.getMessage().getRegionDestination(); if (isDiskListEmpty()) { if (hasSpace() || this.store == null) { - memoryList.add(node); + memoryList.addMessageLast(node); node.incrementReferenceCount(); setCacheEnabled(true); return true; @@ -214,7 +219,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple if (isDiskListEmpty()) { expireOldMessages(); if (hasSpace()) { - memoryList.add(node); + memoryList.addMessageLast(node); node.incrementReferenceCount(); return true; } else { @@ -252,7 +257,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple regionDestination = node.getMessage().getRegionDestination(); if (isDiskListEmpty()) { if (hasSpace()) { - memoryList.addFirst(node); + memoryList.addMessageFirst(node); node.incrementReferenceCount(); setCacheEnabled(true); return; @@ -262,7 +267,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple if (isDiskListEmpty()) { expireOldMessages(); if (hasSpace()) { - memoryList.addFirst(node); + memoryList.addMessageFirst(node); node.incrementReferenceCount(); return; } else { @@ -325,7 +330,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple */ @Override public synchronized void remove(MessageReference node) { - if (memoryList.remove(node)) { + if (memoryList.remove(node) != null) { node.decrementReferenceCount(); } if (!isDiskListEmpty()) { @@ -406,19 +411,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple protected synchronized void expireOldMessages() { if (!memoryList.isEmpty()) { - LinkedList tmpList = new LinkedList(this.memoryList); - this.memoryList = new LinkedList(); - while (!tmpList.isEmpty()) { - MessageReference node = tmpList.removeFirst(); + for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { + MessageReference node = iterator.next(); if (node.isExpired()) { node.decrementReferenceCount(); discardExpiredMessage(node); - } else { - memoryList.add(node); + iterator.remove(); } } } - } protected synchronized void flushToDisk() { @@ -428,8 +429,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple start = System.currentTimeMillis(); LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") ); } - while (!memoryList.isEmpty()) { - MessageReference node = memoryList.removeFirst(); + for (Iterator iterator = memoryList.iterator(); iterator.hasNext();) { + MessageReference node = iterator.next(); node.decrementReferenceCount(); ByteSequence bs; try { diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java index 7f4457599c..7111aca7d2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.store; import javax.jms.Connection; +import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -53,6 +54,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { protected Session sess; public boolean useCache = true; + public int deliveryMode = Message.DEFAULT_DELIVERY_MODE; public boolean dispatchAsync = true; public boolean prioritizeMessages = true; public boolean immediatePriorityDispatch = true; @@ -150,6 +152,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { try { MessageProducer producer = sess.createProducer(dest); producer.setPriority(priority); + producer.setDeliveryMode(deliveryMode); for (int i = 0; i < messageCount; i++) { producer.send(sess.createTextMessage("message priority: " + priority)); } @@ -170,6 +173,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport { public void initCombosForTestQueues() { addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)}); + addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)}); } public void testQueues() throws Exception {