From 807e18f9f9b34923606ac1313c25c74e0e66803b Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 28 Apr 2006 21:24:06 +0000 Subject: [PATCH] - Gaurd access to dispatched list ( a sync was missing). - Added better exception messages to know what happened when a slave subscription gets out of sync with the master. - Implemented a simpler isFull() git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@398015 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 35 +++++++++++-------- .../activemq/broker/region/Subscription.java | 3 +- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 239c557023..d97351e205 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -60,35 +60,31 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized public void add(MessageReference node) throws Exception{ enqueueCounter++; - if(!isFull()&&!isSlaveBroker()){ + if(!isFull()){ dispatch(node); }else{ optimizePrefetch(); synchronized(pending){ - if( pending.isEmpty() ) - if (log.isDebugEnabled()){ - log.debug("Prefetch limit."); - } + if( pending.isEmpty() ) { + log.debug("Prefetch limit."); + } pending.addLast(node); } } } - public void processMessageDispatchNotification(MessageDispatchNotification mdn){ + synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { synchronized(pending){ for(Iterator i=pending.iterator();i.hasNext();){ MessageReference node=(MessageReference) i.next(); if(node.getMessageId().equals(mdn.getMessageId())){ i.remove(); - try{ - MessageDispatch md=createMessageDispatch(node,node.getMessage()); - dispatched.addLast(node); - }catch(Exception e){ - log.error("Problem processing MessageDispatchNotification: "+mdn,e); - } - break; + createMessageDispatch(node,node.getMessage()); + dispatched.addLast(node); + return; } } + throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()+") was not in the pending list: "+pending); } } @@ -178,7 +174,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); } - throw new JMSException("Invalid acknowledgment: "+ack); + + if( isSlaveBroker() ) { + throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched); + } else { + throw new JMSException("Invalid acknowledgment: "+ack); + } } /** @@ -201,8 +202,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } } + /** + * Used to determine if the broker can dispatch to the consumer. + * @return + */ protected boolean isFull(){ - return dispatched.size()-prefetchExtension>=info.getPrefetchSize(); + return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize(); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index eb249cf8aa..7dec5fe02a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -95,8 +95,9 @@ public interface Subscription { /** * Used by a Slave Broker to update dispatch infomation * @param mdn + * @throws Exception */ - void processMessageDispatchNotification(MessageDispatchNotification mdn); + void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; /** * @return true if the broker is currently in slave mode