Fixed AMQ-5160, changed DurableTopicSubscription to only recover retroactive messages for retroactive topics or consumers

This commit is contained in:
Dhiraj Bokde 2014-05-13 12:14:21 -07:00 committed by Dejan Bosanac
parent 70f7c5805c
commit 78950ec596
1 changed files with 7 additions and 4 deletions

View File

@ -120,8 +120,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic) destination;
topic.activate(context, this);
// always use the recovery policy
topic.recoverRetroactiveMessages(context, this);
if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
topic.recoverRetroactiveMessages(context, this);
}
this.enqueueCounter += pending.size();
} else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@ -166,10 +167,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start();
// always use the recovery policy.
// use recovery policy for retroactive topics and consumers
for (Destination destination : durableDestinations.values()) {
Topic topic = (Topic) destination;
topic.recoverRetroactiveMessages(context, this);
if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
topic.recoverRetroactiveMessages(context, this);
}
}
}
}