git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@953382 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-06-10 16:39:27 +00:00
parent 9c47f6b385
commit bb17d61516
1 changed files with 2 additions and 2 deletions

View File

@ -72,11 +72,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
public void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
// do it just once per destination // do it just once per destination
if (destinations.containsKey(destination.getActiveMQDestination())) { if (destinations.containsKey(destination.getActiveMQDestination())) {
return; return;
} }
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination); destinations.put(destination.getActiveMQDestination(), destination);
if (destination.getMessageStore() != null) { if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
@ -123,7 +123,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.setSystemUsage(memoryManager); pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.start(); pending.start();
// If nothing was in the persistent store, then try to use the // If nothing was in the persistent store, then try to use the
// recovery policy. // recovery policy.
if (pending.isEmpty()) { if (pending.isEmpty()) {
@ -151,6 +150,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.deactivate(context, this); topic.deactivate(context, this);
} }
} }
for (final MessageReference node : dispatched) { for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time. // Mark the dispatched messages as redelivered for next time.
Integer count = redeliveredMessages.get(node.getMessageId()); Integer count = redeliveredMessages.get(node.getMessageId());