https://issues.apache.org/jira/browse/AMQ-3596 - FilePendingMessageCursor memory list does not respect priority for non persistent messages. Fix with test. Reuse pendinglist from vm cursor for the file pending message cursor in memory list

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1202153 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-11-15 12:21:20 +00:00
parent 1462fd234c
commit 3557361e86
2 changed files with 20 additions and 15 deletions

View File

@ -53,7 +53,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected Broker broker;
private final PListStore store;
private final String name;
private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
private PendingList memoryList;
private PList diskList;
private Iterator<MessageReference> iter;
private Destination regionDestination;
@ -68,6 +68,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/
public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
super(prioritizedMessages);
if (this.prioritizedMessages) {
this.memoryList = new PrioritizedPendingList();
} else {
this.memoryList = new OrderedPendingList();
}
this.broker = broker;
// the store can be null if the BrokerService has persistence
// turned off
@ -204,7 +209,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace() || this.store == null) {
memoryList.add(node);
memoryList.addMessageLast(node);
node.incrementReferenceCount();
setCacheEnabled(true);
return true;
@ -214,7 +219,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.add(node);
memoryList.addMessageLast(node);
node.incrementReferenceCount();
return true;
} else {
@ -252,7 +257,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace()) {
memoryList.addFirst(node);
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
setCacheEnabled(true);
return;
@ -262,7 +267,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
return;
} else {
@ -325,7 +330,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*/
@Override
public synchronized void remove(MessageReference node) {
if (memoryList.remove(node)) {
if (memoryList.remove(node) != null) {
node.decrementReferenceCount();
}
if (!isDiskListEmpty()) {
@ -406,19 +411,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected synchronized void expireOldMessages() {
if (!memoryList.isEmpty()) {
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
this.memoryList = new LinkedList<MessageReference>();
while (!tmpList.isEmpty()) {
MessageReference node = tmpList.removeFirst();
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node.isExpired()) {
node.decrementReferenceCount();
discardExpiredMessage(node);
} else {
memoryList.add(node);
iterator.remove();
}
}
}
}
protected synchronized void flushToDisk() {
@ -428,8 +429,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
start = System.currentTimeMillis();
LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
}
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
node.decrementReferenceCount();
ByteSequence bs;
try {

View File

@ -18,6 +18,7 @@
package org.apache.activemq.store;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -53,6 +54,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
protected Session sess;
public boolean useCache = true;
public int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
public boolean dispatchAsync = true;
public boolean prioritizeMessages = true;
public boolean immediatePriorityDispatch = true;
@ -150,6 +152,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
try {
MessageProducer producer = sess.createProducer(dest);
producer.setPriority(priority);
producer.setDeliveryMode(deliveryMode);
for (int i = 0; i < messageCount; i++) {
producer.send(sess.createTextMessage("message priority: " + priority));
}
@ -170,6 +173,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public void initCombosForTestQueues() {
addCombinationValues("useCache", new Object[] {new Boolean(true), new Boolean(false)});
addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), new Integer(DeliveryMode.PERSISTENT)});
}
public void testQueues() throws Exception {