diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index a445059e53..0823ddeca9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -18,12 +18,12 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; - +import java.util.List; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -56,11 +56,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ final protected LinkedList dispatched=new LinkedList(); protected int prefetchExtension=0; - boolean dispatching=false; - - long enqueueCounter; - long dispatchCounter; - long dequeueCounter; + + protected long enqueueCounter; + protected long dispatchCounter; + protected long dequeueCounter; public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException{ @@ -123,8 +122,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } public void add(MessageReference node) throws Exception{ - try { - boolean pendingEmpty = false; + boolean pendingEmpty=false; synchronized(pending){ pendingEmpty=pending.isEmpty(); enqueueCounter++; @@ -134,17 +132,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ }else{ optimizePrefetch(); synchronized(pending){ - if(log.isDebugEnabled() && pending.isEmpty()){ + if(pending.isEmpty()&&log.isDebugEnabled()){ log.debug("Prefetch limit."); } pending.addMessageLast(node); } - } - }catch(Throwable e) { - e.printStackTrace(); - + //we might be able to dispatch messages (i.e. not full() anymore) + dispatchMatched(); } } + public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception{ synchronized(pending){ @@ -169,6 +166,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{ // Handle the standard acknowledgment case. + boolean callDispatchMatched=false; synchronized(dispatched){ if(ack.isStandardAck()){ // Acknowledge all dispatched messages up till the message id of the acknowledgment. @@ -216,13 +214,15 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ }else{ prefetchExtension=Math.max(0,prefetchExtension-(index+1)); } - dispatchMatched(); - return; + callDispatchMatched=true; + break; } } } - //this only happens after a reconnect - get an ack which is not valid - log.info("Could not correlate acknowledgment with dispatched message: "+ack); + // this only happens after a reconnect - get an ack which is not valid + if(!callDispatchMatched){ + log.info("Could not correlate acknowledgment with dispatched message: "+ack); + } }else if(ack.isDeliveredAck()){ // Message was delivered but not acknowledged: update pre-fetch counters. // Acknowledge all dispatched messages up till the message id of the acknowledgment. @@ -231,11 +231,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ final MessageReference node=(MessageReference)iter.next(); if(ack.getLastMessageId().equals(node.getMessageId())){ prefetchExtension=Math.max(prefetchExtension,index+1); - dispatchMatched(); - return; + callDispatchMatched=true; + break; } } - throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + if(!callDispatchMatched){ + throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + } }else if(ack.isPoisonAck()){ // TODO: what if the message is already in a DLQ??? // Handle the poison ACK case: we need to send the message to a DLQ @@ -259,14 +261,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ acknowledge(context,ack,node); if(ack.getLastMessageId().equals(messageId)){ prefetchExtension=Math.max(0,prefetchExtension-(index+1)); - dispatchMatched(); - return; + callDispatchMatched=true; + break; } } } - - throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + if(!callDispatchMatched){ + throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); + } } + } + if(callDispatchMatched){ + dispatchMatched(); + }else{ if(isSlaveBroker()){ throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack +") was not in the dispatch list: "+dispatched); @@ -366,40 +373,47 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ */ } - public void add(ConnectionContext context, Destination destination) throws Exception { + public void add(ConnectionContext context,Destination destination) throws Exception{ super.add(context,destination); - pending.add(context,destination); + synchronized(pending){ + pending.add(context,destination); + } } - public void remove(ConnectionContext context, Destination destination) throws Exception { + public void remove(ConnectionContext context,Destination destination) throws Exception{ super.remove(context,destination); - pending.remove(context,destination); - + synchronized(pending){ + pending.remove(context,destination); + } } protected void dispatchMatched() throws IOException{ + List toDispatch=null; synchronized(pending){ - if(!dispatching){ - dispatching=true; - try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) { - continue; // just drop it. - } - - dispatch(node); + try{ + pending.reset(); + while(pending.hasNext()&&!isFull()){ + MessageReference node=pending.next(); + pending.remove(); + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + continue; // just drop it. } - }finally{ - pending.release(); - dispatching=false; + if(toDispatch==null){ + toDispatch=new ArrayList(); + } + toDispatch.add(node); } + }finally{ + pending.release(); + } + } + if(toDispatch!=null){ + for(int i=0;i