mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@697957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4d0e57261c
commit
e2d743ea8a
|
@ -18,11 +18,13 @@ package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -84,7 +86,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
protected PendingMessageCursor messages;
|
protected PendingMessageCursor messages;
|
||||||
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
|
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
|
||||||
// Messages that are paged in but have not yet been targeted at a subscription
|
// Messages that are paged in but have not yet been targeted at a subscription
|
||||||
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
|
private LinkedHashSet<QueueMessageReference> pagedInPendingDispatch = new LinkedHashSet<QueueMessageReference>(100);
|
||||||
private MessageGroupMap messageGroupOwners;
|
private MessageGroupMap messageGroupOwners;
|
||||||
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
|
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
|
||||||
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
|
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
|
||||||
|
@ -1217,15 +1219,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
// the pending
|
// the pending
|
||||||
// list anything that does not actually get dispatched.
|
// list anything that does not actually get dispatched.
|
||||||
if (list != null && !list.isEmpty()) {
|
if (list != null && !list.isEmpty()) {
|
||||||
if (pagedInPendingDispatch.isEmpty()) {
|
|
||||||
pagedInPendingDispatch.addAll(doActualDispatch(list));
|
pagedInPendingDispatch.addAll(doActualDispatch(list));
|
||||||
} else {
|
|
||||||
for (QueueMessageReference qmr : list) {
|
|
||||||
if (!pagedInPendingDispatch.contains(qmr)) {
|
|
||||||
pagedInPendingDispatch.add(qmr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1237,8 +1231,8 @@ public class Queue extends BaseDestination implements Task {
|
||||||
* @return list of messages that could get dispatched to consumers if they
|
* @return list of messages that could get dispatched to consumers if they
|
||||||
* were not full.
|
* were not full.
|
||||||
*/
|
*/
|
||||||
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
|
private LinkedHashSet<QueueMessageReference> doActualDispatch(Collection<QueueMessageReference> collection) throws Exception {
|
||||||
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
|
LinkedHashSet<QueueMessageReference> rc = new LinkedHashSet<QueueMessageReference>(collection.size());
|
||||||
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
|
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
|
||||||
List<Subscription> consumers;
|
List<Subscription> consumers;
|
||||||
|
|
||||||
|
@ -1246,7 +1240,7 @@ public class Queue extends BaseDestination implements Task {
|
||||||
consumers = new ArrayList<Subscription>(this.consumers);
|
consumers = new ArrayList<Subscription>(this.consumers);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (MessageReference node : list) {
|
for (MessageReference node : collection) {
|
||||||
Subscription target = null;
|
Subscription target = null;
|
||||||
int interestCount=0;
|
int interestCount=0;
|
||||||
for (Subscription s : consumers) {
|
for (Subscription s : consumers) {
|
||||||
|
|
Loading…
Reference in New Issue