diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b6af75c4f9..2f3d8bd412 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -109,7 +109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock(); protected PendingMessageCursor messages; private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock(); - private final LinkedHashMap pagedInMessages = new LinkedHashMap(); + private final PendingList pagedInMessages = new OrderedPendingList(); // Messages that are paged in but have not yet been targeted at a subscription private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); protected PendingList pagedInPendingDispatch = new OrderedPendingList(); @@ -1188,44 +1188,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { pageInMessages(!memoryUsage.isFull(110)); }; - List toExpire = new ArrayList(); - - pagedInPendingDispatchLock.writeLock().lock(); - try { - addAll(pagedInPendingDispatch.values(), browseList, max, toExpire); - for (MessageReference ref : toExpire) { - pagedInPendingDispatch.remove(ref); - if (broker.isExpired(ref)) { - LOG.debug("expiring from pagedInPending: {}", ref); - messageExpired(connectionContext, ref); - } else { - ref.decrementReferenceCount(); - } - } - } finally { - pagedInPendingDispatchLock.writeLock().unlock(); - } - toExpire.clear(); - pagedInMessagesLock.readLock().lock(); - try { - addAll(pagedInMessages.values(), browseList, max, toExpire); - } finally { - pagedInMessagesLock.readLock().unlock(); - } - for (MessageReference ref : toExpire) { - if (broker.isExpired(ref)) { - LOG.debug("expiring from pagedInMessages: {}", ref); - messageExpired(connectionContext, ref); - } else { - pagedInMessagesLock.writeLock().lock(); - try { - pagedInMessages.remove(ref.getMessageId()); - } finally { - pagedInMessagesLock.writeLock().unlock(); - } - ref.decrementReferenceCount(); - } - } + doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch"); + doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages"); // we need a store iterator to walk messages on disk, independent of the cursor which is tracking // the next message batch @@ -1234,6 +1198,30 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } + protected void doBrowseList(List browseList, int max, PendingList list, ReentrantReadWriteLock lock, ConnectionContext connectionContext, String name) throws Exception { + List toExpire = new ArrayList(); + lock.readLock().lock(); + try { + addAll(list.values(), browseList, max, toExpire); + } finally { + lock.readLock().unlock(); + } + for (MessageReference ref : toExpire) { + if (broker.isExpired(ref)) { + LOG.debug("expiring from {}: {}", name, ref); + messageExpired(connectionContext, ref); + } else { + lock.writeLock().lock(); + try { + list.remove(ref); + } finally { + lock.writeLock().unlock(); + } + ref.decrementReferenceCount(); + } + } + } + private boolean shouldPageInMoreForBrowse(int max) { int alreadyPagedIn = 0; pagedInMessagesLock.readLock().lock(); @@ -1264,7 +1252,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { MessageId msgId = new MessageId(id); pagedInMessagesLock.readLock().lock(); try { - QueueMessageReference ref = this.pagedInMessages.get(msgId); + QueueMessageReference ref = (QueueMessageReference)this.pagedInMessages.get(msgId); if (ref != null) { return ref; } @@ -1535,7 +1523,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int movedCounter = 0; - Set set = new LinkedHashSet(); + Set set = new LinkedHashSet(); do { doPageIn(true); pagedInMessagesLock.readLock().lock(); @@ -1544,11 +1532,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { pagedInMessagesLock.readLock().unlock(); } - List list = new ArrayList(set); - for (QueueMessageReference ref : list) { + List list = new ArrayList(set); + for (MessageReference ref : list) { if (filter.evaluate(context, ref)) { // We should only move messages that can be locked. - moveMessageTo(context, ref, dest); + moveMessageTo(context, (QueueMessageReference)ref, dest); set.remove(ref); if (++movedCounter >= maximumMessages && maximumMessages > 0) { return movedCounter; @@ -1564,7 +1552,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { throw new Exception("Retry of message is only possible on Dead Letter Queues!"); } int restoredCounter = 0; - Set set = new LinkedHashSet(); + Set set = new LinkedHashSet(); do { doPageIn(true); pagedInMessagesLock.readLock().lock(); @@ -1573,11 +1561,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { pagedInMessagesLock.readLock().unlock(); } - List list = new ArrayList(set); - for (QueueMessageReference ref : list) { + List list = new ArrayList(set); + for (MessageReference ref : list) { if (ref.getMessage().getOriginalDestination() != null) { - moveMessageTo(context, ref, ref.getMessage().getOriginalDestination()); + moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); set.remove(ref); if (++restoredCounter >= maximumMessages && maximumMessages > 0) { return restoredCounter; @@ -1672,10 +1660,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } if (hasBrowsers) { - ArrayList alreadyDispatchedMessages = null; + ArrayList alreadyDispatchedMessages = null; pagedInMessagesLock.readLock().lock(); try{ - alreadyDispatchedMessages = new ArrayList(pagedInMessages.values()); + alreadyDispatchedMessages = new ArrayList(pagedInMessages.values()); }finally { pagedInMessagesLock.readLock().unlock(); } @@ -1691,8 +1679,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); boolean added = false; - for (QueueMessageReference node : alreadyDispatchedMessages) { - if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { + for (MessageReference node : alreadyDispatchedMessages) { + if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { browser.add(node); @@ -1830,7 +1818,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { destinationStatistics.getMessages().decrement(); pagedInMessagesLock.writeLock().lock(); try { - pagedInMessages.remove(reference.getMessageId()); + pagedInMessages.remove(reference); } finally { pagedInMessagesLock.writeLock().unlock(); } @@ -1996,8 +1984,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { resultList = new OrderedPendingList(); } for (QueueMessageReference ref : result) { - if (!pagedInMessages.containsKey(ref.getMessageId())) { - pagedInMessages.put(ref.getMessageId(), ref); + if (!pagedInMessages.contains(ref)) { + pagedInMessages.addMessageLast(ref); resultList.addMessageLast(ref); } else { ref.decrementReferenceCount(); @@ -2260,7 +2248,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (message == null) { pagedInMessagesLock.readLock().lock(); try { - message = pagedInMessages.get(messageId); + message = (QueueMessageReference)pagedInMessages.get(messageId); } finally { pagedInMessagesLock.readLock().unlock(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java index 33062e7d7f..9bf9588e90 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java @@ -164,4 +164,13 @@ public class OrderedPendingList implements PendingList { } } } + + @Override + public MessageReference get(MessageId messageId) { + PendingNode node = map.get(messageId); + if (node != null) { + return node.getMessage(); + } + return null; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java index a44d80e6f9..153d8bd0b8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Iterator; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; public interface PendingList extends Iterable { @@ -108,4 +109,6 @@ public interface PendingList extends Iterable { * The PendingList that is to be added to this collection. */ public void addAll(PendingList pendingList); + + public MessageReference get(MessageId messageId); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 0772b201b0..9235b2ce26 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -156,4 +156,13 @@ public class PrioritizedPendingList implements PendingList { } } + @Override + public MessageReference get(MessageId messageId) { + PendingNode node = map.get(messageId); + if (node != null) { + return node.getMessage(); + } + return null; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java index 05308deb7e..79d7e6c36e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java @@ -329,6 +329,16 @@ public class OrderPendingListTest { theList.add(messageReference); } } + + @Override + public MessageReference get(MessageId messageId) { + for(MessageReference messageReference : theList) { + if (messageReference.getMessageId().equals(messageId)) { + return messageReference; + } + } + return null; + } } static class TestMessageReference implements MessageReference {