From c651ace525c7f96892ab909bffbd503679e2125e Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 25 Dec 2006 07:19:29 +0000 Subject: [PATCH] it's ensure dispatching happens in order git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490111 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0823ddeca9..853953d561 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -60,6 +61,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; + private AtomicBoolean dispatching = new AtomicBoolean(); public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException{ @@ -389,31 +391,37 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected void dispatchMatched() throws IOException{ - List toDispatch=null; - synchronized(pending){ + if(dispatching.compareAndSet(false,true)){ try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + List toDispatch=null; + synchronized(pending){ + try{ + pending.reset(); + while(pending.hasNext()&&!isFull()){ + MessageReference node=pending.next(); + pending.remove(); + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + continue; // just drop it. + } + if(toDispatch==null){ + toDispatch=new ArrayList(); + } + toDispatch.add(node); + } + }finally{ + pending.release(); } - if(toDispatch==null){ - toDispatch=new ArrayList(); + } + if(toDispatch!=null){ + for(int i=0;i