it's ensure dispatching happens in order

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490111 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-25 07:19:29 +00:00
parent d8674a0fed
commit c651ace525
1 changed files with 28 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -60,6 +61,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected long enqueueCounter; protected long enqueueCounter;
protected long dispatchCounter; protected long dispatchCounter;
protected long dequeueCounter; protected long dequeueCounter;
private AtomicBoolean dispatching = new AtomicBoolean();
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
throws InvalidSelectorException{ throws InvalidSelectorException{
@ -389,31 +391,37 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected void dispatchMatched() throws IOException{ protected void dispatchMatched() throws IOException{
List toDispatch=null; if(dispatching.compareAndSet(false,true)){
synchronized(pending){
try{ try{
pending.reset(); List toDispatch=null;
while(pending.hasNext()&&!isFull()){ synchronized(pending){
MessageReference node=pending.next(); try{
pending.remove(); pending.reset();
// Message may have been sitting in the pending list a while while(pending.hasNext()&&!isFull()){
// waiting for the consumer to ak the message. MessageReference node=pending.next();
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ pending.remove();
continue; // just drop it. // Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message.
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
continue; // just drop it.
}
if(toDispatch==null){
toDispatch=new ArrayList();
}
toDispatch.add(node);
}
}finally{
pending.release();
} }
if(toDispatch==null){ }
toDispatch=new ArrayList(); if(toDispatch!=null){
for(int i=0;i<toDispatch.size();i++){
MessageReference node=(MessageReference)toDispatch.get(i);
dispatch(node);
} }
toDispatch.add(node);
} }
}finally{ }finally{
pending.release(); dispatching.set(false);
}
}
if(toDispatch!=null){
for(int i=0;i<toDispatch.size();i++){
MessageReference node=(MessageReference)toDispatch.get(i);
dispatch(node);
} }
} }
} }