diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0823ddeca9..853953d561 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -60,6 +61,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; + private AtomicBoolean dispatching = new AtomicBoolean(); public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException{ @@ -389,31 +391,37 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected void dispatchMatched() throws IOException{ - List toDispatch=null; - synchronized(pending){ + if(dispatching.compareAndSet(false,true)){ try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + List toDispatch=null; + synchronized(pending){ + try{ + pending.reset(); + while(pending.hasNext()&&!isFull()){ + MessageReference node=pending.next(); + pending.remove(); + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + continue; // just drop it. + } + if(toDispatch==null){ + toDispatch=new ArrayList(); + } + toDispatch.add(node); + } + }finally{ + pending.release(); } - if(toDispatch==null){ - toDispatch=new ArrayList(); + } + if(toDispatch!=null){ + for(int i=0;i