git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@698573 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-24 13:51:18 +00:00
parent 9fe716b99f
commit 63b2c406a8
1 changed files with 16 additions and 8 deletions

View File

@ -18,13 +18,11 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -86,7 +84,7 @@ public class Queue extends BaseDestination implements Task {
protected PendingMessageCursor messages; protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>(); private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a subscription // Messages that are paged in but have not yet been targeted at a subscription
private LinkedHashSet<QueueMessageReference> pagedInPendingDispatch = new LinkedHashSet<QueueMessageReference>(100); private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@ -968,8 +966,10 @@ public class Queue extends BaseDestination implements Task {
dispatchLock.lock(); dispatchLock.lock();
try { try {
synchronized(pagedInPendingDispatch) { synchronized(pagedInPendingDispatch) {
if (!pagedInPendingDispatch.contains(node)) {
pagedInPendingDispatch.add(node); pagedInPendingDispatch.add(node);
} }
}
} finally { } finally {
dispatchLock.unlock(); dispatchLock.unlock();
} }
@ -1219,7 +1219,15 @@ public class Queue extends BaseDestination implements Task {
// the pending // the pending
// list anything that does not actually get dispatched. // list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) { if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list)); pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
for (QueueMessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.add(qmr);
}
}
}
} }
} }
} finally { } finally {
@ -1231,8 +1239,8 @@ public class Queue extends BaseDestination implements Task {
* @return list of messages that could get dispatched to consumers if they * @return list of messages that could get dispatched to consumers if they
* were not full. * were not full.
*/ */
private LinkedHashSet<QueueMessageReference> doActualDispatch(Collection<QueueMessageReference> collection) throws Exception { private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
LinkedHashSet<QueueMessageReference> rc = new LinkedHashSet<QueueMessageReference>(collection.size()); List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers; List<Subscription> consumers;
@ -1240,7 +1248,7 @@ public class Queue extends BaseDestination implements Task {
consumers = new ArrayList<Subscription>(this.consumers); consumers = new ArrayList<Subscription>(this.consumers);
} }
for (MessageReference node : collection) { for (MessageReference node : list) {
Subscription target = null; Subscription target = null;
int interestCount=0; int interestCount=0;
for (Subscription s : consumers) { for (Subscription s : consumers) {