diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 826ddbb156..83987ff993 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -37,7 +37,7 @@ public abstract class BaseDestination implements Destination { * The default number of messages to page in to the destination * from persistent storage */ - public static final int DEFAULT_PAGE_SIZE=100; + public static final int DEFAULT_PAGE_SIZE=200; protected final ActiveMQDestination destination; protected final Broker broker; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index acc64f7969..2f030cf695 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -378,9 +378,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } if (callDispatchMatched && destination != null) { - if (destination.isLazyDispatch()) { +// if (destination.isLazyDispatch()) { destination.wakeup(); - } +// } dispatchPending(); } else { if (isSlave()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index d6234f15fb..3453b48da8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -81,6 +81,8 @@ public class Queue extends BaseDestination implements Task { protected final List consumers = new ArrayList(50); protected PendingMessageCursor messages; private final LinkedHashMap pagedInMessages = new LinkedHashMap(); + // Messages that are paged in but have not yet been targeted at a subscription + private List pagedInPendingDispatch = new ArrayList(100); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); @@ -317,6 +319,7 @@ public class Queue extends BaseDestination implements Task { } public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { +// System.out.println(getName()+" send "+message.getMessageId()); final ConnectionContext context = producerExchange.getConnectionContext(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. @@ -946,6 +949,18 @@ public class Queue extends BaseDestination implements Task { result = !messages.isEmpty(); } + // Kinda ugly.. but I think dispatchLock is the only mutex protecting the + // pagedInPendingDispatch variable. + dispatchLock.lock(); + try { + result |= !pagedInPendingDispatch.isEmpty(); + } finally { + dispatchLock.unlock(); + } + + // Perhaps we should page always into the pagedInPendingDispatch list is + // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty() + // then we do a dispatch. if (result) { try { pageInMessages(false); @@ -1134,58 +1149,76 @@ public class Queue extends BaseDestination implements Task { } private void doDispatch(List list) throws Exception { - if (list != null) { - List consumers; - dispatchLock.lock(); - try { + dispatchLock.lock(); + try { + if(!pagedInPendingDispatch.isEmpty()) { +// System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size()); + // Try to first dispatch anything that had not been dispatched before. + pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); +// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size()); + } + // and now see if we can dispatch the new stuff.. and append to the pending + // list anything that does not actually get dispatched. + if (list != null && !list.isEmpty()) { +// System.out.println(getName()+": dispatching from paged in: "+list.size()); + pagedInPendingDispatch.addAll(doActualDispatch(list)); +// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size()); + } + } finally { + dispatchLock.unlock(); + } + } + + /** + * @return list of messages that could get dispatched to consumers if they were not full. + */ + private List doActualDispatch(List list) throws Exception { + List rc = new ArrayList(list.size()); + List consumers; + + synchronized (this.consumers) { + consumers = new ArrayList(this.consumers); + } + + for (MessageReference node : list) { + Subscription target = null; + int interestCount=0; + for (Subscription s : consumers) { + if (dispatchSelector.canSelect(s, node)) { + if (!s.isFull()) { + // Dispatch it. + s.add(node); +// System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId()); + target = s; + break; + } + interestCount++; + } + } + + if (target == null && interestCount>0) { + // This means all subs were full... + rc.add((QueueMessageReference)node); + } + + // If it got dispatched, rotate the consumer list to get round robin distribution. + if (target != null && !strictOrderDispatch && consumers.size() > 1 && + !dispatchSelector.isExclusiveConsumer(target)) { synchronized (this.consumers) { - consumers = new ArrayList(this.consumers); - } - - - for (MessageReference node : list) { - Subscription target = null; - List targets = null; - for (Subscription s : consumers) { - if (dispatchSelector.canSelect(s, node)) { - if (!s.isFull()) { - s.add(node); - target = s; - break; - } else { - if (targets == null) { - targets = new ArrayList(); - } - targets.add(s); - } - } - } - if (target == null && targets != null) { - // pick the least loaded to add the message too - for (Subscription s : targets) { - if (target == null - || target.getPendingQueueSize() > s.getPendingQueueSize()) { - target = s; - } - } - if (target != null) { - target.add(node); - } - } - if (target != null && !strictOrderDispatch && consumers.size() > 1 && - !dispatchSelector.isExclusiveConsumer(target)) { - synchronized (this.consumers) { - if( removeFromConsumerList(target) ) { - addToConsumerList(target); - consumers = new ArrayList(this.consumers); - } - } + if( removeFromConsumerList(target) ) { + addToConsumerList(target); + consumers = new ArrayList(this.consumers); } } - } finally { - dispatchLock.unlock(); } } + + //LOG.info(getName()+" Pending messages:"); + //for (MessageReference n : rc) { + // LOG.info(getName()+" - " + n.getMessageId()); + // } + + return rc; } private void pageInMessages() throws Exception {