mirror of https://github.com/apache/activemq.git
Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered
This commit is contained in:
parent
8644090377
commit
42ad1039cb
|
@ -137,6 +137,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
dispatchPending();
|
||||
}
|
||||
|
||||
// used by RetaineMessageSubscriptionRecoveryPolicy
|
||||
public boolean isEmpty(Topic topic) {
|
||||
return pending.isEmpty(topic);
|
||||
}
|
||||
|
||||
public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
|
||||
if (!active.get()) {
|
||||
this.context = context;
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.SubscriptionRecovery;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
|
@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
|
|||
sub.addRecoveredMessage(context, retainedMessage);
|
||||
}
|
||||
if (wrapped != null) {
|
||||
wrapped.recover(context, topic, sub);
|
||||
// retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions
|
||||
boolean recover = true;
|
||||
if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) {
|
||||
recover = false;
|
||||
}
|
||||
if (recover) {
|
||||
wrapped.recover(context, topic, sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue