diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 760a9690a5..66aaeaefef 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -128,10 +128,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void add(MessageReference node) throws Exception { synchronized (pendingLock) { enqueueCounter++; - pending.addMessageLast(node); - dispatchPending(); + pending.addMessageLast(node); } - + dispatchPending(); } public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index fa4c943800..f7903350bc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -83,6 +84,7 @@ public class Queue extends BaseDestination implements Task { private final Object sendLock = new Object(); private final TaskRunner taskRunner; private final LinkedList messagesWaitingForSpace = new LinkedList(); + private final ReentrantLock dispatchLock = new ReentrantLock(); private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() { public void run() { wakeup(); @@ -98,7 +100,6 @@ public class Queue extends BaseDestination implements Task { } else { this.messages = new StoreQueueCursor(broker,this); } - this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName()); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); } @@ -172,61 +173,67 @@ public class Queue extends BaseDestination implements Task { return true; } - public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { - sub.add(context, this); - destinationStatistics.getConsumers().increment(); - MessageEvaluationContext msgContext = new MessageEvaluationContext(); + public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + dispatchLock.lock(); + try { + sub.add(context, this); + destinationStatistics.getConsumers().increment(); + MessageEvaluationContext msgContext = new MessageEvaluationContext(); - // needs to be synchronized - so no contention with dispatching - synchronized (consumers) { - consumers.add(sub); - if (sub.getConsumerInfo().isExclusive()) { - LockOwner owner = (LockOwner) sub; - if (exclusiveOwner == null) { - exclusiveOwner = owner; - } else { - // switch the owner if the priority is higher. - if (owner.getLockPriority() > exclusiveOwner - .getLockPriority()) { + // needs to be synchronized - so no contention with dispatching + synchronized (consumers) { + consumers.add(sub); + if (sub.getConsumerInfo().isExclusive()) { + LockOwner owner = (LockOwner) sub; + if (exclusiveOwner == null) { exclusiveOwner = owner; + } else { + // switch the owner if the priority is higher. + if (owner.getLockPriority() > exclusiveOwner + .getLockPriority()) { + exclusiveOwner = owner; + } } } } - } - // we hold the lock on the dispatchValue - so lets build the paged in - // list directly; - buildList(false); + // we hold the lock on the dispatchValue - so lets build the paged + // in + // list directly; + doPageIn(false); - // synchronize with dispatch method so that no new messages are sent - // while - // setting up a subscription. avoid out of order messages, - // duplicates - // etc. + // synchronize with dispatch method so that no new messages are sent + // while + // setting up a subscription. avoid out of order messages, + // duplicates + // etc. - msgContext.setDestination(destination); - synchronized (pagedInMessages) { - // Add all the matching messages in the queue to the - // subscription. - for (Iterator i = pagedInMessages.values().iterator(); i - .hasNext();) { - QueueMessageReference node = (QueueMessageReference) i.next(); - if (node.isDropped() - || (!sub.getConsumerInfo().isBrowser() && node - .getLockOwner() != null)) { - continue; - } - try { - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext)) { - sub.add(node); + msgContext.setDestination(destination); + synchronized (pagedInMessages) { + // Add all the matching messages in the queue to the + // subscription. + for (Iterator i = pagedInMessages.values() + .iterator(); i.hasNext();) { + QueueMessageReference node = (QueueMessageReference) i + .next(); + if (node.isDropped() + || (!sub.getConsumerInfo().isBrowser() && node + .getLockOwner() != null)) { + continue; + } + try { + msgContext.setMessageReference(node); + if (sub.matches(node, msgContext)) { + sub.add(node); + } + } catch (IOException e) { + log.warn("Could not load message: " + e, e); } - } catch (IOException e) { - log.warn("Could not load message: " + e, e); } } + } finally { + dispatchLock.unlock(); } - } public void removeSubscription(ConnectionContext context, Subscription sub) @@ -956,54 +963,51 @@ public class Queue extends BaseDestination implements Task { wakeup(); } - final synchronized void wakeup() { + final void wakeup() { try { taskRunner.wakeup(); - } catch (InterruptedException e) { log.warn("Task Runner failed to wakeup ", e); } } - - private List doPageIn(boolean force) throws Exception { - List result = null; - result = buildList(force); - return result; - } - - private List buildList(boolean force) throws Exception { - final int toPageIn = getMaxPageSize() - pagedInMessages.size(); + private List doPageIn(boolean force) throws Exception { List result = null; - if ((force || !consumers.isEmpty()) && toPageIn > 0) { - messages.setMaxBatchSize(toPageIn); - int count = 0; - result = new ArrayList(toPageIn); - synchronized (messages) { - try { - messages.reset(); - while (messages.hasNext() && count < toPageIn) { - MessageReference node = messages.next(); - messages.remove(); - if (!broker.isExpired(node)) { - node = createMessageReference(node.getMessage()); - result.add(node); - count++; - } else { - broker.messageExpired(createConnectionContext(), - node); - destinationStatistics.getMessages().decrement(); + dispatchLock.lock(); + try { + final int toPageIn = getMaxPageSize() - pagedInMessages.size(); + if ((force || !consumers.isEmpty()) && toPageIn > 0) { + messages.setMaxBatchSize(toPageIn); + int count = 0; + result = new ArrayList(toPageIn); + synchronized (messages) { + try { + messages.reset(); + while (messages.hasNext() && count < toPageIn) { + MessageReference node = messages.next(); + messages.remove(); + if (!broker.isExpired(node)) { + node = createMessageReference(node.getMessage()); + result.add(node); + count++; + } else { + broker.messageExpired(createConnectionContext(), + node); + destinationStatistics.getMessages().decrement(); + } } + } finally { + messages.release(); + } + } + synchronized (pagedInMessages) { + for(MessageReference ref:result) { + pagedInMessages.put(ref.getMessageId(), ref); } - } finally { - messages.release(); - } - } - synchronized (pagedInMessages) { - for(MessageReference ref:result) { - pagedInMessages.put(ref.getMessageId(), ref); } } + }finally { + dispatchLock.unlock(); } return result; }