https://issues.apache.org/jira/browse/AMQ-3196 - Speed up initial message delivery for offline durable sub with keepDurableSubsActive=true and JDBC store

avoid stop/start of cursor when keepdurablesubsactive=true such that a valid message count is retained by the cursor, independent of the store. This avoids unnecessary calls to the underlyin gtore on activation and reactivation

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1076278 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-02 16:23:40 +00:00
parent 330c9a896d
commit 93e2bd355b
7 changed files with 66 additions and 21 deletions

View File

@ -262,4 +262,8 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
}
return result;
}
public void clear() {
map.clear();
}
}

View File

@ -97,7 +97,15 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return;
}
destinations.put(destination.getActiveMQDestination(), destination);
if (destination.getMessageStore() != null) {
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic)destination;
topic.activate(context, this);
if (pending.isEmpty(topic)) {
topic.recoverRetroactiveMessages(context, this);
}
this.enqueueCounter+=pending.size();
} else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
try {
this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
@ -107,13 +115,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
throw jmsEx;
}
}
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic)destination;
topic.activate(context, this);
if (pending.isEmpty(topic)) {
topic.recoverRetroactiveMessages(context, this);
}
}
dispatchPending();
}
@ -305,4 +306,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
protected boolean isDropped(MessageReference node) {
return false;
}
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
}

View File

@ -25,8 +25,8 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
@ -49,7 +49,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
private final PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
private final Subscription subscription;
private final DurableTopicSubscription subscription;
private int cacheCurrentLowestPriority = UNKNOWN;
private boolean immediatePriorityDispatch = true;
/**
@ -59,14 +59,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @param maxBatchSize currently ignored
* @param subscription subscription for this cursor
*/
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, DurableTopicSubscription subscription) {
super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
this.subscription=subscription;
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
}else {
} else {
this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
}
@ -93,12 +93,21 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
@Override
public synchronized void stop() throws Exception {
if (isStarted()) {
if (subscription.isKeepDurableSubsActive()) {
super.gc();
super.getMessageAudit().clear();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.gc();
tsp.getMessageAudit().clear();
}
} else {
super.stop();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop();
}
}
}
}
/**
* Add a destination
@ -216,6 +225,28 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
@Override
public boolean isTransient() {
return subscription.isKeepDurableSubsActive();
}
@Override
public void addMessageFirst(MessageReference node) throws Exception {
// for keep durable subs active, need to deal with redispatch
if (node != null) {
Message msg = node.getMessage();
if (!msg.isPersistent()) {
nonPersistent.addMessageFirst(node);
} else {
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
tsp.addMessageFirst(node);
}
}
}
}
@Override
public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
nonPersistent.addMessageLast(node);
@ -380,4 +411,5 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
public void setImmediatePriorityDispatch(boolean immediatePriorityDispatch) {
this.immediatePriorityDispatch = immediatePriorityDispatch;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@ -42,7 +43,7 @@ public class FilePendingDurableSubscriberMessageStoragePolicy implements Pending
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) {
return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -38,5 +39,5 @@ public interface PendingDurableSubscriberMessageStoragePolicy {
* @param sub
* @return the Pending Message cursor
*/
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub);
PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@ -58,7 +59,7 @@ public class StorePendingDurableSubscriberMessageStoragePolicy implements Pendin
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription sub) {
StoreDurableSubscriberCursor cursor = new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
cursor.setImmediatePriorityDispatch(isImmediatePriorityDispatch());
return cursor;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -40,7 +41,7 @@ public class VMPendingDurableSubscriberMessageStoragePolicy implements PendingDu
* @param sub
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, DurableTopicSubscription sub) {
return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
}
}