diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index bc521d2a1f..c01bbab844 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -125,18 +125,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ public void add(MessageReference node) throws Exception{ boolean pendingEmpty=false; + synchronized(pending){ pendingEmpty=pending.isEmpty(); enqueueCounter++; } - if(!isFull()&&pendingEmpty){ + if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){ dispatch(node); }else{ optimizePrefetch(); synchronized(pending){ if(pending.isEmpty()&&log.isDebugEnabled()){ log.debug("Prefetch limit."); - } + } pending.addMessageLast(node); } //we might be able to dispatch messages (i.e. not full() anymore) @@ -155,6 +156,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ pending.remove(); createMessageDispatch(node,node.getMessage()); dispatched.addLast(node); + return; } } @@ -162,7 +164,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ pending.release(); } throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId() - +") was not in the pending list: "+pending); + +") was not in the pending list"); } } @@ -395,7 +397,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ protected void dispatchMatched() throws IOException{ - if(dispatching.compareAndSet(false,true)){ + if(!broker.isSlaveBroker() && dispatching.compareAndSet(false,true)){ try{ List toDispatch=null; synchronized(pending){