https://issues.apache.org/jira/browse/AMQ-3442: Use real durable sub key as MBean name for inactive durable sub and improve metrics, fix inflight count on deactivate. resolved.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1155437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-08-09 17:09:49 +00:00
parent fd6be7f340
commit caca105072
7 changed files with 170 additions and 39 deletions

View File

@ -392,7 +392,7 @@ public class DestinationView implements DestinationViewMBean {
int index = 0;
for (Subscription subscription : subscriptions) {
String connectionClientId = subscription.getContext().getClientId();
String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription, connectionClientId, objectName);
String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
answer[index++] = new ObjectName(objectNameStr);
}
return answer;

View File

@ -21,6 +21,8 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SubscriptionInfo;
@ -36,12 +38,12 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
*
* @param broker
* @param clientId
* @param sub
* @param subInfo
*/
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo sub) {
super(broker,clientId, null);
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo subInfo, Subscription subscription) {
super(broker,clientId, subscription);
this.broker = broker;
this.subscriptionInfo = sub;
this.subscriptionInfo = subInfo;
}
/**
@ -94,6 +96,12 @@ public class InactiveDurableSubscriptionView extends DurableSubscriptionView imp
return false;
}
@Override
protected ConsumerInfo getConsumerInfo() {
// when inactive, consumer info is stale
return null;
}
/**
* Browse messages for this durable subscriber
*

View File

@ -185,7 +185,7 @@ public class ManagedRegionBroker extends RegionBroker {
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
@ -196,7 +196,7 @@ public class ManagedRegionBroker extends RegionBroker {
info.setClientId(context.getClientId());
info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
info.setDestination(sub.getConsumerInfo().getDestination());
addInactiveSubscription(key, info);
addInactiveSubscription(key, info, sub);
} else {
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), sub);
@ -217,21 +217,21 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
Hashtable map = brokerJmxObjectName.getKeyPropertyList();
String brokerDomain = brokerJmxObjectName.getDomain();
String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
String persistentMode = "persistentMode=";
String consumerId = "";
if (sub.getConsumerInfo().isDurable()) {
persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
if (info.isDurable()) {
persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
} else {
persistentMode += "Non-Durable";
if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
if (info.getConsumerId() != null) {
consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
}
}
objectNameStr += persistentMode + ",";
@ -482,7 +482,7 @@ public class ManagedRegionBroker extends RegionBroker {
info.setClientId(subscriptionKey.getClientId());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
}
}
}
@ -512,7 +512,7 @@ public class ManagedRegionBroker extends RegionBroker {
Map.Entry entry = (Entry)i.next();
SubscriptionKey key = (SubscriptionKey)entry.getKey();
SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
addInactiveSubscription(key, info);
addInactiveSubscription(key, info, null);
}
}
@ -525,12 +525,11 @@ public class ManagedRegionBroker extends RegionBroker {
return known;
}
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
try {
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
+ "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
try {
AnnotatedMBean.registerMBean(managementContext, view, objectName);

View File

@ -47,7 +47,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private static final Logger LOG = LoggerFactory.getLogger(DurableTopicSubscription.class);
private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private AtomicBoolean active = new AtomicBoolean();
@ -96,12 +96,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
// do it just once per destination
if (destinations.containsKey(destination.getActiveMQDestination())) {
return;
if (!destinations.contains(destination)) {
super.add(context, destination);
}
destinations.put(destination.getActiveMQDestination(), destination);
// do it just once per destination
if (durableDestinations.containsKey(destination.getActiveMQDestination())) {
return;
}
durableDestinations.put(destination.getActiveMQDestination(), destination);
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic)destination;
@ -130,7 +132,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.info = info;
LOG.debug("Activating " + this);
if (!keepDurableSubsActive) {
for (Iterator<Destination> iter = destinations.values()
for (Iterator<Destination> iter = durableDestinations.values()
.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
add(context, topic);
@ -146,7 +148,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
// If nothing was in the persistent store, then try to use the
// recovery policy.
if (pending.isEmpty()) {
for (Iterator<Destination> iter = destinations.values()
for (Iterator<Destination> iter = durableDestinations.values()
.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.recoverRetroactiveMessages(context, this);
@ -168,10 +170,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
synchronized (pending) {
pending.stop();
}
if (!keepDurableSubsActive) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
for (Iterator<Destination> iter = durableDestinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic)iter.next();
if (!keepDurableSubsActive) {
topic.deactivate(context, this);
} else {
topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
}
}
@ -270,7 +274,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
}

View File

@ -259,7 +259,7 @@ public class TopicRegion extends AbstractRegion {
return rc;
}
private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
public ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubscriptionName(info.getSubscriptionName());

View File

@ -261,16 +261,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
this.batchResetNeeded = false;
}
if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
this.storeHasMessages = false;
try {
doFillBatch();
} catch (Exception e) {
LOG.error(this + " - Failed to fill batch", e);
throw new RuntimeException(e);
}
if (!this.batchList.isEmpty() || !hadSpace) {
this.storeHasMessages=true;
}
this.storeHasMessages = !this.batchList.isEmpty() || !hadSpace;
}
}

View File

@ -25,10 +25,13 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
@ -44,12 +47,15 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class);
public boolean usePrioritySupport = Boolean.TRUE;
public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
public boolean keepDurableSubsActive = true;
private BrokerService broker;
private ActiveMQTopic topic;
private Vector<Exception> exceptions = new Vector<Exception>();
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://" + getName(true));
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
connectionFactory.setWatchTopicAdvisories(false);
return connectionFactory;
}
@Override
@ -89,6 +95,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
broker.getManagementContext().setCreateConnector(false);
broker.setAdvisorySupport(false);
broker.setKeepDurableSubsActive(keepDurableSubsActive);
if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry();
@ -322,6 +330,119 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals("offline consumer got all", sent, listener.count);
}
public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception {
this.addCombinationValues("keepDurableSubsActive",
new Object[]{Boolean.TRUE, Boolean.FALSE});
}
public void testJMXCountersWithOfflineSubs() throws Exception {
// create durable subscription 1
Connection con = createConnection("cliId1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", null, true);
session.close();
con.close();
// restart broker
broker.stop();
createBroker(false /*deleteAllMessages*/);
// send messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
producer.send(topic, message);
}
session.close();
con.close();
// consume some messages
con = createConnection("cliId1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
for (int i=0; i<sent/2; i++) {
Message m = consumer.receive(4000);
assertNotNull("got message: " + i, m);
LOG.info("Got :" + i + ", " + m);
}
// check some counters while active
ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
LOG.info("active durable sub name: " + activeDurableSubName);
final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
assertTrue("is active", durableSubscriptionView.isActive());
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter());
assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge();
}
}));
assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter());
ObjectName destinationName = broker.getAdminView().getTopics()[0];
TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
assertEquals("inflight", 5, topicView.getInFlightCount());
session.close();
con.close();
// check some counters when inactive
ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0];
LOG.info("inactive durable sub name: " + inActiveDurableSubName);
DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true);
assertTrue("is not active", !durableSubscriptionView1.isActive());
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter());
assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge());
assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter());
// destination view
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
// consume the rest
con = createConnection("cliId1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
for (int i=0; i<sent/2;i++) {
Message m = consumer.receive(30000);
assertNotNull("got message: " + i, m);
LOG.info("Got :" + i + ", " + m);
}
activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0];
LOG.info("durable sub name: " + activeDurableSubName);
final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean)
broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true);
assertTrue("is active", durableSubscriptionView2.isActive());
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter());
assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
long val = durableSubscriptionView2.getDequeueCounter();
LOG.info("dequeue count:" + val);
return 10 == val;
}
}));
}
public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
this.addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
@ -1062,6 +1183,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception {
this.addCombinationValues("journalMaxFileLength",
new Object[]{new Integer(64 * 1024)});
this.addCombinationValues("keepDurableSubsActive",
new Object[]{Boolean.TRUE, Boolean.FALSE});
}
// https://issues.apache.org/jira/browse/AMQ-3206