diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 2ae01e0ffe..a8be66190c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -18,13 +18,11 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -86,7 +84,7 @@ public class Queue extends BaseDestination implements Task { protected PendingMessageCursor messages; private final LinkedHashMap pagedInMessages = new LinkedHashMap(); // Messages that are paged in but have not yet been targeted at a subscription - private LinkedHashSet pagedInPendingDispatch = new LinkedHashSet(100); + private List pagedInPendingDispatch = new ArrayList(100); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); @@ -968,7 +966,9 @@ public class Queue extends BaseDestination implements Task { dispatchLock.lock(); try { synchronized(pagedInPendingDispatch) { - pagedInPendingDispatch.add(node); + if (!pagedInPendingDispatch.contains(node)) { + pagedInPendingDispatch.add(node); + } } } finally { dispatchLock.unlock(); @@ -1219,7 +1219,15 @@ public class Queue extends BaseDestination implements Task { // the pending // list anything that does not actually get dispatched. if (list != null && !list.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); + if (pagedInPendingDispatch.isEmpty()) { + pagedInPendingDispatch.addAll(doActualDispatch(list)); + } else { + for (QueueMessageReference qmr : list) { + if (!pagedInPendingDispatch.contains(qmr)) { + pagedInPendingDispatch.add(qmr); + } + } + } } } } finally { @@ -1231,8 +1239,8 @@ public class Queue extends BaseDestination implements Task { * @return list of messages that could get dispatched to consumers if they * were not full. */ - private LinkedHashSet doActualDispatch(Collection collection) throws Exception { - LinkedHashSet rc = new LinkedHashSet(collection.size()); + private List doActualDispatch(List list) throws Exception { + List rc = new ArrayList(list.size()); Set fullConsumers = new HashSet(this.consumers.size()); List consumers; @@ -1240,7 +1248,7 @@ public class Queue extends BaseDestination implements Task { consumers = new ArrayList(this.consumers); } - for (MessageReference node : collection) { + for (MessageReference node : list) { Subscription target = null; int interestCount=0; for (Subscription s : consumers) {