diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 811420725d..1b9251dadf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -392,7 +392,7 @@ public class DestinationView implements DestinationViewMBean { int index = 0; for (Subscription subscription : subscriptions) { String connectionClientId = subscription.getContext().getClientId(); - String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName); + String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName); answer[index++] = new ObjectName(objectNameStr); } return answer; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java index 0fb21797c6..5356e5f500 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/InactiveDurableSubscriptionView.java @@ -21,6 +21,8 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo; @@ -36,12 +38,12 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp * * @param broker * @param clientId - * @param sub + * @param subInfo */ - public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo sub) { - super(broker,clientId, null); + public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) { + super(broker,clientId, subscription); this.broker = broker; - this.subscriptionInfo = sub; + this.subscriptionInfo = subInfo; } /** @@ -94,6 +96,12 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp return false; } + @Override + protected ConsumerInfo getConsumerInfo() { + // when inactive, consumer info is stale + return null; + } + /** * Browse messages for this durable subscriber * diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index c3f7d4df1a..a2db3a600c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -185,7 +185,7 @@ public class ManagedRegionBroker extends RegionBroker { public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { String connectionClientId = context.getClientId(); ObjectName brokerJmxObjectName = brokerObjectName; - String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName); + String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName); SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); try { ObjectName objectName = new ObjectName(objectNameStr); @@ -196,7 +196,7 @@ public class ManagedRegionBroker extends RegionBroker { info.setClientId(context.getClientId()); info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName()); info.setDestination(sub.getConsumerInfo().getDestination()); - addInactiveSubscription(key, info); + addInactiveSubscription(key, info, sub); } else { if (sub.getConsumerInfo().isDurable()) { view = new DurableSubscriptionView(this, context.getClientId(), sub); @@ -217,21 +217,21 @@ public class ManagedRegionBroker extends RegionBroker { } } - public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) { + public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) { Hashtable map = brokerJmxObjectName.getKeyPropertyList(); String brokerDomain = brokerJmxObjectName.getDomain(); String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,"; - String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString(); - String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName()); + String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString(); + String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName()); String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId); String persistentMode = "persistentMode="; String consumerId = ""; - if (sub.getConsumerInfo().isDurable()) { - persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName()); + if (info.isDurable()) { + persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName()); } else { persistentMode += "Non-Durable"; - if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) { - consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString()); + if (info.getConsumerId() != null) { + consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString()); } } objectNameStr += persistentMode + ","; @@ -482,7 +482,7 @@ public class ManagedRegionBroker extends RegionBroker { info.setClientId(subscriptionKey.getClientId()); info.setSubscriptionName(subscriptionKey.getSubscriptionName()); info.setDestination(new ActiveMQTopic(view.getDestinationName())); - addInactiveSubscription(subscriptionKey, info); + addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null)); } } } @@ -512,7 +512,7 @@ public class ManagedRegionBroker extends RegionBroker { Map.Entry entry = (Entry)i.next(); SubscriptionKey key = (SubscriptionKey)entry.getKey(); SubscriptionInfo info = (SubscriptionInfo)entry.getValue(); - addInactiveSubscription(key, info); + addInactiveSubscription(key, info, null); } } @@ -525,12 +525,11 @@ public class ManagedRegionBroker extends RegionBroker { return known; } - protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) { - Hashtable map = brokerObjectName.getKeyPropertyList(); + protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) { try { - ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false," - + "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + ""); - SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info); + ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info); + ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName)); + SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription); try { AnnotatedMBean.registerMBean(managementContext, view, objectName); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 761952bd15..9fbeaec35e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -47,7 +47,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class); private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); - private final ConcurrentHashMap destinations = new ConcurrentHashMap(); + private final ConcurrentHashMap durableDestinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; private final boolean keepDurableSubsActive; private AtomicBoolean active = new AtomicBoolean(); @@ -96,12 +96,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } public void add(ConnectionContext context, Destination destination) throws Exception { - super.add(context, destination); - // do it just once per destination - if (destinations.containsKey(destination.getActiveMQDestination())) { - return; + if (!destinations.contains(destination)) { + super.add(context, destination); } - destinations.put(destination.getActiveMQDestination(), destination); + // do it just once per destination + if (durableDestinations.containsKey(destination.getActiveMQDestination())) { + return; + } + durableDestinations.put(destination.getActiveMQDestination(), destination); if (active.get() || keepDurableSubsActive) { Topic topic = (Topic)destination; @@ -130,7 +132,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us this.info = info; LOG.debug("Activating " + this); if (!keepDurableSubsActive) { - for (Iterator iter = destinations.values() + for (Iterator iter = durableDestinations.values() .iterator(); iter.hasNext();) { Topic topic = (Topic) iter.next(); add(context, topic); @@ -146,7 +148,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us // If nothing was in the persistent store, then try to use the // recovery policy. if (pending.isEmpty()) { - for (Iterator iter = destinations.values() + for (Iterator iter = durableDestinations.values() .iterator(); iter.hasNext();) { Topic topic = (Topic) iter.next(); topic.recoverRetroactiveMessages(context, this); @@ -168,10 +170,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us synchronized (pending) { pending.stop(); } - if (!keepDurableSubsActive) { - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic)iter.next(); + for (Iterator iter = durableDestinations.values().iterator(); iter.hasNext();) { + Topic topic = (Topic)iter.next(); + if (!keepDurableSubsActive) { topic.deactivate(context, this); + } else { + topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); } } @@ -270,7 +274,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us public synchronized String toString() { - return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending=" + return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index e6348a94ef..36cc71f4c6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -259,7 +259,7 @@ public class TopicRegion extends AbstractRegion { return rc; } - private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { + public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { ConsumerInfo rc = new ConsumerInfo(); rc.setSelector(info.getSelector()); rc.setSubscriptionName(info.getSubscriptionName()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 4d8313c4c8..d8c081eca6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -261,16 +261,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i this.batchResetNeeded = false; } if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { - this.storeHasMessages = false; try { doFillBatch(); } catch (Exception e) { LOG.error(this + " - Failed to fill batch", e); throw new RuntimeException(e); } - if (!this.batchList.isEmpty() || !hadSpace) { - this.storeHasMessages=true; - } + this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 2b10fb0cc5..b7b3bc555c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -25,10 +25,13 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.management.ObjectName; import junit.framework.Test; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; @@ -44,12 +47,15 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class); public boolean usePrioritySupport = Boolean.TRUE; public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + public boolean keepDurableSubsActive = true; private BrokerService broker; private ActiveMQTopic topic; private Vector exceptions = new Vector(); protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - return new ActiveMQConnectionFactory("vm://" + getName(true)); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); + connectionFactory.setWatchTopicAdvisories(false); + return connectionFactory; } @Override @@ -89,6 +95,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp broker.setBrokerName(getName(true)); broker.setDeleteAllMessagesOnStartup(deleteAllMessages); broker.getManagementContext().setCreateConnector(false); + broker.setAdvisorySupport(false); + broker.setKeepDurableSubsActive(keepDurableSubsActive); if (usePrioritySupport) { PolicyEntry policy = new PolicyEntry(); @@ -322,6 +330,119 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } + public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception { + this.addCombinationValues("keepDurableSubsActive", + new Object[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testJMXCountersWithOfflineSubs() throws Exception { + // create durable subscription 1 + Connection con = createConnection("cliId1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + + // restart broker + broker.stop(); + createBroker(false /*deleteAllMessages*/); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + sent++; + Message message = session.createMessage(); + producer.send(topic, message); + } + session.close(); + con.close(); + + // consume some messages + con = createConnection("cliId1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + + for (int i=0; i