diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index fa7cdb5272..1a3260ecda 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -74,8 +74,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us dispatchMatched(); } - public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{ - log.debug("Deactivating "+this); + public synchronized void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{ + log.debug("Activating "+this); if(!active){ this.active=true; this.context=context; @@ -85,11 +85,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us Topic topic=(Topic)iter.next(); topic.activate(context,this); } - } - synchronized(pending){ - pending.setUsageManager(memoryManager); - pending.start(); - } + } + pending.setUsageManager(memoryManager); + pending.start(); + // If nothing was in the persistent store, then try to use the recovery policy. if(pending.isEmpty()){ for(Iterator iter=destinations.values().iterator();iter.hasNext();){ @@ -160,7 +159,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return md; } - public void add(MessageReference node) throws Exception{ + public synchronized void add(MessageReference node) throws Exception{ if(!active&&!keepDurableSubsActive){ return; } @@ -189,7 +188,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return active; } - protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{ + protected synchronized void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{ node.getRegionDestination().acknowledge(context,this,ack,node); redeliveredMessages.remove(node.getMessageId()); node.decrementReferenceCount();