- Gaurd access to dispatched list ( a sync was missing).

- Added better exception messages to know what happened when a slave subscription gets out of sync with the master.
- Implemented a simpler isFull()

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@398015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-28 21:24:06 +00:00
parent d65ba8034b
commit 807e18f9f9
2 changed files with 22 additions and 16 deletions

View File

@ -60,35 +60,31 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized public void add(MessageReference node) throws Exception{ synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++; enqueueCounter++;
if(!isFull()&&!isSlaveBroker()){ if(!isFull()){
dispatch(node); dispatch(node);
}else{ }else{
optimizePrefetch(); optimizePrefetch();
synchronized(pending){ synchronized(pending){
if( pending.isEmpty() ) if( pending.isEmpty() ) {
if (log.isDebugEnabled()){ log.debug("Prefetch limit.");
log.debug("Prefetch limit."); }
}
pending.addLast(node); pending.addLast(node);
} }
} }
} }
public void processMessageDispatchNotification(MessageDispatchNotification mdn){ synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pending){ synchronized(pending){
for(Iterator i=pending.iterator();i.hasNext();){ for(Iterator i=pending.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next(); MessageReference node=(MessageReference) i.next();
if(node.getMessageId().equals(mdn.getMessageId())){ if(node.getMessageId().equals(mdn.getMessageId())){
i.remove(); i.remove();
try{ createMessageDispatch(node,node.getMessage());
MessageDispatch md=createMessageDispatch(node,node.getMessage()); dispatched.addLast(node);
dispatched.addLast(node); return;
}catch(Exception e){
log.error("Problem processing MessageDispatchNotification: "+mdn,e);
}
break;
} }
} }
throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()+") was not in the pending list: "+pending);
} }
} }
@ -178,7 +174,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack); throw new JMSException("Could not correlate acknowledgment with dispatched message: "+ack);
} }
throw new JMSException("Invalid acknowledgment: "+ack);
if( isSlaveBroker() ) {
throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
} else {
throw new JMSException("Invalid acknowledgment: "+ack);
}
} }
/** /**
@ -201,8 +202,12 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
/**
* Used to determine if the broker can dispatch to the consumer.
* @return
*/
protected boolean isFull(){ protected boolean isFull(){
return dispatched.size()-prefetchExtension>=info.getPrefetchSize(); return isSlaveBroker() || dispatched.size()-prefetchExtension>=info.getPrefetchSize();
} }
/** /**

View File

@ -95,8 +95,9 @@ public interface Subscription {
/** /**
* Used by a Slave Broker to update dispatch infomation * Used by a Slave Broker to update dispatch infomation
* @param mdn * @param mdn
* @throws Exception
*/ */
void processMessageDispatchNotification(MessageDispatchNotification mdn); void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception;
/** /**
* @return true if the broker is currently in slave mode * @return true if the broker is currently in slave mode