mirror of https://github.com/apache/activemq.git
check we are not a slaveBroker() when dispatching
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@491431 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7695676339
commit
6ddeac7bc0
|
@ -125,18 +125,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
|
|
||||||
public void add(MessageReference node) throws Exception{
|
public void add(MessageReference node) throws Exception{
|
||||||
boolean pendingEmpty=false;
|
boolean pendingEmpty=false;
|
||||||
|
|
||||||
synchronized(pending){
|
synchronized(pending){
|
||||||
pendingEmpty=pending.isEmpty();
|
pendingEmpty=pending.isEmpty();
|
||||||
enqueueCounter++;
|
enqueueCounter++;
|
||||||
}
|
}
|
||||||
if(!isFull()&&pendingEmpty){
|
if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
|
||||||
dispatch(node);
|
dispatch(node);
|
||||||
}else{
|
}else{
|
||||||
optimizePrefetch();
|
optimizePrefetch();
|
||||||
synchronized(pending){
|
synchronized(pending){
|
||||||
if(pending.isEmpty()&&log.isDebugEnabled()){
|
if(pending.isEmpty()&&log.isDebugEnabled()){
|
||||||
log.debug("Prefetch limit.");
|
log.debug("Prefetch limit.");
|
||||||
}
|
}
|
||||||
pending.addMessageLast(node);
|
pending.addMessageLast(node);
|
||||||
}
|
}
|
||||||
//we might be able to dispatch messages (i.e. not full() anymore)
|
//we might be able to dispatch messages (i.e. not full() anymore)
|
||||||
|
@ -155,6 +156,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
pending.remove();
|
pending.remove();
|
||||||
createMessageDispatch(node,node.getMessage());
|
createMessageDispatch(node,node.getMessage());
|
||||||
dispatched.addLast(node);
|
dispatched.addLast(node);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -162,7 +164,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
pending.release();
|
pending.release();
|
||||||
}
|
}
|
||||||
throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
|
throw new JMSException("Slave broker out of sync with master: Dispatched message ("+mdn.getMessageId()
|
||||||
+") was not in the pending list: "+pending);
|
+") was not in the pending list");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +397,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
|
|
||||||
|
|
||||||
protected void dispatchMatched() throws IOException{
|
protected void dispatchMatched() throws IOException{
|
||||||
if(dispatching.compareAndSet(false,true)){
|
if(!broker.isSlaveBroker() && dispatching.compareAndSet(false,true)){
|
||||||
try{
|
try{
|
||||||
List toDispatch=null;
|
List toDispatch=null;
|
||||||
synchronized(pending){
|
synchronized(pending){
|
||||||
|
|
Loading…
Reference in New Issue