Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages

This commit is contained in:
Dhiraj Bokde 2014-05-13 00:29:03 -07:00 committed by Dejan Bosanac
parent b36adffe71
commit 8947a09eaa
1 changed files with 6 additions and 10 deletions

View File

@ -120,9 +120,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
if (active.get() || keepDurableSubsActive) { if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
topic.activate(context, this); topic.activate(context, this);
if (pending.isEmpty(topic)) { // always use the recovery policy
topic.recoverRetroactiveMessages(context, this); topic.recoverRetroactiveMessages(context, this);
}
this.enqueueCounter += pending.size(); this.enqueueCounter += pending.size();
} else if (destination.getMessageStore() != null) { } else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore) destination.getMessageStore(); TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@ -167,16 +166,13 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.setMaxAuditDepth(getMaxAuditDepth()); pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit()); pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start(); pending.start();
// If nothing was in the persistent store, then try to use the // always use the recovery policy.
// recovery policy.
if (pending.isEmpty()) {
for (Destination destination : durableDestinations.values()) { for (Destination destination : durableDestinations.values()) {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
topic.recoverRetroactiveMessages(context, this); topic.recoverRetroactiveMessages(context, this);
} }
} }
} }
}
this.active.set(true); this.active.set(true);
this.offlineTimestamp.set(-1); this.offlineTimestamp.set(-1);
dispatchPending(); dispatchPending();