synchronized activate () and add() methods

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@547601 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-06-15 10:24:23 +00:00
parent fa5e0a0e15
commit 4d1a176784
1 changed files with 8 additions and 9 deletions

View File

@ -74,8 +74,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
dispatchMatched(); dispatchMatched();
} }
public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{ public synchronized void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{
log.debug("Deactivating "+this); log.debug("Activating "+this);
if(!active){ if(!active){
this.active=true; this.active=true;
this.context=context; this.context=context;
@ -86,10 +86,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.activate(context,this); topic.activate(context,this);
} }
} }
synchronized(pending){ pending.setUsageManager(memoryManager);
pending.setUsageManager(memoryManager); pending.start();
pending.start();
}
// If nothing was in the persistent store, then try to use the recovery policy. // If nothing was in the persistent store, then try to use the recovery policy.
if(pending.isEmpty()){ if(pending.isEmpty()){
for(Iterator iter=destinations.values().iterator();iter.hasNext();){ for(Iterator iter=destinations.values().iterator();iter.hasNext();){
@ -160,7 +159,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return md; return md;
} }
public void add(MessageReference node) throws Exception{ public synchronized void add(MessageReference node) throws Exception{
if(!active&&!keepDurableSubsActive){ if(!active&&!keepDurableSubsActive){
return; return;
} }
@ -189,7 +188,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return active; return active;
} }
protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{ protected synchronized void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{
node.getRegionDestination().acknowledge(context,this,ack,node); node.getRegionDestination().acknowledge(context,this,ack,node);
redeliveredMessages.remove(node.getMessageId()); redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount(); node.decrementReferenceCount();