From e2d743ea8a45701ad46daa6ff0a84f4a545eda44 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 22 Sep 2008 19:30:04 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1947 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@697957 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index f36b296036..2ae01e0ffe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -18,11 +18,13 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -84,7 +86,7 @@ public class Queue extends BaseDestination implements Task { protected PendingMessageCursor messages; private final LinkedHashMap pagedInMessages = new LinkedHashMap(); // Messages that are paged in but have not yet been targeted at a subscription - private List pagedInPendingDispatch = new ArrayList(100); + private LinkedHashSet pagedInPendingDispatch = new LinkedHashSet(100); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); @@ -1217,15 +1219,7 @@ public class Queue extends BaseDestination implements Task { // the pending // list anything that does not actually get dispatched. if (list != null && !list.isEmpty()) { - if (pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); - } else { - for (QueueMessageReference qmr : list) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.add(qmr); - } - } - } + pagedInPendingDispatch.addAll(doActualDispatch(list)); } } } finally { @@ -1237,8 +1231,8 @@ public class Queue extends BaseDestination implements Task { * @return list of messages that could get dispatched to consumers if they * were not full. */ - private List doActualDispatch(List list) throws Exception { - List rc = new ArrayList(list.size()); + private LinkedHashSet doActualDispatch(Collection collection) throws Exception { + LinkedHashSet rc = new LinkedHashSet(collection.size()); Set fullConsumers = new HashSet(this.consumers.size()); List consumers; @@ -1246,7 +1240,7 @@ public class Queue extends BaseDestination implements Task { consumers = new ArrayList(this.consumers); } - for (MessageReference node : list) { + for (MessageReference node : collection) { Subscription target = null; int interestCount=0; for (Subscription s : consumers) {