diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java index 47b1a28c32..3fb95f5a7f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java @@ -262,4 +262,8 @@ public class ActiveMQMessageAuditNoSync implements Serializable { } return result; } + + public void clear() { + map.clear(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index ab2f41f022..59ee2771ce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -97,7 +97,15 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us return; } destinations.put(destination.getActiveMQDestination(), destination); - if (destination.getMessageStore() != null) { + + if (active.get() || keepDurableSubsActive) { + Topic topic = (Topic)destination; + topic.activate(context, this); + if (pending.isEmpty(topic)) { + topic.recoverRetroactiveMessages(context, this); + } + this.enqueueCounter+=pending.size(); + } else if (destination.getMessageStore() != null) { TopicMessageStore store = (TopicMessageStore)destination.getMessageStore(); try { this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); @@ -107,13 +115,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us throw jmsEx; } } - if (active.get() || keepDurableSubsActive) { - Topic topic = (Topic)destination; - topic.activate(context, this); - if (pending.isEmpty(topic)) { - topic.recoverRetroactiveMessages(context, this); - } - } dispatchPending(); } @@ -304,5 +305,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us protected boolean isDropped(MessageReference node) { return false; - } + } + + public boolean isKeepDurableSubsActive() { + return keepDurableSubsActive; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 3f22208078..2fc7246090 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -25,8 +25,8 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.usage.SystemUsage; @@ -49,7 +49,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private final List storePrefetches = new CopyOnWriteArrayList(); private final PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; - private final Subscription subscription; + private final DurableTopicSubscription subscription; private int cacheCurrentLowestPriority = UNKNOWN; private boolean immediatePriorityDispatch = true; /** @@ -59,14 +59,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @param maxBatchSize currently ignored * @param subscription subscription for this cursor */ - public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { + public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) { super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription)); this.subscription=subscription; this.clientId = clientId; this.subscriberName = subscriberName; if (broker.getBrokerService().isPersistent()) { this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages); - }else { + } else { this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); } @@ -93,9 +93,18 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { @Override public synchronized void stop() throws Exception { if (isStarted()) { - super.stop(); - for (PendingMessageCursor tsp : storePrefetches) { - tsp.stop(); + if (subscription.isKeepDurableSubsActive()) { + super.gc(); + super.getMessageAudit().clear(); + for (PendingMessageCursor tsp : storePrefetches) { + tsp.gc(); + tsp.getMessageAudit().clear(); + } + } else { + super.stop(); + for (PendingMessageCursor tsp : storePrefetches) { + tsp.stop(); + } } } } @@ -216,6 +225,28 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + @Override + public boolean isTransient() { + return subscription.isKeepDurableSubsActive(); + } + + @Override + public void addMessageFirst(MessageReference node) throws Exception { + // for keep durable subs active, need to deal with redispatch + if (node != null) { + Message msg = node.getMessage(); + if (!msg.isPersistent()) { + nonPersistent.addMessageFirst(node); + } else { + Destination dest = msg.getRegionDestination(); + TopicStorePrefetch tsp = topics.get(dest); + if (tsp != null) { + tsp.addMessageFirst(node); + } + } + } + } + @Override public synchronized void addRecoveredMessage(MessageReference node) throws Exception { nonPersistent.addMessageLast(node); @@ -379,5 +410,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) { this.immediatePriorityDispatch = immediatePriorityDispatch; - } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java index 890137f81a..0f714eb63c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; @@ -42,7 +43,7 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) { return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java index 5000fbf5df..9a7b6134ca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -38,5 +39,5 @@ public interface PendingDurableSubscriberMessageStoragePolicy { * @param sub * @return the Pending Message cursor */ - PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub); + PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java index e81985b61c..07db3c9c98 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; @@ -58,7 +59,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) { + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) { StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub); cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch()); return cursor; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java index 13b391e508..cee1f3cfc7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -40,7 +41,7 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu * @param sub * @return the Pending Message cursor */ - public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) { + public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, DurableTopicSubscription sub) { return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub)); } }